You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2013/07/02 01:34:16 UTC

svn commit: r1498741 - in /hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/

Author: cmccabe
Date: Mon Jul  1 23:34:15 2013
New Revision: 1498741

URL: http://svn.apache.org/r1498741
Log:
HADOOP-9676.  Make maximum RPC buffer size configurable (Colin Patrick McCabe)

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1498741&r1=1498740&r2=1498741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt Mon Jul  1 23:34:15 2013
@@ -130,6 +130,9 @@ Release 2.1.0-beta - 2013-07-02
 
     HADOOP-9619 Mark stability of .proto files (sanjay Radia)
 
+    HADOOP-9676.  Make maximum RPC buffer size configurable (Colin Patrick
+    McCabe)
+
   OPTIMIZATIONS
 
     HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1498741&r1=1498740&r2=1498741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java Mon Jul  1 23:34:15 2013
@@ -64,6 +64,11 @@ public class CommonConfigurationKeys ext
     "ipc.server.read.threadpool.size";
   /** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
   public static final int     IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
+  
+  public static final String IPC_MAXIMUM_DATA_LENGTH =
+      "ipc.maximum.data.length";
+  
+  public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024;
 
   /** How many calls per handler are allowed in the queue. */
   public static final String  IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1498741&r1=1498740&r2=1498741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Jul  1 23:34:15 2013
@@ -340,6 +340,7 @@ public abstract class Server {
   private int maxQueueSize;
   private final int maxRespSize;
   private int socketSendBufferSize;
+  private final int maxDataLength;
   private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
 
   volatile private boolean running = true;         // true while server runs
@@ -1377,7 +1378,22 @@ public abstract class Server {
         }
       }
     }
-    
+
+    private void checkDataLength(int dataLength) throws IOException {
+      if (dataLength < 0) {
+        String error = "Unexpected data length " + dataLength +
+                       "!! from " + getHostAddress();
+        LOG.warn(error);
+        throw new IOException(error);
+      } else if (dataLength > maxDataLength) {
+        String error = "Requested data length " + dataLength +
+              " is longer than maximum configured RPC length " + 
+            maxDataLength + ".  RPC came from " + getHostAddress();
+        LOG.warn(error);
+        throw new IOException(error);
+      }
+    }
+
     public int readAndProcess() throws IOException, InterruptedException {
       while (true) {
         /* Read at most one RPC. If the header is not read completely yet
@@ -1439,11 +1455,7 @@ public abstract class Server {
             dataLengthBuffer.clear();
             return 0; // ping message
           }
-          
-          if (dataLength < 0) {
-            LOG.warn("Unexpected data length " + dataLength + "!! from " + 
-                getHostAddress());
-          }
+          checkDataLength(dataLength);
           data = ByteBuffer.allocate(dataLength);
         }
         
@@ -1978,6 +1990,8 @@ public abstract class Server {
     this.rpcRequestClass = rpcRequestClass; 
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
+    this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
     if (queueSizePerHandler != -1) {
       this.maxQueueSize = queueSizePerHandler;
     } else {

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java?rev=1498741&r1=1498740&r2=1498741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java Mon Jul  1 23:34:15 2013
@@ -24,7 +24,9 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
@@ -113,6 +115,7 @@ public class TestProtoBufRpc {
   @Before
   public  void setUp() throws IOException { // Setup server for both protocols
     conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
     // Set RPC engine to protobuf RPC engine
     RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
 
@@ -230,4 +233,24 @@ public class TestProtoBufRpc {
           re.getErrorCode().equals(RpcErrorCodeProto.ERROR_APPLICATION));
     }
   }
-}
\ No newline at end of file
+  
+  @Test(timeout=6000)
+  public void testExtraLongRpc() throws Exception {
+    TestRpcService2 client = getClient2();
+    final String shortString = StringUtils.repeat("X", 4);
+    EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
+        .setMessage(shortString).build();
+    // short message goes through
+    EchoResponseProto echoResponse = client.echo2(null, echoRequest);
+    
+    final String longString = StringUtils.repeat("X", 4096);
+    echoRequest = EchoRequestProto.newBuilder()
+        .setMessage(longString).build();
+    try {
+      echoResponse = client.echo2(null, echoRequest);
+      Assert.fail("expected extra-long RPC to fail");
+    } catch (ServiceException se) {
+      // expected
+    }
+  }
+}