You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/15 10:55:55 UTC

svn commit: r1685516 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/adapter/ broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-core/src/main/java...

Author: rgodfrey
Date: Mon Jun 15 08:55:55 2015
New Revision: 1685516

URL: http://svn.apache.org/r1685516
Log:
QPID-6589 : [Java Broker] use separate thread pools for each virtual host

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Mon Jun 15 08:55:55 2015
@@ -210,7 +210,6 @@ public final class ConnectionAdapter ext
     private ListenableFuture<Void> asyncCloseUnderlying()
     {
         final SettableFuture<Void> closeFuture = SettableFuture.create();
-
         _underlyingConnection.addDeleteTask(new Action()
         {
             @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Mon Jun 15 08:55:55 2015
@@ -30,6 +30,7 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.util.Deletable;
 
 public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends AMQSessionModel<S,T>> extends StatisticsGatherer, Deletable<T>
@@ -111,4 +112,5 @@ public interface AMQConnectionModel<T ex
 
     ServerProtocolEngine getProtocolEngine();
 
+    void setScheduler(NetworkConnectionScheduler networkConnectionScheduler);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Mon Jun 15 08:55:55 2015
@@ -397,6 +397,7 @@ public class MultiVersionProtocolEngine
         {
             return _aggregateTicker;
         }
+
     }
 
     private class SelfDelegateProtocolEngine implements ServerProtocolEngine
@@ -593,6 +594,7 @@ public class MultiVersionProtocolEngine
 
         }
 
+
         public long getConnectionId()
         {
             return _id;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java Mon Jun 15 08:55:55 2015
@@ -24,7 +24,9 @@ import javax.security.auth.Subject;
 
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 public interface ServerProtocolEngine extends ProtocolEngine
 {
@@ -54,4 +56,5 @@ public interface ServerProtocolEngine ex
     void setWorkListener(Action<ServerProtocolEngine> listener);
 
     AggregateTicker getAggregateTicker();
+
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Mon Jun 15 08:55:55 2015
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.transport;
 
+import java.io.IOException;
+import java.nio.channels.ServerSocketChannel;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -31,7 +33,9 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class NetworkConnectionScheduler
+import org.apache.qpid.transport.TransportException;
+
+public class NetworkConnectionScheduler
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
 
@@ -39,25 +43,35 @@ class NetworkConnectionScheduler
     private final ThreadPoolExecutor _executor;
     private final AtomicInteger _running = new AtomicInteger();
     private final int _poolSize;
+    private final String _name;
 
-    NetworkConnectionScheduler(final SelectorThread selectorThread, final NonBlockingNetworkTransport transport)
+    public NetworkConnectionScheduler(String name, int threadPoolSize)
     {
-        _selectorThread = selectorThread;
-        _poolSize = transport.getThreadPoolSize();
-        _executor = new ThreadPoolExecutor(_poolSize, _poolSize, 0L, TimeUnit.MILLISECONDS,
-                                           new LinkedBlockingQueue<Runnable>(), new ThreadFactory()
+        try
         {
-            final AtomicInteger _count = new AtomicInteger();
-
-            @Override
-            public Thread newThread(final Runnable r)
+            _selectorThread = new SelectorThread(this);
+            _selectorThread.start();
+            _poolSize = threadPoolSize;
+            _name = name;
+            _executor = new ThreadPoolExecutor(_poolSize, _poolSize, 0L, TimeUnit.MILLISECONDS,
+                                               new LinkedBlockingQueue<Runnable>(), new ThreadFactory()
             {
-                Thread t = Executors.defaultThreadFactory().newThread(r);
-                t.setName("IO-pool-"+selectorThread.getName()+"-"+_count.incrementAndGet());
-                return t;
-            }
-        });
-        _executor.prestartAllCoreThreads();
+                final AtomicInteger _count = new AtomicInteger();
+
+                @Override
+                public Thread newThread(final Runnable r)
+                {
+                    Thread t = Executors.defaultThreadFactory().newThread(r);
+                    t.setName("IO-pool-" + getName() + "-" + _count.incrementAndGet());
+                    return t;
+                }
+            });
+            _executor.prestartAllCoreThreads();
+        }
+        catch (IOException e)
+        {
+            throw new TransportException(e);
+        }
     }
 
     public void schedule(final NonBlockingConnection connection)
@@ -93,7 +107,7 @@ class NetworkConnectionScheduler
                 rerun = false;
                 boolean closed = connection.doWork();
 
-                if (!closed)
+                if (!closed && connection.getScheduler() == this)
                 {
 
                     if (connection.isStateChanged() || connection.isPartialRead())
@@ -123,9 +137,39 @@ class NetworkConnectionScheduler
 
     public void close()
     {
+        _selectorThread.close();
         _executor.shutdown();
     }
 
 
+    public String getName()
+    {
+        return _name;
+    }
 
+    public void addAcceptingSocket(final ServerSocketChannel serverSocket,
+                                   final NonBlockingNetworkTransport nonBlockingNetworkTransport)
+    {
+        _selectorThread.addAcceptingSocket(serverSocket, nonBlockingNetworkTransport);
+    }
+
+    public void cancelAcceptingSocket(final ServerSocketChannel serverSocket)
+    {
+        _selectorThread.cancelAcceptingSocket(serverSocket);
+    }
+
+    public void addConnection(final NonBlockingConnection connection)
+    {
+        _selectorThread.addConnection(connection);
+    }
+
+    public void wakeup()
+    {
+        _selectorThread.wakeup();
+    }
+
+    public void removeConnection(final NonBlockingConnection connection)
+    {
+        _selectorThread.removeConnection(connection);
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Mon Jun 15 08:55:55 2015
@@ -60,7 +60,7 @@ public class NonBlockingConnection imple
 
     private final SocketChannel _socketChannel;
     private final Object _peerPrincipalLock = new Object();
-    private final SelectorThread _selector;
+    private NetworkConnectionScheduler _scheduler;
     private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
     private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
 
@@ -92,7 +92,6 @@ public class NonBlockingConnection imple
     public NonBlockingConnection(SocketChannel socketChannel,
                                  ServerProtocolEngine delegate,
                                  int receiveBufferSize,
-                                 Ticker ticker,
                                  final Set<TransportEncryption> encryptionSet,
                                  final SSLContext sslContext,
                                  final boolean wantClientAuth,
@@ -100,10 +99,10 @@ public class NonBlockingConnection imple
                                  final Collection<String> enabledCipherSuites,
                                  final Collection<String> disabledCipherSuites,
                                  final Runnable onTransportEncryptionAction,
-                                 final SelectorThread selectorThread)
+                                 final NetworkConnectionScheduler scheduler)
     {
         _socketChannel = socketChannel;
-        _selector = selectorThread;
+        _scheduler = scheduler;
 
         _protocolEngine = delegate;
         _receiveBufSize = receiveBufferSize;
@@ -114,7 +113,7 @@ public class NonBlockingConnection imple
                                     @Override
                                     public void performAction(final ServerProtocolEngine object)
                                     {
-                                        _selector.wakeup();
+                                        _scheduler.wakeup();
                                     }
                                 });
 
@@ -178,7 +177,7 @@ public class NonBlockingConnection imple
         if(_closed.compareAndSet(false,true))
         {
             _protocolEngine.notifyWork();
-            _selector.wakeup();
+            _scheduler.wakeup();
         }
     }
 
