You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/11/19 15:59:31 UTC
svn commit: r1715193 - in /qpid/java/trunk/broker-core/src:
main/java/org/apache/qpid/server/model/port/
main/java/org/apache/qpid/server/transport/
test/java/org/apache/qpid/server/transport/
Author: kwall
Date: Thu Nov 19 14:59:31 2015
New Revision: 1715193
URL: http://svn.apache.org/viewvc?rev=1715193&view=rev
Log:
QPID-6871: [Java Broker] Make the accept backlog of the AMQP port's listening socket configurable via a context variable (qpid.port.amqp.acceptBacklog)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1715193&r1=1715192&r2=1715193&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Thu Nov 19 14:59:31 2015
@@ -57,6 +57,7 @@ public interface AmqpPort<X extends Amqp
String PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "qpid.port.amqp.threadPool.keep_alive_timeout";
String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors";
+ String PORT_AMQP_ACCEPT_BACKLOG = "qpid.port.amqp.acceptBacklog";
String PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = "qpid.port.amqp.outboundMessageBufferSize";
@ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
@@ -76,9 +77,13 @@ public interface AmqpPort<X extends Amqp
long DEFAULT_PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = 60; // Minutes
@SuppressWarnings("unused")
- @ManagedContextDefault( name = PORT_AMQP_NUMBER_OF_SELECTORS)
+ @ManagedContextDefault(name = PORT_AMQP_NUMBER_OF_SELECTORS)
long DEFAULT_PORT_AMQP_NUMBER_OF_SELECTORS = Math.max(DEFAULT_PORT_AMQP_THREAD_POOL_SIZE / 8, 1);
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = PORT_AMQP_ACCEPT_BACKLOG)
+ int DEFAULT_PORT_AMQP_ACCEPT_BACKLOG = 1024;
+
String PORT_MAX_MESSAGE_SIZE = "qpid.port.max_message_size";
@ManagedContextDefault(name = PORT_MAX_MESSAGE_SIZE)
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1715193&r1=1715192&r2=1715193&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Thu Nov 19 14:59:31 2015
@@ -38,14 +38,13 @@ import org.apache.qpid.transport.Transpo
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
-import org.apache.qpid.transport.network.io.IoNetworkTransport;
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
public class NonBlockingNetworkTransport
{
- private static final Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
@@ -55,13 +54,13 @@ public class NonBlockingNetworkTransport
private final ServerSocketChannel _serverSocket;
private final int _timeout;
private final NetworkConnectionScheduler _scheduler;
- private final AmqpPort _port;
+ private final AmqpPort<?> _port;
private final InetSocketAddress _address;
public NonBlockingNetworkTransport(final MultiVersionProtocolEngineFactory factory,
final EnumSet<TransportEncryption> encryptionSet,
final NetworkConnectionScheduler scheduler,
- final AmqpPort port)
+ final AmqpPort<?> port)
{
try
{
@@ -85,10 +84,11 @@ public class NonBlockingNetworkTransport
_address = new InetSocketAddress(bindingAddress, portNumber);
}
+ int acceptBacklog = port.getContextValue(Integer.class, AmqpPort.PORT_AMQP_ACCEPT_BACKLOG);
_serverSocket = ServerSocketChannel.open();
_serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
- _serverSocket.bind(_address);
+ _serverSocket.bind(_address, acceptBacklog);
_serverSocket.configureBlocking(false);
_encryptionSet = encryptionSet;
_scheduler = scheduler;
@@ -99,7 +99,6 @@ public class NonBlockingNetworkTransport
{
throw new TransportException("Failed to start AMQP on port : " + port, e);
}
-
}
public void start()
@@ -142,7 +141,6 @@ public class NonBlockingNetworkTransport
if (engine != null)
{
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _port.isTcpNoDelay());
- socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
final int bufferSize = _port.getNetworkBufferSize();
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1715193&r1=1715192&r2=1715193&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Thu Nov 19 14:59:31 2015
@@ -117,6 +117,7 @@ public class TCPandSSLTransportTest exte
when(port.getSSLContext()).thenReturn(sslContext);
when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
+ when(port.getContextValue(Integer.class, AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
TCPandSSLTransport transport = new TCPandSSLTransport(new HashSet<>(Arrays.asList(transports)),
port,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org