You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/09/15 14:50:21 UTC

svn commit: r1385057 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/...

Author: mbautin
Date: Sat Sep 15 12:50:20 2012
New Revision: 1385057

URL: http://svn.apache.org/viewvc?rev=1385057&view=rev
Log:
[HBASE-6747] [0.89-fb] Allow the HBase server to limit the size of results returned.

Author: aaiyer

Summary:
We have seen cases where fetching large rows puts the server into
a bad state, causing full-GC pauses and/or OutOfMemoryErrors that
can be fatal.

From now on, the server fails fast and abandons any request that
tries to fetch more than a configurable number of bytes
(hbase.regionserver.results.size.max).

Test Plan: add a test (testResultLimits) to TestFromClientSide to exercise this behavior.

Reviewers: kannan, kranganathan

Differential Revision: https://phabricator.fb.com/D569695

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1385057&r1=1385056&r2=1385057&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Sat Sep 15 12:50:20 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.io.encodi
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
 import org.apache.hadoop.hbase.ipc.ProfilingData;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -320,7 +321,8 @@ public class HFileReaderV2 extends Abstr
         cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
             cacheConf.isInMemory());
       }
-      ProfilingData pData = HRegionServer.threadLocalProfilingData.get();
+      Call call = HRegionServer.callContext.get();
+      ProfilingData pData = call == null ? null : call.getProfilingData();
       if (pData != null) {
         pData.incInt(ProfilingData.blockMissStr(
             hfileBlock.getBlockType().getCategory(),
@@ -359,7 +361,8 @@ public class HFileReaderV2 extends Abstr
             cachedBlock.getBlockType().getCategory();
           getSchemaMetrics().updateOnCacheHit(blockCategory, isCompaction);
 
-          ProfilingData pData = HRegionServer.threadLocalProfilingData.get();
+          Call call = HRegionServer.callContext.get();
+          ProfilingData pData = call == null ? null : call.getProfilingData();
           if (pData != null) {
             pData.incInt(ProfilingData.blockHitStr(
                 cachedBlock.getBlockType().getCategory(),

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1385057&r1=1385056&r2=1385057&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Sat Sep 15 12:50:20 2012
@@ -495,6 +495,7 @@ public class HBaseRPC {
     private final int warnResponseTime;
     private final int warnResponseSize;
 
+
     /**
      * Construct an RPC server.
      * @param instance the instance whose methods will be called
@@ -568,7 +569,8 @@ public class HBaseRPC {
           throw new IOException("Could not find requested method, the usual " +
               "cause is a version mismatch between client and server.");
         }
-        ProfilingData pData = HRegionServer.threadLocalProfilingData.get();
+        Call callInfo = HRegionServer.callContext.get();
+        ProfilingData pData = callInfo == null ? null : callInfo.getProfilingData();
         if (pData != null) {
           pData.addString(ProfilingData.RPC_METHOD_NAME, call.getMethodName ());
         }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1385057&r1=1385056&r2=1385057&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Sat Sep 15 12:50:20 2012
@@ -229,7 +229,7 @@ public abstract class HBaseServer {
   }
 
   /** A call queued for handling. */
-  private static class Call {
+  public static class Call {
     protected int id;                             // the client's call id
     protected Writable param;                     // the parameter passed
     protected Connection connection;              // connection to client
@@ -243,6 +243,7 @@ public abstract class HBaseServer {
     protected boolean shouldProfile = false;
     protected ProfilingData profilingData = null;
     protected String tag = null;
+    protected long partialResponseSize; // size of the results collected so far
 
     public Call(int id, Writable param, Connection connection) {
       this.id = id;
@@ -250,6 +251,7 @@ public abstract class HBaseServer {
       this.connection = connection;
       this.timestamp = System.currentTimeMillis();
       this.response = null;
+      this.partialResponseSize = 0;
     }
     
     public void setTag(String tag) {
@@ -276,6 +278,14 @@ public abstract class HBaseServer {
       return this.compressionAlgo;
     }
 
+    public long getPartialResponseSize() {
+      return partialResponseSize;
+    }
+
+    public void setPartialResponseSize(long partialResponseSize) {
+      this.partialResponseSize = partialResponseSize;
+    }
+
     @Override
     public String toString() {
       return param.toString() + " from " + connection.toString();
@@ -284,6 +294,10 @@ public abstract class HBaseServer {
     public void setResponse(ByteBuffer response) {
       this.response = response;
     }
+
+    public ProfilingData getProfilingData(){
+      return this.profilingData;
+    }
   }
 
   /** Listens on the socket. Creates jobs for the handler threads*/
@@ -1086,8 +1100,10 @@ public abstract class HBaseServer {
           
           if (call.shouldProfile) {
             call.profilingData = new ProfilingData ();
-            HRegionServer.threadLocalProfilingData.set (call.profilingData);
+          } else {
+            call.profilingData = null;
           }
+          HRegionServer.callContext.set(call);
           
           CurCall.set(call);
           UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
@@ -1108,8 +1124,8 @@ public abstract class HBaseServer {
           if (call.shouldProfile) {
             call.profilingData.addLong(
                 ProfilingData.TOTAL_SERVER_TIME_MS, total);
-            HRegionServer.threadLocalProfilingData.remove ();
           }
+          HRegionServer.callContext.remove();
           
           int size = BUFFER_INITIAL_SIZE;
           if (value instanceof WritableWithSize) {
@@ -1158,12 +1174,12 @@ public abstract class HBaseServer {
             value.write(out);
             // write profiling data if requested
             if (call.getVersion () >= VERSION_RPCOPTIONS) {
-            	if (!call.shouldProfile) {
-            		out.writeBoolean(false);
-            	} else {
-            		out.writeBoolean(true);
-            		call.profilingData.write(out);
-            	}
+              if (!call.shouldProfile) {
+                out.writeBoolean(false);
+              } else {
+                out.writeBoolean(true);
+                call.profilingData.write(out);
+              }
             }
           } else {
             WritableUtils.writeString(out, errorClass);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1385057&r1=1385056&r2=1385057&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Sep 15 12:50:20 2012
@@ -118,6 +118,7 @@ import org.apache.hadoop.hbase.ipc.HBase
 import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
+import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.ipc.ProfilingData;
@@ -335,7 +336,7 @@ public class HRegionServer implements HR
   private String stopReason = "not stopping";
 
   // profiling threadlocal
-  public static final ThreadLocal<ProfilingData> threadLocalProfilingData = new ThreadLocal<ProfilingData> ();
+  public static final ThreadLocal<Call> callContext = new ThreadLocal<Call> ();
 
   /** Regionserver launched by the main method. Not used in tests. */
   private static HRegionServer mainRegionServer;
@@ -343,6 +344,20 @@ public class HRegionServer implements HR
   private final AtomicReference<HMasterRegionInterface> masterRef =
       new AtomicReference<HMasterRegionInterface>();
 
+  // maximum size (in bytes) for any allowed call. Applies to gets/nexts. If
+  // the size of the Result to be returned exceeds this value, the server will
+  // throw out an Exception. This is done to avoid OutOfMemory Errors, and
+  // large GC issues.
+  private static long responseSizeLimit;
+
+  public static long getResponseSizeLimit() {
+    return responseSizeLimit;
+  }
+
+  public static void setResponseSizeLimit(long responseSizeLimit) {
+    HRegionServer.responseSizeLimit = responseSizeLimit;
+  }
+
   /**
    * Starts a HRegionServer at the default location
    * @param conf
@@ -394,6 +409,9 @@ public class HRegionServer implements HR
         HConstants.HBASE_RPC_TIMEOUT_KEY,
         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
 
+    this.responseSizeLimit = conf.getLong("hbase.regionserver.results.size.max",
+        (long)Integer.MAX_VALUE); // set the max to 2G
+
     reinitialize();
     SchemaMetrics.configureGlobally(conf);
     cacheConfig = new CacheConfig(conf);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1385057&r1=1385056&r2=1385057&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Sep 15 12:50:20 2012
@@ -27,8 +27,11 @@ import java.util.NavigableSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -328,11 +331,16 @@ class StoreScanner extends NonLazyKeyVal
     KeyValue prevKV = null;
     List<KeyValue> results = new ArrayList<KeyValue>();
 
+
+    Call call = HRegionServer.callContext.get();
+    long quotaRemaining = (call == null) ? Long.MAX_VALUE
+        : HRegionServer.getResponseSizeLimit() - call.getPartialResponseSize();
+
     // Only do a sanity-check if store and comparator are available.
     KeyValue.KVComparator comparator =
         store != null ? store.getComparator() : null;
 
-    long cumulativeMetric = 0;
+    long addedResultsSize = 0;
     try {
       LOOP: while((kv = this.heap.peek()) != null) {
         // kv is no longer immutable due to KeyOnlyFilter! use copy for safety
@@ -365,9 +373,14 @@ class StoreScanner extends NonLazyKeyVal
             // add to results only if we have skipped #rowOffset kvs
             // also update metric accordingly
             if (this.countPerRow > storeOffset) {
-              if (metric != null) {
-                cumulativeMetric += copyKv.getLength();
+              addedResultsSize += copyKv.getLength();
+              if (addedResultsSize > quotaRemaining) {
+                LOG.warn("Result too large. Please consider using Batching."
+                    + " Cannot allow  operations that fetch more than "
+                    + HRegionServer.getResponseSizeLimit() + " bytes.");
+                throw new DoNotRetryIOException("Result too large");
               }
+
               results.add(copyKv);
             }
 
@@ -435,10 +448,15 @@ class StoreScanner extends NonLazyKeyVal
       }
     } finally { 
       // update the counter 
-      if (cumulativeMetric > 0 && metric != null) { 
+      if (addedResultsSize > 0 && metric != null) {
         HRegion.incrNumericMetric(this.metricNamePrefix + metric, 
-            cumulativeMetric);  
+            addedResultsSize);
       } 
+      // update the partial results size
+      if (call != null) {
+        call.setPartialResponseSize(call.getPartialResponseSize()
+            + addedResultsSize);
+      }
     }
 
     if (!results.isEmpty()) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1385057&r1=1385056&r2=1385057&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Sat Sep 15 12:50:20 2012
@@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
 import org.apache.hadoop.hbase.ipc.ProfilingData;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -958,7 +959,8 @@ public class HLog implements Syncable {
     }
     long time = System.currentTimeMillis() - start;
     writeTime.inc(time);
-    ProfilingData pData = HRegionServer.threadLocalProfilingData.get();
+    Call call = HRegionServer.callContext.get();
+    ProfilingData pData = call == null ? null : call.getProfilingData();
     if (pData != null) {
       pData.addLong(ProfilingData.HLOG_WRITE_TIME_MS, time);
     }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1385057&r1=1385056&r2=1385057&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Sat Sep 15 12:50:20 2012
@@ -664,6 +664,76 @@ public class TestFromClientSide {
   }
 
  /**
+   * Test result limits. If a request fetches more data than the configured
+   * response size limit, the server should throw an exception instead of
+   * accumulating results until it runs out of memory (OOM).
+   */
+  @Test
+  public void testResultLimits() throws Exception {
+    // Lower the (static) response size limit so that the largest row trips it.
+    HRegionServer.setResponseSizeLimit(40000L);
+
+    byte [] TABLE = Bytes.toBytes("testResultLimits");
+    byte [][] ROWS = makeN(ROW, 3);
+    byte [][] FAMILIES = makeNAscii(FAMILY, 10);
+    byte [][] VALUES = new byte[3][];
+    VALUES[0] = new byte[2000];
+    VALUES[1] = new byte[3000];
+    VALUES[2] = new byte[5000];
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+
+    Get get1, get2, get3;
+    Put put;
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Insert one column into every family, for each row
+    ////////////////////////////////////////////////////////////////////////////
+
+    for (int r = 0; r < ROWS.length; r++) {
+      for (int f = 0; f < FAMILIES.length; f++) {
+        put = new Put(ROWS[r]);
+        put.add(FAMILIES[f], QUALIFIER, VALUES[r]);
+        ht.put(put);
+      }
+    }
+
+    // Try the small row.
+    // Expected size ~10 * 2000 bytes of values < limit
+    get1 = new Get(ROWS[0]);
+    ht.get(get1);
+
+    // Try the medium row.
+    // Expected size ~10 * 3000 bytes of values < limit
+    get2 = new Get(ROWS[1]);
+    ht.get(get2);
+
+    // Try the large row; expect an IOException (assert(false) needs -ea to fail).
+    // Expected size ~10 * 5000 bytes of values > limit
+    try {
+      get3 = new Get(ROWS[2]);
+      ht.get(get3);
+      assert(false);
+    } catch (IOException e) {
+    }
+
+    // Try a multi-get of the small and medium rows together.
+    // Expected combined size ~10 * 2000 + 10 * 3000 bytes of values > limit
+    try {
+      ArrayList<Get> list = new ArrayList<Get>();
+      list.add(get1);
+      list.add(get2);
+      ht.get(list);
+      assert(false);
+    } catch (IOException e) {
+    }
+
+    // Reset the limit. NOTE(review): skipped if an assertion fails; consider try/finally.
+    HRegionServer.setResponseSizeLimit(Integer.MAX_VALUE);
+  }
+
+
+  /**
    * Test basic puts, gets, scans, and deletes for a single row
    * in a multiple family table.
    */