You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/03/23 00:32:42 UTC
[3/3] hbase git commit: HBASE-15212 RpcServer should enforce max
request size
HBASE-15212 RpcServer should enforce max request size
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a8bd8eb9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a8bd8eb9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a8bd8eb9
Branch: refs/heads/branch-1.3
Commit: a8bd8eb92e1df54b4e58ed402f8e5a75977df8d1
Parents: cdd7137
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Mar 22 16:23:15 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Mar 22 16:24:43 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 15 ++++++-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 44 ++++++++++++++++++--
2 files changed, 53 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a8bd8eb9/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index f8c92c9..4a9a84e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -262,15 +262,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected HBaseRPCErrorHandler errorHandler = null;
+ static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
/** Default value for above params */
+ private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
private static final ObjectMapper MAPPER = new ObjectMapper();
+ private final int maxRequestSize;
private final int warnResponseTime;
private final int warnResponseSize;
private final Server server;
@@ -1225,6 +1228,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected String hostAddress;
protected int remotePort;
ConnectionHeader connectionHeader;
+
/**
* Codec the client asked use.
*/
@@ -1609,11 +1613,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
if (dataLength < 0) { // A data length of zero is legal.
- throw new IllegalArgumentException("Unexpected data length "
+ throw new DoNotRetryIOException("Unexpected data length "
+ dataLength + "!! from " + getHostAddress());
}
- // TODO: check dataLength against some limit so that the client cannot OOM the server
+ if (dataLength > maxRequestSize) {
+ throw new DoNotRetryIOException("RPC data length of " + dataLength + " received from "
+ + getHostAddress() + " is greater than max allowed " + maxRequestSize + ". Set \""
+ + MAX_REQUEST_SIZE + "\" on server to override this limit (not recommended)");
+ }
+
data = ByteBuffer.allocate(dataLength);
// Increment the rpc count. This counter will be decreased when we write
@@ -2062,6 +2071,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
+ this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
+
// Start the listener here and let it bind to the port
listener = new Listener(name);
this.port = listener.getAddress().getPort();
http://git-wip-us.apache.org/repos/asf/hbase/blob/a8bd8eb9/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index ffe4d40..e8da9ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
+import org.apache.http.ConnectionClosedException;
import org.junit.Assert;
import org.junit.Test;
@@ -137,13 +138,17 @@ public abstract class AbstractTestIPC {
static class TestRpcServer extends RpcServer {
TestRpcServer() throws IOException {
- this(new FifoRpcScheduler(CONF, 1));
+ this(new FifoRpcScheduler(CONF, 1), CONF);
+ }
+
+ TestRpcServer(Configuration conf) throws IOException {
+ this(new FifoRpcScheduler(conf, 1), conf);
}
- TestRpcServer(RpcScheduler scheduler) throws IOException {
+ TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
super(null, "testRpcServer", Lists
.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
- "localhost", 0), CONF, scheduler);
+ "localhost", 0), conf, scheduler);
}
@Override
@@ -267,7 +272,7 @@ public abstract class AbstractTestIPC {
@Test
public void testRpcScheduler() throws IOException, InterruptedException {
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
- RpcServer rpcServer = new TestRpcServer(scheduler);
+ RpcServer rpcServer = new TestRpcServer(scheduler, CONF);
verify(scheduler).init((RpcScheduler.Context) anyObject());
AbstractRpcClient client = createRpcClient(CONF);
try {
@@ -292,6 +297,37 @@ public abstract class AbstractTestIPC {
}
}
+ /** Tests that a call whose size exceeds the configured max request size fails. */
+ @Test
+ public void testRpcMaxRequestSize() throws IOException, InterruptedException {
+ Configuration conf = new Configuration(CONF);
+ conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100);
+ RpcServer rpcServer = new TestRpcServer(conf);
+ AbstractRpcClient client = createRpcClient(conf);
+ try {
+ rpcServer.start();
+ MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+ // set total RPC size bigger than 100 bytes
+ EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello.hello.hello.hello."
+ + "hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello").build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ try {
+ client.call(new PayloadCarryingRpcController(
+ CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
+ md.getOutputType().toProto(), User.getCurrent(), address,
+ new MetricsConnection.CallStats());
+ fail("RPC should have failed because it exceeds max request size");
+ } catch(ConnectionClosingException | ConnectionClosedException ex) {
+ // expected: the oversized call fails with a connection-closed error
+ }
+ } finally {
+ rpcServer.stop();
+ }
+ }
+
/**
* Instance of RpcServer that echoes client hostAddress back to client
*/