You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2015/12/26 18:07:39 UTC

[06/43] hbase git commit: HBASE-14978 Don't allow Multi to retain too many blocks

HBASE-14978 Don't allow Multi to retain too many blocks


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/217036d8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/217036d8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/217036d8

Branch: refs/heads/hbase-12439
Commit: 217036d81693f3806d271f6f5d5d7d8f17823745
Parents: bbfff0d
Author: Elliott Clark <ec...@apache.org>
Authored: Mon Dec 14 18:16:03 2015 -0800
Committer: Elliott Clark <ec...@apache.org>
Committed: Thu Dec 17 16:13:50 2015 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcCallContext.java |  3 +
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 11 +++
 .../hbase/regionserver/RSRpcServices.java       | 71 +++++++++++++-----
 .../hbase/client/TestMultiRespectsLimits.java   | 75 ++++++++++++++++++--
 4 files changed, 137 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/217036d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index d14e9b2..f41dfbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -90,4 +90,7 @@ public interface RpcCallContext extends Delayable {
    * onerous.
    */
   void incrementResponseCellSize(long cellSize);
+
+  long getResponseBlockSize();
+  void incrementResponseBlockSize(long blockSize);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/217036d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 2bef247..ed8d37d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -319,6 +319,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     private RpcCallback callback;
 
     private long responseCellSize = 0;
+    private long responseBlockSize = 0;
     private boolean retryImmediatelySupported;
 
     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
@@ -551,6 +552,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       responseCellSize += cellSize;
     }
 
