You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/10/30 19:16:07 UTC

svn commit: r1711533 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport: NonBlockingNetworkTransport.java SelectorThread.java

Author: kwall
Date: Fri Oct 30 18:16:06 2015
New Revision: 1711533

URL: http://svn.apache.org/viewvc?rev=1711533&view=rev
Log:
QPID-6819: [Java Broker] Schedule socket accepts on the thread pool

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1711533&r1=1711532&r2=1711533&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Fri Oct 30 18:16:06 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.transport
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.StandardSocketOptions;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
@@ -131,58 +132,61 @@ public class NonBlockingNetworkTransport
         boolean success = false;
         try
         {
-            socketChannel = serverSocketChannel.accept();
 
-            final MultiVersionProtocolEngine engine =
-                    _factory.newProtocolEngine(socketChannel.socket().getRemoteSocketAddress());
-
-            if(engine != null)
+            while ((socketChannel = serverSocketChannel.accept()) != null)
             {
-                socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _port.isTcpNoDelay());
-                socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+                SocketAddress remoteSocketAddress = socketChannel.socket().getRemoteSocketAddress();
+                final MultiVersionProtocolEngine engine =
+                        _factory.newProtocolEngine(remoteSocketAddress);
 
-                final int bufferSize = _port.getNetworkBufferSize();
+                if (engine != null)
+                {
+                    socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _port.isTcpNoDelay());
+                    socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
 
-                socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, bufferSize);
-                socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, bufferSize);
+                    final int bufferSize = _port.getNetworkBufferSize();
 
-                socketChannel.configureBlocking(false);
+                    socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, bufferSize);
+                    socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, bufferSize);
 
-                AggregateTicker aggregateTicker = engine.getAggregateTicker();
+                    socketChannel.configureBlocking(false);
 
-                final IdleTimeoutTicker idleTimeoutTicker = new IdleTimeoutTicker(engine, _timeout);
-                aggregateTicker.addTicker(idleTimeoutTicker);
+                    AggregateTicker aggregateTicker = engine.getAggregateTicker();
 
-                NonBlockingConnection connection =
-                        new NonBlockingConnection(socketChannel,
-                                                  engine,
-                                                  _encryptionSet,
-                                                  new Runnable()
-                                                  {
+                    final IdleTimeoutTicker idleTimeoutTicker = new IdleTimeoutTicker(engine, _timeout);
+                    aggregateTicker.addTicker(idleTimeoutTicker);
 
-                                                      @Override
-                                                      public void run()
+                    NonBlockingConnection connection =
+                            new NonBlockingConnection(socketChannel,
+                                                      engine,
+                                                      _encryptionSet,
+                                                      new Runnable()
                                                       {
-                                                          engine.encryptedTransport();
-                                                      }
-                                                  },
-                                                  _scheduler,
-                                                  _port);
-
-                engine.setNetworkConnection(connection);
-                connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
-
-                idleTimeoutTicker.setConnection(connection);
-
-                connection.start();
 
-                _scheduler.addConnection(connection);
-
-                success = true;
-            }
-            else
-            {
-                LOGGER.error("No Engine available.");
+                                                          @Override
+                                                          public void run()
+                                                          {
+                                                              engine.encryptedTransport();
+                                                          }
+                                                      },
+                                                      _scheduler,
+                                                      _port);
+
+                    engine.setNetworkConnection(connection);
+                    connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+
+                    idleTimeoutTicker.setConnection(connection);
+
+                    connection.start();
+
+                    _scheduler.addConnection(connection);
+
+                    success = true;
+                }
+                else
+                {
+                    LOGGER.error("No Engine available.");
+                }
             }
         }
         catch (IOException e)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1711533&r1=1711532&r2=1711533&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Fri Oct 30 18:16:06 2015
@@ -20,6 +20,7 @@
 package org.apache.qpid.server.transport;
 
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
@@ -164,9 +165,44 @@ class SelectorThread extends Thread
             {
                 if(key.isAcceptable())
                 {
-                    NonBlockingNetworkTransport transport = (NonBlockingNetworkTransport) key.attachment();
-                    // todo - should we schedule this rather than running in this thread?
-                    transport.acceptSocketChannel((ServerSocketChannel)key.channel());
+                    final NonBlockingNetworkTransport transport = (NonBlockingNetworkTransport) key.attachment();
+                    final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
+                    final SocketAddress localSocketAddress = channel.socket().getLocalSocketAddress();
+
+                    try
+                    {
+                        channel.register(_selector, 0);
+                    }
+                    catch (ClosedChannelException e)
+                    {
+                        LOGGER.error("Failed to register selector on accepting port {} ",
+                                     localSocketAddress, e);
+                    }
+
+                    _workQueue.add(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                transport.acceptSocketChannel(channel);
+                            }
+                            finally
+                            {
+                                try
+                                {
+                                    channel.register(_selector, SelectionKey.OP_ACCEPT, transport);
+                                    wakeup();
+                                }
+                                catch (ClosedChannelException e)
+                                {
+                                    LOGGER.error("Failed to register selector on accepting port {}",
+                                                 localSocketAddress, e);
+                                }
+                            }
+                        }
+                    });
                 }
                 else
                 {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org