@@ -616,8 +615,6 @@ public class NonBlockingConnection imple
     @Override
     public void send(final ByteBuffer msg)
     {
-        assert _selector.isIOThread() : "Send called by unexpected thread " + Thread.currentThread().getName();
-
 
         if (_closed.get())
         {
@@ -634,6 +631,14 @@ public class NonBlockingConnection imple
     {
     }
 
+    public void changeScheduler(NetworkConnectionScheduler scheduler)
+    {
+        NetworkConnectionScheduler currentScheduler = _scheduler;
+        currentScheduler.removeConnection(this);
+        _scheduler = scheduler;
+        _scheduler.addConnection(this);
+    }
+
     @Override
     public String toString()
     {
@@ -665,4 +670,9 @@ public class NonBlockingConnection imple
                 headerBytes[4] == 2 || // TLS 1.1
                 headerBytes[4] == 3);
     }
+
+    public NetworkConnectionScheduler getScheduler()
+    {
+        return _scheduler;
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Mon Jun 15 08:55:55 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.StandardSocketOptions;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.EnumSet;
@@ -58,13 +59,13 @@ public class NonBlockingNetworkTransport
     private final SSLContext _sslContext;
     private final ServerSocketChannel _serverSocket;
     private final int _timeout;
-
-    private SelectorThread _selector;
+    private final NetworkConnectionScheduler _scheduler;
 
     public NonBlockingNetworkTransport(final NetworkTransportConfiguration config,
                                        final MultiVersionProtocolEngineFactory factory,
                                        final SSLContext sslContext,
-                                       final EnumSet<TransportEncryption> encryptionSet)
+                                       final EnumSet<TransportEncryption> encryptionSet,
+                                       final NetworkConnectionScheduler scheduler)
     {
         try
         {
@@ -82,6 +83,7 @@ public class NonBlockingNetworkTransport
             _serverSocket.bind(address);
             _serverSocket.configureBlocking(false);
             _encryptionSet = encryptionSet;
+            _scheduler = scheduler;
 
         }
         catch (IOException e)
@@ -93,37 +95,20 @@ public class NonBlockingNetworkTransport
 
     public void start()
     {
-        try
-        {
-            _selector = new SelectorThread(this);
-            _selector.start();
-            _selector.addAcceptingSocket(_serverSocket);
-        }
-        catch (IOException e)
-        {
-            throw new TransportException("Failed to start", e);
-        }
+        _scheduler.addAcceptingSocket(_serverSocket, this);
     }
 
 
     public void close()
     {
-        if(_selector != null)
+        _scheduler.cancelAcceptingSocket(_serverSocket);
+        try
         {
-            _selector.cancelAcceptingSocket(_serverSocket);
-            try
-            {
-                _serverSocket.close();
-            }
-            catch (IOException e)
-            {
-                LOGGER.warn("Error closing the server socket for : " +  _config.getAddress().toString(), e);
-            }
-            finally
-            {
-                _selector.close();
-                _selector = null;
-            }
+            _serverSocket.close();
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Error closing the server socket for : " +  _config.getAddress().toString(), e);
         }
     }
 
@@ -171,7 +156,6 @@ public class NonBlockingNetworkTransport
                         new NonBlockingConnection(socketChannel,
                                                   engine,
                                                   receiveBufferSize,
-                                                  idleTimeoutTicker,
                                                   _encryptionSet,
                                                   _sslContext,
                                                   _config.wantClientAuth(),
@@ -187,7 +171,7 @@ public class NonBlockingNetworkTransport
                                                           engine.encryptedTransport();
                                                       }
                                                   },
-                                                  _selector);
+                                                  _scheduler);
 
                 engine.setNetworkConnection(connection, connection.getSender());
                 connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
