You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@hbase.apache.org by sy...@apache.org on 2015/12/26 18:07:39 UTC
[06/43] hbase git commit: HBASE-14978 Don't allow Multi to retain too many blocks
HBASE-14978 Don't allow Multi to retain too many blocks
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/217036d8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/217036d8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/217036d8
Branch: refs/heads/hbase-12439
Commit: 217036d81693f3806d271f6f5d5d7d8f17823745
Parents: bbfff0d
Author: Elliott Clark <ec...@apache.org>
Authored: Mon Dec 14 18:16:03 2015 -0800
Committer: Elliott Clark <ec...@apache.org>
Committed: Thu Dec 17 16:13:50 2015 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/ipc/RpcCallContext.java | 3 +
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 11 +++
.../hbase/regionserver/RSRpcServices.java | 71 +++++++++++++-----
.../hbase/client/TestMultiRespectsLimits.java | 75 ++++++++++++++++++--
4 files changed, 137 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/217036d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index d14e9b2..f41dfbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -90,4 +90,7 @@ public interface RpcCallContext extends Delayable {
* onerous.
*/
void incrementResponseCellSize(long cellSize);
+
+ long getResponseBlockSize();
+ void incrementResponseBlockSize(long blockSize);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/217036d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 2bef247..ed8d37d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -319,6 +319,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private RpcCallback callback;
private long responseCellSize = 0;
+ private long responseBlockSize = 0;
private boolean retryImmediatelySupported;
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
@@ -551,6 +552,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
responseCellSize += cellSize;
}
+ @Override
+ public long getResponseBlockSize() {
+ return responseBlockSize;
+ }
+
+ @Override
+ public void incrementResponseBlockSize(long blockSize) {
+ responseBlockSize += blockSize;
+ }
+
/**
* If we have a response, and delay is not set, then respond
* immediately. Otherwise, do not respond to client. This is
http://git-wip-us.apache.org/repos/asf/hbase/blob/217036d8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index bba38f79..c82d49c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ByteBufferedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
@@ -658,6 +660,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
List<ClientProtos.Action> mutations = null;
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
IOException sizeIOE = null;
+ Object lastBlock = null;
for (ClientProtos.Action action : actions.getActionList()) {
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
try {
@@ -665,7 +668,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (context != null
&& context.isRetryImmediatelySupported()
- && context.getResponseCellSize() > maxQuotaResultSize) {
+ && (context.getResponseCellSize() > maxQuotaResultSize
+ || context.getResponseBlockSize() > maxQuotaResultSize)) {
// We're storing the exception since the exception and reason string won't
// change after the response size limit is reached.
@@ -674,15 +678,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Throwing will kill the JVM's JIT.
//
// Instead just create the exception and then store it.
- sizeIOE = new MultiActionResultTooLarge("Max response size exceeded: "
- + context.getResponseCellSize());
+ sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
+ + " CellSize: " + context.getResponseCellSize()
+ + " BlockSize: " + context.getResponseBlockSize());
// Only report the exception once since there's only one request that
// caused the exception. Otherwise this number will dominate the exceptions count.
rpcServer.getMetrics().exception(sizeIOE);
}
- // Now that there's an exception is know to be created
+ // Now that there's an exception is known to be created
// use it for the response.
//
// This will create a copy in the builder.
@@ -755,9 +760,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} else {
pbResult = ProtobufUtil.toResult(r);
}
- if (context != null) {
- context.incrementResponseCellSize(Result.getTotalSizeOfCells(r));
- }
+ lastBlock = addSize(context, r, lastBlock);
resultOrExceptionBuilder =
ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
}
@@ -1070,6 +1073,44 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return 0L;
}
+ /**
+ * Method to account for the size of retained cells and retained data blocks.
+ * @return an object that represents the last referenced block from this response.
+ */
+ Object addSize(RpcCallContext context, Result r, Object lastBlock) {
+ if (context != null && !r.isEmpty()) {
+ for (Cell c : r.rawCells()) {
+ context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
+
+ // Since byte buffers can point all kinds of crazy places it's harder to keep track
+ // of which blocks are kept alive by what byte buffer.
+ // So we make a guess.
+ if (c instanceof ByteBufferedCell) {
+ ByteBufferedCell bbCell = (ByteBufferedCell) c;
+ ByteBuffer bb = bbCell.getValueByteBuffer();
+ if (bb != lastBlock) {
+ context.incrementResponseBlockSize(bb.capacity());
+ lastBlock = bb;
+ }
+ } else {
+ // We're using the last block being the same as the current block as
+ // a proxy for pointing to a new block. This won't be exact.
+ // If there are multiple gets that bounce back and forth
+ // Then it's possible that this will over count the size of
+ // referenced blocks. However it's better to over count and
+ // use two rpcs than to OOME the regionserver.
+ byte[] valueArray = c.getValueArray();
+ if (valueArray != lastBlock) {
+ context.incrementResponseBlockSize(valueArray.length);
+ lastBlock = valueArray;
+ }
+ }
+
+ }
+ }
+ return lastBlock;
+ }
+
RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
throws LeaseStillHeldException {
Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
@@ -2467,6 +2508,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
assert scanner != null;
RpcCallContext context = RpcServer.getCurrentCall();
+ Object lastBlock = null;
quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
@@ -2500,11 +2542,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
scanner, results, rows);
if (!results.isEmpty()) {
for (Result r : results) {
- for (Cell cell : r.rawCells()) {
- if (context != null) {
- context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
- }
- }
+ lastBlock = addSize(context, r, lastBlock);
}
}
if (bypass != null && bypass.booleanValue()) {
@@ -2601,13 +2639,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
moreRows = scanner.nextRaw(values, scannerContext);
if (!values.isEmpty()) {
- for (Cell cell : values) {
- if (context != null) {
- context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
- }
- }
final boolean partial = scannerContext.partialResultFormed();
- results.add(Result.create(values, null, stale, partial));
+ Result r = Result.create(values, null, stale, partial);
+ lastBlock = addSize(context, r, lastBlock);
+ results.add(r);
i++;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/217036d8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
index 47dd7be..28e1855 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -36,6 +37,7 @@ import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import static junit.framework.TestCase.assertEquals;
@@ -73,7 +75,7 @@ public class TestMultiRespectsLimits {
TEST_UTIL.loadTable(t, FAMILY, false);
// Split the table to make sure that the chunking happens accross regions.
- try (final Admin admin = TEST_UTIL.getHBaseAdmin()) {
+ try (final Admin admin = TEST_UTIL.getAdmin()) {
admin.split(name);
TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
@@ -87,16 +89,79 @@ public class TestMultiRespectsLimits {
for (int i = 0; i < MAX_SIZE; i++) {
gets.add(new Get(HBaseTestingUtility.ROWS[i]));
}
- Result[] results = t.get(gets);
- assertEquals(MAX_SIZE, results.length);
+
RpcServerInterface rpcServer = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer();
BaseSource s = rpcServer.getMetrics().getMetricsSource();
+ long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s);
+ long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s);
+
+ Result[] results = t.get(gets);
+ assertEquals(MAX_SIZE, results.length);
// Cells from TEST_UTIL.loadTable have a length of 27.
// Multiplying by less than that gives an easy lower bound on size.
// However in reality each kv is being reported as much higher than that.
- METRICS_ASSERT.assertCounterGt("exceptions", (MAX_SIZE * 25) / MAX_SIZE, s);
+ METRICS_ASSERT.assertCounterGt("exceptions",
+ startingExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s);
+ METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
+ startingMultiExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s);
+ }
+
+ @Test
+ public void testBlockMultiLimits() throws Exception {
+ final TableName name = TableName.valueOf("testBlockMultiLimits");
+ Table t = TEST_UTIL.createTable(name, FAMILY);
+
+ final HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+ RpcServerInterface rpcServer = regionServer.getRpcServer();
+ BaseSource s = rpcServer.getMetrics().getMetricsSource();
+ long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s);
+ long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s);
+
+ byte[] row = Bytes.toBytes("TEST");
+ byte[][] cols = new byte[][]{
+ Bytes.toBytes("0"), // Get this
+ Bytes.toBytes("1"), // Buffer
+ Bytes.toBytes("2"), // Get This
+ Bytes.toBytes("3"), // Buffer
+ };
+
+ // Set the value size so that one result will be less than the MAX_SIE
+ // however the block being reference will be larger than MAX_SIZE.
+ // This should cause the regionserver to try and send a result immediately.
+ byte[] value = new byte[MAX_SIZE - 200];
+ ThreadLocalRandom.current().nextBytes(value);
+
+ for (byte[] col:cols) {
+ Put p = new Put(row);
+ p.addImmutable(FAMILY, col, value);
+ t.put(p);
+ }
+
+ // Make sure that a flush happens
+ try (final Admin admin = TEST_UTIL.getAdmin()) {
+ admin.flush(name);
+ TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return regionServer.getOnlineRegions(name).get(0).getMaxFlushedSeqId() > 3;
+ }
+ });
+ }
+
+ List<Get> gets = new ArrayList<>(2);
+ Get g0 = new Get(row);
+ g0.addColumn(FAMILY, cols[0]);
+ gets.add(g0);
+
+ Get g2 = new Get(row);
+ g2.addColumn(FAMILY, cols[2]);
+ gets.add(g2);
+
+ Result[] results = t.get(gets);
+ assertEquals(2, results.length);
+ METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions, s);
METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
- (MAX_SIZE * 25) / MAX_SIZE, s);
+ startingMultiExceptions, s);
}
}