+    @Override
+    public long getResponseBlockSize() {
+      return responseBlockSize;
+    }
+
+    @Override
+    public void incrementResponseBlockSize(long blockSize) {
+      responseBlockSize += blockSize;
+    }
+
     /**
      * If we have a response, and delay is not set, then respond
      * immediately.  Otherwise, do not respond to client.  This is

http://git-wip-us.apache.org/repos/asf/hbase/blob/217036d8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index bba38f79..c82d49c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ByteBufferedCell;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
@@ -658,6 +660,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     List<ClientProtos.Action> mutations = null;
     long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
     IOException sizeIOE = null;
+    Object lastBlock = null;
     for (ClientProtos.Action action : actions.getActionList()) {
       ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
       try {
@@ -665,7 +668,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
         if (context != null
             && context.isRetryImmediatelySupported()
-            && context.getResponseCellSize() > maxQuotaResultSize) {
+            && (context.getResponseCellSize() > maxQuotaResultSize
+              || context.getResponseBlockSize() > maxQuotaResultSize)) {
 
           // We're storing the exception since the exception and reason string won't
           // change after the response size limit is reached.
@@ -674,15 +678,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             // Throwing will kill the JVM's JIT.
             //
             // Instead just create the exception and then store it.
-            sizeIOE = new MultiActionResultTooLarge("Max response size exceeded: "
-                    + context.getResponseCellSize());
+            sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
+                + " CellSize: " + context.getResponseCellSize()
+                + " BlockSize: " + context.getResponseBlockSize());
 
             // Only report the exception once since there's only one request that
             // caused the exception. Otherwise this number will dominate the exceptions count.
             rpcServer.getMetrics().exception(sizeIOE);
           }
 
-          // Now that there's an exception is know to be created
+          // Now that an exception is known to have been created,
           // use it for the response.
           //
           // This will create a copy in the builder.
@@ -755,9 +760,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           } else {
             pbResult = ProtobufUtil.toResult(r);
           }
-          if (context != null) {
-            context.incrementResponseCellSize(Result.getTotalSizeOfCells(r));
-          }
+          lastBlock = addSize(context, r, lastBlock);
           resultOrExceptionBuilder =
             ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
         }
@@ -1070,6 +1073,44 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return 0L;
   }
 
+  /**
+   * Method to account for the size of retained cells and retained data blocks.
+   * @return an object that represents the last referenced block from this response.
+   */
+  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
+    if (context != null && !r.isEmpty()) {
+      for (Cell c : r.rawCells()) {
+        context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
+
+        // Since byte buffers can point to all kinds of crazy places it's harder to keep
+        // track of which blocks are kept alive by which byte buffer.
+        // So we make a guess.
+        if (c instanceof ByteBufferedCell) {
+          ByteBufferedCell bbCell = (ByteBufferedCell) c;
+          ByteBuffer bb = bbCell.getValueByteBuffer();
+          if (bb != lastBlock) {
+            context.incrementResponseBlockSize(bb.capacity());
+            lastBlock = bb;
+          }
+        } else {
+          // We're using the last block differing from the current block as
+          // a proxy for pointing to a new block. This won't be exact.
+          // If there are multiple gets that bounce back and forth,
+          // then it's possible that this will over count the size of
+          // referenced blocks. However it's better to over count and
+          // use two rpcs than to OOME the regionserver.
+          byte[] valueArray = c.getValueArray();
+          if (valueArray != lastBlock) {
+            context.incrementResponseBlockSize(valueArray.length);
+            lastBlock = valueArray;
+          }
+        }
+
+      }
+    }
+    return lastBlock;
+  }
+
   RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
       throws LeaseStillHeldException {
     Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
@@ -2467,6 +2508,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
       assert scanner != null;
       RpcCallContext context = RpcServer.getCurrentCall();
+      Object lastBlock = null;
 
       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
       long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
@@ -2500,11 +2542,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
               scanner, results, rows);
             if (!results.isEmpty()) {
               for (Result r : results) {
-                for (Cell cell : r.rawCells()) {
-                  if (context != null) {
-                    context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
-                  }
-                }
+                lastBlock = addSize(context, r, lastBlock);
               }
             }
             if (bypass != null && bypass.booleanValue()) {
@@ -2601,13 +2639,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                   moreRows = scanner.nextRaw(values, scannerContext);
 
                   if (!values.isEmpty()) {
-                    for (Cell cell : values) {
-                      if (context != null) {
-                        context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
-                      }
-                    }
                     final boolean partial = scannerContext.partialResultFormed();
-                    results.add(Result.create(values, null, stale, partial));
+                    Result r = Result.create(values, null, stale, partial);
+                    lastBlock = addSize(context, r, lastBlock);
+                    results.add(r);
                     i++;
                   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/217036d8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
index 47dd7be..28e1855 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -36,6 +37,7 @@ import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static junit.framework.TestCase.assertEquals;
 
@@ -73,7 +75,7 @@ public class TestMultiRespectsLimits {
     TEST_UTIL.loadTable(t, FAMILY, false);
 
     // Split the table to make sure that the chunking happens accross regions.
-    try (final Admin admin = TEST_UTIL.getHBaseAdmin()) {
+    try (final Admin admin = TEST_UTIL.getAdmin()) {
       admin.split(name);
       TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
         @Override
@@ -87,16 +89,79 @@ public class TestMultiRespectsLimits {
     for (int i = 0; i < MAX_SIZE; i++) {
       gets.add(new Get(HBaseTestingUtility.ROWS[i]));
     }
-    Result[] results = t.get(gets);
-    assertEquals(MAX_SIZE, results.length);
+
     RpcServerInterface rpcServer = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer();
     BaseSource s = rpcServer.getMetrics().getMetricsSource();
+    long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s);
+    long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s);
+
+    Result[] results = t.get(gets);
+    assertEquals(MAX_SIZE, results.length);
 
     // Cells from TEST_UTIL.loadTable have a length of 27.
     // Multiplying by less than that gives an easy lower bound on size.
     // However in reality each kv is being reported as much higher than that.
-    METRICS_ASSERT.assertCounterGt("exceptions", (MAX_SIZE * 25) / MAX_SIZE, s);
+    METRICS_ASSERT.assertCounterGt("exceptions",
+        startingExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s);
+    METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
+        startingMultiExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s);
+  }
+
+  @Test
+  public void testBlockMultiLimits() throws Exception {
+    final TableName name = TableName.valueOf("testBlockMultiLimits");
+    Table t = TEST_UTIL.createTable(name, FAMILY);
+
+    final HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+    RpcServerInterface rpcServer = regionServer.getRpcServer();
+    BaseSource s = rpcServer.getMetrics().getMetricsSource();
+    long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s);
+    long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s);
+
+    byte[] row = Bytes.toBytes("TEST");
+    byte[][] cols = new byte[][]{
+        Bytes.toBytes("0"), // Get this
+        Bytes.toBytes("1"), // Buffer
+        Bytes.toBytes("2"), // Get This
+        Bytes.toBytes("3"), // Buffer
+    };
+
+    // Set the value size so that one result will be less than MAX_SIZE,
+    // however the block being referenced will be larger than MAX_SIZE.
+    // This should cause the regionserver to try and send a result immediately.
+    byte[] value = new byte[MAX_SIZE - 200];
+    ThreadLocalRandom.current().nextBytes(value);
+
+    for (byte[] col:cols) {
+      Put p = new Put(row);
+      p.addImmutable(FAMILY, col, value);
+      t.put(p);
+    }
+
+    // Make sure that a flush happens
+    try (final Admin admin = TEST_UTIL.getAdmin()) {
+      admin.flush(name);
+      TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return regionServer.getOnlineRegions(name).get(0).getMaxFlushedSeqId() > 3;
+        }
+      });
+    }
+
+    List<Get> gets = new ArrayList<>(2);
+    Get g0 = new Get(row);
+    g0.addColumn(FAMILY, cols[0]);
+    gets.add(g0);
+
+    Get g2 = new Get(row);
+    g2.addColumn(FAMILY, cols[2]);
+    gets.add(g2);
+
+    Result[] results = t.get(gets);
+    assertEquals(2, results.length);
+    METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions, s);
     METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
-        (MAX_SIZE * 25) / MAX_SIZE, s);
+        startingMultiExceptions, s);
   }
 }