@@ -196,7 +180,7 @@ public class NonBlockingNetworkTransport
 
                 connection.start();
 
-                _selector.addConnection(connection);
+                _scheduler.addConnection(connection);
 
                 success = true;
             }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Mon Jun 15 08:55:55 2015
@@ -39,7 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class SelectorThread extends Thread
+class SelectorThread extends Thread
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
 
@@ -58,19 +58,18 @@ public class SelectorThread extends Thre
     private final Selector _selector;
     private final AtomicBoolean _closed = new AtomicBoolean();
     private final NetworkConnectionScheduler _scheduler;
-    private final NonBlockingNetworkTransport _transport;
     private long _nextTimeout;
 
-    SelectorThread(final NonBlockingNetworkTransport nonBlockingNetworkTransport) throws IOException
+    SelectorThread(final NetworkConnectionScheduler scheduler) throws IOException
     {
-        super("SelectorThread-" + nonBlockingNetworkTransport.getConfig().getAddress().toString());
+        super("SelectorThread-" + scheduler.getName());
 
-        _transport = nonBlockingNetworkTransport;
         _selector = Selector.open();
-        _scheduler = new NetworkConnectionScheduler(this, _transport);
+        _scheduler = scheduler;
     }
 
-    public void addAcceptingSocket(final ServerSocketChannel socketChannel)
+    public void addAcceptingSocket(final ServerSocketChannel socketChannel,
+                                   final NonBlockingNetworkTransport nonBlockingNetworkTransport)
     {
         _tasks.add(new Runnable()
                     {
@@ -80,7 +79,7 @@ public class SelectorThread extends Thre
 
                             try
                             {
-                                socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
+                                socketChannel.register(_selector, SelectionKey.OP_ACCEPT, nonBlockingNetworkTransport);
                             }
                             catch (IllegalStateException | ClosedChannelException e)
                             {
@@ -127,7 +126,7 @@ public class SelectorThread extends Thre
                 catch (IOException e)
                 {
                     // TODO Inform the model object
-                    LOGGER.error("Failed to select for " + _transport.getConfig().getAddress().toString(),e );
+                    LOGGER.error("Failed to trying to select()",e );
                     break;
                 }
 
@@ -177,8 +176,7 @@ public class SelectorThread extends Thre
                 toBeScheduled.add(connection);
                 try
                 {
-                    SelectionKey register = connection.getSocketChannel().register(_selector, 0);
-                    register.cancel();
+                    unregisterConnection(connection);
                 }
                 catch (ClosedChannelException e)
                 {
@@ -196,6 +194,12 @@ public class SelectorThread extends Thre
         return toBeScheduled;
     }
 
+    private void unregisterConnection(final NonBlockingConnection connection) throws ClosedChannelException
+    {
+        SelectionKey register = connection.getSocketChannel().register(_selector, 0);
+        register.cancel();
+    }
+
     private List<NonBlockingConnection> reregisterUnregisteredConnections()
     {
         List<NonBlockingConnection> unregisterableConnections = new ArrayList<>();
@@ -230,8 +234,9 @@ public class SelectorThread extends Thre
         {
             if(key.isAcceptable())
             {
+                NonBlockingNetworkTransport transport = (NonBlockingNetworkTransport) key.attachment();
                 // todo - should we schedule this rather than running in this thread?
-                _transport.acceptSocketChannel((ServerSocketChannel)key.channel());
+                transport.acceptSocketChannel((ServerSocketChannel)key.channel());
             }
             else
             {
@@ -272,6 +277,20 @@ public class SelectorThread extends Thre
 
     }
 
+    void removeConnection(NonBlockingConnection connection)
+    {
+        try
+        {
+            unregisterConnection(connection);
+        }
+        catch (ClosedChannelException e)
+        {
+            LOGGER.debug("Failed to unregister with selector for connection " + connection +
+                         ". Connection is probably being closed by peer.", e);
+
+        }
+    }
+
     public void wakeup()
     {
         _selector.wakeup();
@@ -281,11 +300,6 @@ public class SelectorThread extends Thre
     {
         _closed.set(true);
         _selector.wakeup();
-        _scheduler.close();
     }
 
-    boolean isIOThread()
-    {
-        return Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX);
-    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Mon Jun 15 08:55:55 2015
@@ -46,6 +46,7 @@ class TCPandSSLTransport implements Acce
     private AmqpPort<?> _port;
     private Set<Protocol> _supported;
     private Protocol _defaultSupportedProtocolReply;
+    private NetworkConnectionScheduler _scheduler;
 
     TCPandSSLTransport(final Set<Transport> transports,
                        final SSLContext sslContext,
@@ -99,7 +100,9 @@ class TCPandSSLTransport implements Acce
             encryptionSet.add(TransportEncryption.TLS);
         }
 
-        _networkTransport = new NonBlockingNetworkTransport(settings, protocolEngineFactory, _sslContext, encryptionSet);
+        _scheduler = new NetworkConnectionScheduler("Port-"+_port.getName(),settings.getThreadPoolSize());
+        _networkTransport = new NonBlockingNetworkTransport(settings, protocolEngineFactory,
+                                                            _sslContext, encryptionSet, _scheduler);
         _networkTransport.start();
     }
 
@@ -115,6 +118,10 @@ class TCPandSSLTransport implements Acce
         {
             _networkTransport.close();
         }
+        if(_scheduler != null)
+        {
+            _scheduler.close();
+        }
     }
 
     class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Jun 15 08:55:55 2015
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -91,6 +92,7 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.MessageStoreProvider;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -149,6 +151,8 @@ public abstract class AbstractVirtualHos
 
     private final Set<BlockingType> _blockingReasons = Collections.synchronizedSet(EnumSet.noneOf(BlockingType.class));
 
+    private NetworkConnectionScheduler _networkConnectionScheduler;
+
 
     @ManagedAttributeField
     private boolean _queue_deadLetterQueueEnabled;
@@ -860,7 +864,11 @@ public abstract class AbstractVirtualHos
         _dtxRegistry.close();
         closeMessageStore();
         shutdownHouseKeeping();
-
+        if(_networkConnectionScheduler != null)
+        {
+            _networkConnectionScheduler.close();
+            _networkConnectionScheduler = null;
+        }
         _eventLogger.message(VirtualHostMessages.CLOSED(getName()));
     }
 
@@ -995,6 +1003,7 @@ public abstract class AbstractVirtualHos
         ConnectionAdapter c = new ConnectionAdapter(connection);
         c.create();
         childAdded(c);
+        connection.setScheduler(_networkConnectionScheduler);
 
     }
 
@@ -1375,6 +1384,11 @@ public abstract class AbstractVirtualHos
                         try
                         {
                             shutdownHouseKeeping();
+                            if(_networkConnectionScheduler != null)
+                            {
+                                _networkConnectionScheduler.close();
+                                _networkConnectionScheduler = null;
+                            }
                             closeMessageStore();
                             setState(State.STOPPED);
 
@@ -1629,7 +1643,7 @@ public abstract class AbstractVirtualHos
     private ListenableFuture<Void> onActivate()
     {
         _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), new SuppressingInheritedAccessControlContextThreadFactory("virtualhost-" + getName() + "-pool"));
-
+        _networkConnectionScheduler = new NetworkConnectionScheduler("virtualhost-" + getName() + "-iopool", Runtime.getRuntime().availableProcessors());
         MessageStore messageStore = getMessageStore();
         messageStore.openMessageStore(this);
 
@@ -1847,5 +1861,4 @@ public abstract class AbstractVirtualHos
         }
     }
 
-
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Mon Jun 15 08:55:55 2015
@@ -40,6 +40,8 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
 import org.apache.qpid.server.txn.DtxRegistry;
 
 public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AMQQueue<?>, E extends ExchangeImpl<?> >
@@ -112,4 +114,5 @@ public interface VirtualHostImpl< X exte
     boolean authoriseCreateConnection(AMQConnectionModel<?, ?> connection);
 
     String getLocalAddress(String routingAddress);
+
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Mon Jun 15 08:55:55 2015
@@ -532,6 +532,7 @@ class RedirectingVirtualHostImpl
         return localAddress;
     }
 
