You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/03/23 00:32:42 UTC

[3/3] hbase git commit: HBASE-15212 RpcServer should enforce max request size

HBASE-15212 RpcServer should enforce max request size


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a8bd8eb9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a8bd8eb9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a8bd8eb9

Branch: refs/heads/branch-1.3
Commit: a8bd8eb92e1df54b4e58ed402f8e5a75977df8d1
Parents: cdd7137
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Mar 22 16:23:15 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Mar 22 16:24:43 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 15 ++++++-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       | 44 ++++++++++++++++++--
 2 files changed, 53 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a8bd8eb9/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index f8c92c9..4a9a84e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -262,15 +262,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
   protected HBaseRPCErrorHandler errorHandler = null;
 
+  static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
 
   /** Default value for above params */
+  private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
 
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
+  private final int maxRequestSize;
   private final int warnResponseTime;
   private final int warnResponseSize;
   private final Server server;
@@ -1225,6 +1228,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     protected String hostAddress;
     protected int remotePort;
     ConnectionHeader connectionHeader;
+
     /**
      * Codec the client asked use.
      */
@@ -1609,11 +1613,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           }
         }
         if (dataLength < 0) { // A data length of zero is legal.
-          throw new IllegalArgumentException("Unexpected data length "
+          throw new DoNotRetryIOException("Unexpected data length "
               + dataLength + "!! from " + getHostAddress());
         }
 
-       // TODO: check dataLength against some limit so that the client cannot OOM the server
+        if (dataLength > maxRequestSize) {
+          throw new DoNotRetryIOException("RPC data length of " + dataLength + " received from "
+              + getHostAddress() + " is greater than max allowed " + maxRequestSize + ". Set \""
+              + MAX_REQUEST_SIZE + "\" on server to override this limit (not recommended)");
+        }
+
         data = ByteBuffer.allocate(dataLength);
 
         // Increment the rpc count. This counter will be decreased when we write
@@ -2062,6 +2071,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
 
+    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
+
     // Start the listener here and let it bind to the port
     listener = new Listener(name);
     this.port = listener.getAddress().getPort();

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8bd8eb9/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index ffe4d40..e8da9ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.http.ConnectionClosedException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -137,13 +138,17 @@ public abstract class AbstractTestIPC {
   static class TestRpcServer extends RpcServer {
 
     TestRpcServer() throws IOException {
-      this(new FifoRpcScheduler(CONF, 1));
+      this(new FifoRpcScheduler(CONF, 1), CONF);
+    }
+
+    TestRpcServer(Configuration conf) throws IOException {
+      this(new FifoRpcScheduler(conf, 1), conf);
     }
 
-    TestRpcServer(RpcScheduler scheduler) throws IOException {
+    TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
       super(null, "testRpcServer", Lists
           .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
-          "localhost", 0), CONF, scheduler);
+          "localhost", 0), conf, scheduler);
     }
 
     @Override
@@ -267,7 +272,7 @@ public abstract class AbstractTestIPC {
   @Test
   public void testRpcScheduler() throws IOException, InterruptedException {
     RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
-    RpcServer rpcServer = new TestRpcServer(scheduler);
+    RpcServer rpcServer = new TestRpcServer(scheduler, CONF);
     verify(scheduler).init((RpcScheduler.Context) anyObject());
     AbstractRpcClient client = createRpcClient(CONF);
     try {
@@ -292,6 +297,37 @@ public abstract class AbstractTestIPC {
     }
   }
 
+  /** Tests that a request larger than the configured max request size is rejected. */
+  @Test
+  public void testRpcMaxRequestSize() throws IOException, InterruptedException {
+    Configuration conf = new Configuration(CONF);
+    conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100);
+    RpcServer rpcServer = new TestRpcServer(conf);
+    AbstractRpcClient client = createRpcClient(conf);
+    try {
+      rpcServer.start();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      // set total RPC size bigger than 100 bytes
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello.hello.hello.hello."
+          + "hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello").build();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
+      try {
+        client.call(new PayloadCarryingRpcController(
+          CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
+          md.getOutputType().toProto(), User.getCurrent(), address,
+          new MetricsConnection.CallStats());
+        fail("RPC should have failed because it exceeds max request size");
+      } catch(ConnectionClosingException | ConnectionClosedException ex) {
+        // pass
+      }
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
   /**
    * Instance of RpcServer that echoes client hostAddress back to client
    */