You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/03/14 11:46:42 UTC
svn commit: r637047 - in /incubator/qpid/branches/M2.1/java: broker/etc/
broker/src/main/java/org/apache/qpid/server/protocol/
client/src/main/java/org/apache/qpid/client/protocol/
common/src/test/java/org/apache/mina/SocketIOTest/
Author: ritchiem
Date: Fri Mar 14 03:46:40 2008
New Revision: 637047
URL: http://svn.apache.org/viewvc?rev=637047&view=rev
Log:
QPID-592 : Parameterised the Read/Write buffer limits. On the broker these are set with the extra config elements [read|write]BufferLimitSize (under broker.connector.protectio); on the client they are set with the System properties qpid.[read|write].buffer.limit. All the defaults are 256k (262144).
Modified:
incubator/qpid/branches/M2.1/java/broker/etc/config.xml
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java
incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java
Modified: incubator/qpid/branches/M2.1/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/etc/config.xml?rev=637047&r1=637046&r2=637047&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/etc/config.xml (original)
+++ incubator/qpid/branches/M2.1/java/broker/etc/config.xml Fri Mar 14 03:46:40 2008
@@ -33,7 +33,9 @@
<keystorePassword>keystorepass</keystorePassword>
</ssl>-->
<qpidnio>false</qpidnio>
- <protectio>false</protectio>
+ <protectio>
+ <enabled>false</enabled>
+ </protectio>
<transport>nio</transport>
<port>5672</port>
<sslport>8672</sslport>
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=637047&r1=637046&r2=637047&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Fri Mar 14 03:46:40 2008
@@ -29,10 +29,8 @@
import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.SessionUtil;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
@@ -58,6 +56,11 @@
private final IApplicationRegistry _applicationRegistry;
+ private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144";
+ private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144";
+
+ private final int BUFFER_READ_LIMIT_SIZE;
+ private final int BUFFER_WRITE_LIMIT_SIZE;
public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
{
@@ -67,6 +70,11 @@
public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
{
_applicationRegistry = applicationRegistry;
+
+ // Read the configuration from the application registry
+ BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE));
+ BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE));
+
_logger.debug("AMQPFastProtocolHandler created");
}
@@ -115,27 +123,22 @@
}
- if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false))
+ if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false))
{
try
{
// //Add IO Protection Filters
IoFilterChain chain = protocolSession.getFilterChain();
- int buf_size = 32768;
- if (protocolSession.getConfig() instanceof SocketSessionConfig)
- {
- buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize();
- }
protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE);
readfilter.attach(chain);
WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+ writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE);
writefilter.attach(chain);
protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=637047&r1=637046&r2=637047&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Mar 14 03:46:40 2008
@@ -30,7 +30,6 @@
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -156,6 +155,12 @@
/** Defines the default timeout to use for synchronous protocol commands. */
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+ /** Default buffer size for pending messages reads */
+ private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
+
+ /** Default buffer size for pending messages writes */
+ private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144";
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -219,19 +224,14 @@
//Add IO Protection Filters
IoFilterChain chain = session.getFilterChain();
- int buf_size = 32768;
- if (session.getConfig() instanceof SocketSessionConfig)
- {
- buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize();
- }
session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT)));
readfilter.attach(chain);
WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+ writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT)));
writefilter.attach(chain);
session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
Modified: incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java?rev=637047&r1=637046&r2=637047&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java Fri Mar 14 03:46:40 2008
@@ -131,19 +131,20 @@
private int _receivedCount = 0;
private int _sentCount = 0;
+ private static final String DEFAULT_READ_BUFFER = "262144";
+ private static final String DEFAULT_WRITE_BUFFER = "262144";
public void sessionCreated(IoSession session) throws Exception
{
IoFilterChain chain = session.getFilterChain();
- int buf_size = ((SocketSessionConfig) session.getConfig()).getSendBufferSize();
ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER)));
readfilter.attach(chain);
WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
-// writefilter.setMaximumConnectionBufferCount(1000);
- writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+
+ writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER)));
writefilter.attach(chain);
}
Modified: incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java?rev=637047&r1=637046&r2=637047&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java Fri Mar 14 03:46:40 2008
@@ -39,6 +39,9 @@
static public int _PORT = 9999;
+ private static final String DEFAULT_READ_BUFFER = "262144";
+ private static final String DEFAULT_WRITE_BUFFER = "262144";
+
private static class TestHandler extends IoHandlerAdapter
{
@@ -52,14 +55,14 @@
{
IoFilterChain chain = ioSession.getFilterChain();
- int buf_size = ((SocketSessionConfig) ioSession.getConfig()).getReceiveBufferSize();
-
ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER)));
readfilter.attach(chain);
WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+
+ writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER)));
+
writefilter.attach(chain);
}