+
     private void throwUnsupportedForRedirector()
     {
         throw new IllegalStateException("The virtual host state of " + getState()

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Mon Jun 15 08:55:55 2015
@@ -57,6 +57,7 @@ import org.apache.qpid.server.protocol.S
 import org.apache.qpid.server.protocol.SessionModelListener;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.transport.network.Ticker;
@@ -671,6 +672,12 @@ public class MockConsumer implements Con
         }
 
         @Override
+        public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+        {
+
+        }
+
+        @Override
         public String getClientVersion()
         {
             return null;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java Mon Jun 15 08:55:55 2015
@@ -21,7 +21,6 @@ package org.apache.qpid.server.transport
 
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngine;
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.network.AggregateTicker;
@@ -111,7 +110,10 @@ public class NetworkConnectionSchedulerT
         when(verboseEngine.getAggregateTicker()).thenReturn(new AggregateTicker());
         when(timidEngine.getAggregateTicker()).thenReturn(new AggregateTicker());
 
-        NonBlockingNetworkTransport transport = new NonBlockingNetworkTransport(config, engineFactory, null, EnumSet.of(TransportEncryption.NONE));
+        final NetworkConnectionScheduler scheduler = new NetworkConnectionScheduler(getName(), 1);
+
+        NonBlockingNetworkTransport transport = new NonBlockingNetworkTransport(config, engineFactory, null, EnumSet.of(TransportEncryption.NONE),
+                                                                                scheduler);
 
         transport.start();
         final int port = transport.getAcceptingPort();
@@ -164,6 +166,7 @@ public class NetworkConnectionSchedulerT
         verify(timidEngine, atLeast(6)).received(any(ByteBuffer.class));
         _keepRunningThreads = false;
         transport.close();
+        scheduler.close();
     }
 
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Mon Jun 15 08:55:55 2015
@@ -38,11 +38,12 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.Constant;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.NetworkConnection;
 
@@ -89,6 +90,7 @@ public class ProtocolEngine_0_10  extend
         return _aggregateTicker;
     }
 
