You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/03/14 11:46:42 UTC

svn commit: r637047 - in /incubator/qpid/branches/M2.1/java: broker/etc/ broker/src/main/java/org/apache/qpid/server/protocol/ client/src/main/java/org/apache/qpid/client/protocol/ common/src/test/java/org/apache/mina/SocketIOTest/

Author: ritchiem
Date: Fri Mar 14 03:46:40 2008
New Revision: 637047

URL: http://svn.apache.org/viewvc?rev=637047&view=rev
Log:
QPID-592 : Parameterised the Read/Write buffer limits. On the broker extra config [read|write]BufferLimitSize on the client System properties qpid.[read|write].buffer.limit. All the defaults are 256k(262144).

Modified:
    incubator/qpid/branches/M2.1/java/broker/etc/config.xml
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java
    incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java

Modified: incubator/qpid/branches/M2.1/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/etc/config.xml?rev=637047&r1=637046&r2=637047&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/etc/config.xml (original)
+++ incubator/qpid/branches/M2.1/java/broker/etc/config.xml Fri Mar 14 03:46:40 2008
@@ -33,7 +33,9 @@
             <keystorePassword>keystorepass</keystorePassword>
         </ssl>-->
         <qpidnio>false</qpidnio>
-        <protectio>false</protectio>
+        <protectio>
+            <enabled>false</enabled>
+        </protectio>
         <transport>nio</transport>
         <port>5672</port>
         <sslport>8672</sslport>

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=637047&r1=637046&r2=637047&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Fri Mar 14 03:46:40 2008
@@ -29,10 +29,8 @@
 import org.apache.mina.filter.ReadThrottleFilterBuilder;
 import org.apache.mina.filter.SSLFilter;
 import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
 import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.mina.util.SessionUtil;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.AMQCodecFactory;
@@ -58,6 +56,11 @@
 
     private final IApplicationRegistry _applicationRegistry;
 
+    private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144";
+    private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144";
+
+    private final int BUFFER_READ_LIMIT_SIZE;
+    private final int BUFFER_WRITE_LIMIT_SIZE;
 
     public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
     {
@@ -67,6 +70,11 @@
     public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
     {
         _applicationRegistry = applicationRegistry;
+
+        // Read the configuration from the application registry
+        BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE));
+        BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE));
+
         _logger.debug("AMQPFastProtocolHandler created");
     }
 
@@ -115,27 +123,22 @@
 
         }
 
-        if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false))
+        if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false))
         {
             try
             {
 //        //Add IO Protection Filters
                 IoFilterChain chain = protocolSession.getFilterChain();
 
-                int buf_size = 32768;
-                if (protocolSession.getConfig() instanceof SocketSessionConfig)
-                {
-                    buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize();
-                }
 
                 protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
 
                 ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
-                readfilter.setMaximumConnectionBufferSize(buf_size);
+                readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE);
                 readfilter.attach(chain);
 
                 WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
-                writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+                writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE);
                 writefilter.attach(chain);
 
                 protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=637047&r1=637046&r2=637047&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Mar 14 03:46:40 2008
@@ -30,7 +30,6 @@
 import org.apache.mina.filter.codec.ProtocolCodecException;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
@@ -156,6 +155,12 @@
     /** Defines the default timeout to use for synchronous protocol commands. */
     private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
 
+    /** Default buffer size for pending messages reads */
+    private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
+
+    /** Default buffer size for pending messages writes */
+    private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144";
+
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.
      *
@@ -219,19 +224,14 @@
                 //Add IO Protection Filters
                 IoFilterChain chain = session.getFilterChain();
 
-                int buf_size = 32768;
-                if (session.getConfig() instanceof SocketSessionConfig)
-                {
-                    buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize();
-                }
                 session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
 
                 ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
-                readfilter.setMaximumConnectionBufferSize(buf_size);
+                readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT)));
                 readfilter.attach(chain);
 
                 WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
-                writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+                writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT)));
                 writefilter.attach(chain);
                 session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
 

Modified: incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java?rev=637047&r1=637046&r2=637047&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java Fri Mar 14 03:46:40 2008
@@ -131,19 +131,20 @@
 
         private int _receivedCount = 0;
         private int _sentCount = 0;
+        private static final String DEFAULT_READ_BUFFER = "262144";
+        private static final String DEFAULT_WRITE_BUFFER = "262144";
 
         public void sessionCreated(IoSession session) throws Exception
         {
             IoFilterChain chain = session.getFilterChain();
-            int buf_size = ((SocketSessionConfig) session.getConfig()).getSendBufferSize();
 
             ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
-            readfilter.setMaximumConnectionBufferSize(buf_size);
+            readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER)));
             readfilter.attach(chain);
 
             WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
-//            writefilter.setMaximumConnectionBufferCount(1000);
-            writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+
+            writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER)));
 
             writefilter.attach(chain);
         }

Modified: incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java?rev=637047&r1=637046&r2=637047&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java Fri Mar 14 03:46:40 2008
@@ -39,6 +39,9 @@
 
     static public int _PORT = 9999;
 
+    private static final String DEFAULT_READ_BUFFER = "262144";
+    private static final String DEFAULT_WRITE_BUFFER = "262144";
+
 
     private static class TestHandler extends IoHandlerAdapter
     {
@@ -52,14 +55,14 @@
         {
             IoFilterChain chain = ioSession.getFilterChain();
 
-            int buf_size = ((SocketSessionConfig) ioSession.getConfig()).getReceiveBufferSize();
-
             ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
-            readfilter.setMaximumConnectionBufferSize(buf_size);
+            readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER)));
             readfilter.attach(chain);
 
             WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
-            writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+
+            writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER)));
+
             writefilter.attach(chain);
 
         }