You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/11/19 15:59:31 UTC

svn commit: r1715193 - in /qpid/java/trunk/broker-core/src: main/java/org/apache/qpid/server/model/port/ main/java/org/apache/qpid/server/transport/ test/java/org/apache/qpid/server/transport/

Author: kwall
Date: Thu Nov 19 14:59:31 2015
New Revision: 1715193

URL: http://svn.apache.org/viewvc?rev=1715193&view=rev
Log:
QPID-6871: [Java Broker] Make accepting port socket backlog configurable with context variable

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1715193&r1=1715192&r2=1715193&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Thu Nov 19 14:59:31 2015
@@ -57,6 +57,7 @@ public interface AmqpPort<X extends Amqp
     String PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "qpid.port.amqp.threadPool.keep_alive_timeout";
 
     String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors";
+    String PORT_AMQP_ACCEPT_BACKLOG = "qpid.port.amqp.acceptBacklog";
     String PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = "qpid.port.amqp.outboundMessageBufferSize";
 
     @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
@@ -76,9 +77,13 @@ public interface AmqpPort<X extends Amqp
     long DEFAULT_PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = 60; // Minutes
 
     @SuppressWarnings("unused")
-    @ManagedContextDefault( name = PORT_AMQP_NUMBER_OF_SELECTORS)
+    @ManagedContextDefault(name = PORT_AMQP_NUMBER_OF_SELECTORS)
     long DEFAULT_PORT_AMQP_NUMBER_OF_SELECTORS = Math.max(DEFAULT_PORT_AMQP_THREAD_POOL_SIZE / 8, 1);
 
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = PORT_AMQP_ACCEPT_BACKLOG)
+    int DEFAULT_PORT_AMQP_ACCEPT_BACKLOG = 1024;
+
     String PORT_MAX_MESSAGE_SIZE = "qpid.port.max_message_size";
 
     @ManagedContextDefault(name = PORT_MAX_MESSAGE_SIZE)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1715193&r1=1715192&r2=1715193&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Thu Nov 19 14:59:31 2015
@@ -38,14 +38,13 @@ import org.apache.qpid.transport.Transpo
 import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.TransportEncryption;
 import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
-import org.apache.qpid.transport.network.io.IoNetworkTransport;
 
 import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
 
 public class NonBlockingNetworkTransport
 {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
     private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
                                                           CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
     private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
@@ -55,13 +54,13 @@ public class NonBlockingNetworkTransport
     private final ServerSocketChannel _serverSocket;
     private final int _timeout;
     private final NetworkConnectionScheduler _scheduler;
-    private final AmqpPort _port;
+    private final AmqpPort<?> _port;
     private final InetSocketAddress _address;
 
     public NonBlockingNetworkTransport(final MultiVersionProtocolEngineFactory factory,
                                        final EnumSet<TransportEncryption> encryptionSet,
                                        final NetworkConnectionScheduler scheduler,
-                                       final AmqpPort port)
+                                       final AmqpPort<?> port)
     {
         try
         {
@@ -85,10 +84,11 @@ public class NonBlockingNetworkTransport
                 _address = new InetSocketAddress(bindingAddress, portNumber);
             }
 
+            int acceptBacklog = port.getContextValue(Integer.class, AmqpPort.PORT_AMQP_ACCEPT_BACKLOG);
             _serverSocket =  ServerSocketChannel.open();
 
             _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
-            _serverSocket.bind(_address);
+            _serverSocket.bind(_address, acceptBacklog);
             _serverSocket.configureBlocking(false);
             _encryptionSet = encryptionSet;
             _scheduler = scheduler;
@@ -99,7 +99,6 @@ public class NonBlockingNetworkTransport
         {
             throw new TransportException("Failed to start AMQP on port : " + port, e);
         }
-
     }
 
     public void start()
@@ -142,7 +141,6 @@ public class NonBlockingNetworkTransport
                 if (engine != null)
                 {
                     socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _port.isTcpNoDelay());
-                    socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
 
                     final int bufferSize = _port.getNetworkBufferSize();
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1715193&r1=1715192&r2=1715193&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Thu Nov 19 14:59:31 2015
@@ -117,6 +117,7 @@ public class TCPandSSLTransportTest exte
         when(port.getSSLContext()).thenReturn(sslContext);
         when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
         when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
+        when(port.getContextValue(Integer.class, AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
 
         TCPandSSLTransport transport = new TCPandSSLTransport(new HashSet<>(Arrays.asList(transports)),
                                                               port,



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org