+
     @Override
     public boolean isMessageAssignmentSuspended()
     {
@@ -343,4 +345,9 @@ public class ProtocolEngine_0_10  extend
     {
         _workListener.set(listener);
     }
+
+    public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+    {
+        ((NonBlockingConnection)_network).changeScheduler(networkConnectionScheduler);
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Jun 15 08:55:55 2015
@@ -40,6 +40,9 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.protocol.ServerProtocolEngine;
@@ -56,6 +59,7 @@ import org.apache.qpid.server.protocol.S
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -73,7 +77,7 @@ import org.apache.qpid.transport.Session
 public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>,
                                                             LogSubject, AuthorizationHolder
 {
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class);
     public static final long CLOSE_OK_TIMEOUT = 10000l;
     private final Broker<?> _broker;
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
@@ -102,7 +106,7 @@ public class ServerConnection extends Co
     private int _messageCompressionThreshold;
     private final int _maxMessageSize;
 
-    private ServerProtocolEngine _serverProtocolEngine;
+    private ProtocolEngine_0_10 _serverProtocolEngine;
     private boolean _ignoreFutureInput;
     private boolean _ignoreAllButConnectionCloseOk;
 
@@ -214,7 +218,13 @@ public class ServerConnection extends Co
         return _serverProtocolEngine;
     }
 
-    public void setProtocolEngine(final ServerProtocolEngine serverProtocolEngine)
+    @Override
+    public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+    {
+        _serverProtocolEngine.setScheduler(networkConnectionScheduler);
+    }
+
+    public void setProtocolEngine(final ProtocolEngine_0_10 serverProtocolEngine)
     {
         _serverProtocolEngine = serverProtocolEngine;
     }
@@ -396,7 +406,6 @@ public class ServerConnection extends Co
 
     public void closeAsync(final AMQConstant cause, final String message)
     {
-
         addAsyncTask(new Action<ServerConnection>()
         {
             @Override
@@ -406,8 +415,6 @@ public class ServerConnection extends Co
                 {
                     markAllSessionsClosed();
 
-                    performDeleteTasks();
-
                     setState(CLOSING);
                     ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
                     try

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Mon Jun 15 08:55:55 2015
@@ -28,6 +28,8 @@ import java.util.List;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.NonBlockingConnection;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -76,12 +78,15 @@ public class ServerSessionTest extends Q
         when(amqpPort.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
 
         ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP);
+        final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class);
+        connection.setProtocolEngine(protocolEngine);
         connection.setVirtualHost(_virtualHost);
         ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
                 new Binary(getName().getBytes()), 0);
 
         // create a session with the same name but on a different connection
         ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP);
+        connection2.setProtocolEngine(protocolEngine);
         connection2.setVirtualHost(_virtualHost);
         ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
                 new Binary(getName().getBytes()), 0);
@@ -98,6 +103,8 @@ public class ServerSessionTest extends Q
         AmqpPort port = mock(AmqpPort.class);
         when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(1024);
         ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP);
+        final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class);
+        connection.setProtocolEngine(protocolEngine);
         connection.setVirtualHost(_virtualHost);
         final List<Method> invokedMethods = new ArrayList<>();
         ServerSession session = new ServerSession(connection, new ServerSessionDelegate(),

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Jun 15 08:55:55 2015
@@ -85,6 +85,8 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -279,6 +281,12 @@ public class AMQProtocolEngine implement
     }
 
     @Override
+    public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+    {
+        ((NonBlockingConnection)_network).changeScheduler(networkConnectionScheduler);
+    }
+
+    @Override
     public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
     {
         _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
@@ -847,22 +855,13 @@ public class AMQProtocolEngine implement
             }
             finally
             {
-                try
+                synchronized (this)
                 {
-                    for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList)
-                    {
-                        task.performAction(this);
-                    }
-                }
-                finally
-                {
-                    synchronized (this)
-                    {
-                        _closed = true;
-                        notifyAll();
-                    }
-                    getEventLogger().message(_logSubject, connectionDropped ? ConnectionMessages.DROPPED_CONNECTION() : ConnectionMessages.CLOSE());
+                    _closed = true;
+                    notifyAll();
                 }
+                getEventLogger().message(_logSubject, connectionDropped ? ConnectionMessages.DROPPED_CONNECTION() : ConnectionMessages.CLOSE());
+
             }
         }
     }
@@ -1145,7 +1144,14 @@ public class AMQProtocolEngine implement
             }
             finally
             {
-                closeNetworkConnection();
+                try
+                {
+                    closeNetworkConnection();
+                }
+                finally
+                {
+                    performDeleteTasks();
+                }
             }
         }
         catch (ConnectionScopedRuntimeException | TransportException e)
@@ -1154,6 +1160,30 @@ public class AMQProtocolEngine implement
         }
     }
 
+    private void performDeleteTasks()
+    {
+        if(runningAsSubject())
+        {
+            for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList)
+            {
+                task.performAction(this);
+            }
+        }
+        else
+        {
+            runAsSubject(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    performDeleteTasks();
+                    return null;
+                }
+            });
+        }
+
+    }
+
     @Override
     public void encryptedTransport()
     {
@@ -1542,7 +1572,6 @@ public class AMQProtocolEngine implement
 
                         writeFrame(responseBody.generateFrame(0));
                         _state = ConnectionState.OPEN;
-
                     }
                     else
                     {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Mon Jun 15 08:55:55 2015
@@ -59,6 +59,8 @@ import org.apache.qpid.server.protocol.S
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
@@ -258,6 +260,16 @@ public class Connection_1_0 implements C
             session.remoteEnd(new End());
         }
 
+        if(_vhost != null)
+        {
+            _vhost.getConnectionRegistry().deregisterConnection(this);
+        }
+
+
+    }
+
+    void performCloseTasks()
+    {
         List<Action<? super Connection_1_0>> taskCopy;
 
         synchronized (_closeTasks)
@@ -272,12 +284,6 @@ public class Connection_1_0 implements C
         {
             _closeTasks.clear();
         }
-        if(_vhost != null)
-        {
-            _vhost.getConnectionRegistry().deregisterConnection(this);
-        }
-
-
     }
 
     public void closed()
@@ -423,6 +429,12 @@ public class Connection_1_0 implements C
     }
 
     @Override
+    public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+    {
+        _protocolEngine.changeScheduler(networkConnectionScheduler);
+    }
+
+    @Override
     public Transport getTransport()
     {
         return _transport;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java Mon Jun 15 08:55:55 2015
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.io.PrintWriter;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.security.Principal;
@@ -40,7 +39,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.amqp_1_0.codec.FrameWriter;
 import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
-import org.apache.qpid.amqp_1_0.codec.ProtocolHeaderHandler;
 import org.apache.qpid.amqp_1_0.framing.AMQFrame;
 import org.apache.qpid.amqp_1_0.framing.FrameHandler;
 import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
@@ -67,6 +65,8 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.AggregateTicker;
@@ -516,6 +516,12 @@ public class ProtocolEngine_1_0_0 implem
         }
     }
 
+    void changeScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+    {
+        ((NonBlockingConnection)_network).changeScheduler(networkConnectionScheduler);
+    }
+
+
     public long getCreateTime()
     {
         return _createTime;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java Mon Jun 15 08:55:55 2015
@@ -71,6 +71,8 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory;
 import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.ByteBufferSender;
@@ -94,7 +96,7 @@ public class ProtocolEngine_1_0_0Test ex
     public void setUp() throws Exception
     {
         super.setUp();
-        _networkConnection = mock(NetworkConnection.class);
+        _networkConnection = mock(NonBlockingConnection.class);
         _broker = mock(Broker.class);
         when(_broker.getModel()).thenReturn(BrokerModel.getInstance());
         final TaskExecutor taskExecutor = new TaskExecutorImpl();
@@ -107,6 +109,7 @@ public class ProtocolEngine_1_0_0Test ex
         _authenticationProvider = mock(AuthenticationProvider.class);
         when(_port.getAuthenticationProvider()).thenReturn(_authenticationProvider);
         VirtualHostImpl virtualHost = mock(VirtualHostImpl.class);
+
         _connectionRegistry = mock(IConnectionRegistry.class);
         final ArgumentCaptor<AMQConnectionModel> connectionCaptor = ArgumentCaptor.forClass(AMQConnectionModel.class);
         doAnswer(new Answer()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org