You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/16 11:28:47 UTC

svn commit: r1685745 - in /qpid/java/trunk/broker-core/src: main/java/org/apache/qpid/server/model/port/ main/java/org/apache/qpid/server/transport/ test/java/org/apache/qpid/server/transport/

Author: rgodfrey
Date: Tue Jun 16 09:28:46 2015
New Revision: 1685745

URL: http://svn.apache.org/r1685745
Log:
QPID-6249 : Refactor NonBlockingConnection (work by Lorenz Quack and Rob Godfrey)

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java   (with props)
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java   (with props)
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java   (with props)
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java   (with props)
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Tue Jun 16 09:28:46 2015
@@ -35,6 +35,8 @@ import org.apache.qpid.server.model.Trus
 import org.apache.qpid.server.model.VirtualHostAlias;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
+import javax.net.ssl.SSLContext;
+
 @ManagedObject( category = false, type = "AMQP")
 public interface AmqpPort<X extends AmqpPort<X>> extends ClientAuthCapablePort<X>
 {
@@ -60,6 +62,10 @@ public interface AmqpPort<X extends Amqp
     @ManagedContextDefault(name = PORT_MAX_OPEN_CONNECTIONS)
     int DEFAULT_MAX_OPEN_CONNECTIONS = -1;
 
+    String THREAD_POOL_SIZE = "qpid.port.thread_pool_size";
+
+    @ManagedContextDefault(name = THREAD_POOL_SIZE)
+    int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();
 
     String PORT_MAX_MESSAGE_SIZE = "qpid.port.max_message_size";
 
@@ -72,6 +78,8 @@ public interface AmqpPort<X extends Amqp
     int DEFAULT_OPEN_CONNECTIONS_WARN_PERCENT = 80;
 
 
+    SSLContext getSSLContext();
+
     @ManagedAttribute(defaultValue = "*")
     String getBindingAddress();
 
@@ -84,6 +92,8 @@ public interface AmqpPort<X extends Amqp
     @ManagedAttribute( defaultValue = AmqpPort.DEFAULT_AMQP_RECEIVE_BUFFER_SIZE )
     int getReceiveBufferSize();
 
+    @ManagedAttribute( defaultValue = "${" + THREAD_POOL_SIZE + "}")
+    int getThreadPoolSize();
 
     @ManagedAttribute( defaultValue = DEFAULT_AMQP_NEED_CLIENT_AUTH )
     boolean getNeedClientAuth();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Tue Jun 16 09:28:46 2015
@@ -119,6 +119,9 @@ public class AmqpPortImpl extends Abstra
     @ManagedAttributeField
     private int _maxOpenConnections;
 
+    @ManagedAttributeField
+    private int _threadPoolSize;
+
     private final AtomicInteger _connectionCount = new AtomicInteger();
     private final AtomicBoolean _connectionCountWarningGiven = new AtomicBoolean();
 
@@ -126,6 +129,7 @@ public class AmqpPortImpl extends Abstra
     private AcceptingTransport _transport;
     private final AtomicBoolean _closing = new AtomicBoolean();
     private final SettableFuture _noConnectionsRemain = SettableFuture.create();
+    private SSLContext _sslContext;
 
     @ManagedObjectFactoryConstructor
     public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -134,6 +138,11 @@ public class AmqpPortImpl extends Abstra
         _broker = broker;
     }
 
+    @Override
+    public SSLContext getSSLContext()
+    {
+        return _sslContext;
+    }
 
     @Override
     public String getBindingAddress()
@@ -166,6 +175,12 @@ public class AmqpPortImpl extends Abstra
     }
 
     @Override
+    public int getThreadPoolSize()
+    {
+        return _threadPoolSize;
+    }
+
+    @Override
     protected void onCreate()
     {
         super.onCreate();
@@ -238,15 +253,14 @@ public class AmqpPortImpl extends Abstra
                 );
             }
 
-            SSLContext sslContext = null;
             if (transports.contains(Transport.SSL) || transports.contains(Transport.WSS))
             {
-                sslContext = createSslContext();
+                _sslContext = createSslContext();
             }
             Protocol defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
 
             _transport = transportProvider.createTransport(transportSet,
-                                                           sslContext,
+                                                           _sslContext,
                                                            this,
                                                            getProtocols(),
                                                            defaultSupportedProtocolReply);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Tue Jun 16 09:28:46 2015
@@ -26,20 +26,11 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.security.Principal;
 import java.security.cert.Certificate;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLPeerUnverifiedException;
-
+import org.apache.qpid.server.model.port.AmqpPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,101 +41,71 @@ import org.apache.qpid.transport.ByteBuf
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 import org.apache.qpid.util.SystemUtils;
 
 public class NonBlockingConnection implements NetworkConnection, ByteBufferSender
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
-    private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
 
     private final SocketChannel _socketChannel;
-    private final Object _peerPrincipalLock = new Object();
+    private NonBlockingConnectionDelegate _delegate;
     private NetworkConnectionScheduler _scheduler;
     private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
-    private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
 
     private final String _remoteSocketAddress;
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final ServerProtocolEngine _protocolEngine;
-    private final int _receiveBufSize;
     private final Runnable _onTransportEncryptionAction;
-
+    private final int _receiveBufferSize;
 
     private volatile int _maxReadIdle;
     private volatile int _maxWriteIdle;
-    private Principal _principal;
-    private boolean _principalChecked;
 
     private ByteBuffer _netInputBuffer;
-    private SSLEngine _sslEngine;
-
-    private ByteBuffer _currentBuffer;
 
-    private TransportEncryption _transportEncryption;
-    private SSLEngineResult _status;
     private volatile boolean _fullyWritten = true;
-    private boolean _workDone;
-    private Certificate _peerCertificate;
 
-    private boolean _partialRead;
+    private boolean _partialRead = false;
+    private final AmqpPort _port;
 
     public NonBlockingConnection(SocketChannel socketChannel,
-                                 ServerProtocolEngine delegate,
+                                 ServerProtocolEngine protocolEngine,
                                  int receiveBufferSize,
                                  final Set<TransportEncryption> encryptionSet,
-                                 final SSLContext sslContext,
-                                 final boolean wantClientAuth,
-                                 final boolean needClientAuth,
-                                 final Collection<String> enabledCipherSuites,
-                                 final Collection<String> disabledCipherSuites,
                                  final Runnable onTransportEncryptionAction,
-                                 final NetworkConnectionScheduler scheduler)
+                                 final NetworkConnectionScheduler scheduler,
+                                 final AmqpPort port)
     {
         _socketChannel = socketChannel;
         _scheduler = scheduler;
 
-        _protocolEngine = delegate;
-        _receiveBufSize = receiveBufferSize;
+        _protocolEngine = protocolEngine;
         _onTransportEncryptionAction = onTransportEncryptionAction;
 
-        delegate.setWorkListener(new Action<ServerProtocolEngine>()
-                                {
-                                    @Override
-                                    public void performAction(final ServerProtocolEngine object)
-                                    {
-                                        _scheduler.wakeup();
-                                    }
-                                });
+        _receiveBufferSize = receiveBufferSize;
 
-        if(encryptionSet.size() == 1)
+        _netInputBuffer = ByteBuffer.allocate(receiveBufferSize);
+        _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
+        _port = port;
+
+        protocolEngine.setWorkListener(new Action<ServerProtocolEngine>()
         {
-            _transportEncryption = encryptionSet.iterator().next();
-            if (_transportEncryption == TransportEncryption.TLS)
+            @Override
+            public void performAction(final ServerProtocolEngine object)
             {
-                onTransportEncryptionAction.run();
+                _scheduler.wakeup();
             }
-        }
+        });
 
-        if(encryptionSet.contains(TransportEncryption.TLS))
+        if(encryptionSet.size() == 1)
         {
-            _sslEngine = sslContext.createSSLEngine();
-            _sslEngine.setUseClientMode(false);
-            SSLUtil.removeSSLv3Support(_sslEngine);
-            SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites);
-
-            if(needClientAuth)
-            {
-                _sslEngine.setNeedClientAuth(true);
-            }
-            else if(wantClientAuth)
-            {
-                _sslEngine.setWantClientAuth(true);
-            }
-            _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2));
+            setTransportEncryption(encryptionSet.iterator().next());
+        }
+        else
+        {
+            _delegate = new NonBlockingConnectionUndecidedDelegate(this);
         }
 
-        _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
     }
 
     public boolean isPartialRead()
@@ -162,15 +123,18 @@ public class NonBlockingConnection imple
         return _socketChannel;
     }
 
+    @Override
     public void start()
     {
     }
 
+    @Override
     public ByteBufferSender getSender()
     {
         return this;
     }
 
+    @Override
     public void close()
     {
         LOGGER.debug("Closing " + _remoteSocketAddress);
@@ -181,21 +145,25 @@ public class NonBlockingConnection imple
         }
     }
 
+    @Override
     public SocketAddress getRemoteAddress()
     {
         return _socketChannel.socket().getRemoteSocketAddress();
     }
 
+    @Override
     public SocketAddress getLocalAddress()
     {
         return _socketChannel.socket().getLocalSocketAddress();
     }
 
+    @Override
     public void setMaxWriteIdle(int sec)
     {
         _maxWriteIdle = sec;
     }
 
+    @Override
     public void setMaxReadIdle(int sec)
     {
         _maxReadIdle = sec;
@@ -204,45 +172,13 @@ public class NonBlockingConnection imple
     @Override
     public Principal getPeerPrincipal()
     {
-        checkPeerPrincipal();
-        return _principal;
+        return _delegate.getPeerPrincipal();
     }
 
     @Override
     public Certificate getPeerCertificate()
     {
-        checkPeerPrincipal();
-        return _peerCertificate;
-    }
-
-    private void checkPeerPrincipal()
-    {
-        synchronized (_peerPrincipalLock)
-        {
-            if (!_principalChecked)
-            {
-                if (_sslEngine != null)
-                {
-                    try
-                    {
-                        _principal = _sslEngine.getSession().getPeerPrincipal();
-                        final Certificate[] peerCertificates =
-                                _sslEngine.getSession().getPeerCertificates();
-                        if (peerCertificates != null && peerCertificates.length > 0)
-                        {
-                            _peerCertificate = peerCertificates[0];
-                        }
-                    }
-                    catch (SSLPeerUnverifiedException e)
-                    {
-                        _principal = null;
-                        _peerCertificate = null;
-                    }
-                }
-
-                _principalChecked = true;
-            }
-        }
+        return _delegate.getPeerCertificate();
     }
 
     @Override
@@ -269,7 +205,6 @@ public class NonBlockingConnection imple
 
     public boolean isStateChanged()
     {
-
         return _protocolEngine.hasWork();
     }
 
@@ -281,8 +216,6 @@ public class NonBlockingConnection imple
         {
             try
             {
-                _workDone = false;
-
                 long currentTime = System.currentTimeMillis();
                 int tick = getTicker().getTimeToNextTick(currentTime);
                 if (tick <= 0)
@@ -299,7 +232,7 @@ public class NonBlockingConnection imple
                 _fullyWritten = doWrite();
                 _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
 
-                if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0))
+                if (dataRead || (_delegate.needsWork() && _netInputBuffer.position() != 0))
                 {
                     _protocolEngine.notifyWork();
                 }
@@ -319,187 +252,121 @@ public class NonBlockingConnection imple
         }
         else
         {
+            shutdown();
+        }
 
-            if(!SystemUtils.isWindows())
-            {
-                try
-                {
-                    _socketChannel.shutdownInput();
-                }
-                catch (IOException e)
-                {
-                    LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e);
+        return closed;
 
-                }
-            }
-            try
+    }
+
+    private void shutdown()
+    {
+        shutdownInput();
+
+        shutdownFinalWrite();
+        LOGGER.debug("Closing receiver");
+        _protocolEngine.closed();
+
+        shutdownOutput();
+    }
+
+    private void shutdownFinalWrite()
+    {
+        try
+        {
+            while(!doWrite())
             {
-                while(!doWrite())
-                {
-                }
             }
-            catch (IOException e)
-            {
-                LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
+        }
+        catch (IOException e)
+        {
+            LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
+        }
+    }
 
+    private void shutdownOutput()
+    {
+        try
+        {
+            if(!SystemUtils.isWindows())
+            {
+                _socketChannel.shutdownOutput();
             }
-            LOGGER.debug("Closing receiver");
-            _protocolEngine.closed();
 
+            _socketChannel.close();
+        }
+        catch (IOException e)
+        {
+            LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e);
+        }
+    }
+
+    private void shutdownInput()
+    {
+        if(!SystemUtils.isWindows())
+        {
             try
             {
-                if(!SystemUtils.isWindows())
-                {
-                    _socketChannel.shutdownOutput();
-                }
-
-                _socketChannel.close();
+                _socketChannel.shutdownInput();
             }
             catch (IOException e)
             {
-                LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e);
+                LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e);
             }
         }
-
-        return closed;
-
     }
 
-    private boolean doRead() throws IOException
+    /**
+     * doRead is not reentrant.
+     */
+    boolean doRead() throws IOException
     {
         boolean readData = false;
         _partialRead = false;
-        if(_transportEncryption == TransportEncryption.NONE)
+        if (!_closed.get())
         {
-            if (!_closed.get())
-            {
-                if (_currentBuffer == null || _currentBuffer.remaining() == 0)
-                {
-                    _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
-                }
-                int read = _socketChannel.read(_currentBuffer);
-                if(read > 0)
-                {
-                    readData = true;
-                }
-                if (read == -1)
-                {
-                    _closed.set(true);
-                }
-
-                _partialRead = !_currentBuffer.hasRemaining();
-
-                if (LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Read " + read + " byte(s)");
-                }
-                ByteBuffer dup = _currentBuffer.duplicate();
-                dup.flip();
-                _currentBuffer = _currentBuffer.slice();
-                _protocolEngine.received(dup);
-            }
+            readData = _delegate.doRead();
         }
-        else if(_transportEncryption == TransportEncryption.TLS)
-        {
-            if (!_closed.get() && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
-            {
-                int read = _socketChannel.read(_netInputBuffer);
-                if (read == -1)
-                {
-                    _closed.set(true);
-                }
-                else if(read > 0)
-                {
-                    readData = true;
-                }
-
-                _partialRead = !_netInputBuffer.hasRemaining();
-
-                if (LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Read " + read + " encrypted bytes ");
-                }
-
-                _netInputBuffer.flip();
-
+        return readData;
+    }
 
-                int unwrapped = 0;
-                boolean tasksRun;
-                do
-                {
-                    ByteBuffer appInputBuffer =
-                            ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+    boolean readAndProcessData() throws IOException
+    {
+        boolean readData = readIntoBuffer(_netInputBuffer);
 
-                    _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
-                    if (_status.getStatus() == SSLEngineResult.Status.CLOSED)
-                    {
-                        // KW If SSLEngine changes state to CLOSED, what will ever set _closed to true?
-                        LOGGER.debug("SSLEngine closed");
-                    }
-
-                    tasksRun = runSSLEngineTasks(_status);
-
-                    appInputBuffer.flip();
-                    unwrapped = appInputBuffer.remaining();
-                    if(unwrapped > 0)
-                    {
-                        readData = true;
-                    }
-                    _protocolEngine.received(appInputBuffer);
-                }
-                while(unwrapped > 0 || tasksRun);
+        ByteBuffer duplicate = _netInputBuffer.duplicate();
+        duplicate.flip();
 
-                _netInputBuffer.compact();
+        readData |= processData(duplicate);
 
-            }
+        if (_netInputBuffer.hasRemaining())
+        {
+            // slice but keep unprocessed data
+            int amountOfUnprocessedData = duplicate.remaining();
+            _netInputBuffer.position(_netInputBuffer.position() - amountOfUnprocessedData);
+            _netInputBuffer = _netInputBuffer.slice();
+            _netInputBuffer.position(amountOfUnprocessedData);
         }
         else
         {
-            int read = 1;
-            while (!_closed.get() && read > 0)
-            {
-
-                read = _socketChannel.read(_netInputBuffer);
-                if (read == -1)
-                {
-                    _closed.set(true);
-                }
-
-                if (LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
-                }
-
-                if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
-                {
-                    _netInputBuffer.flip();
-                    final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
-                    ByteBuffer dup = _netInputBuffer.duplicate();
-                    dup.get(headerBytes);
-
-                    _transportEncryption =  looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE;
-                    LOGGER.debug("Identified transport encryption as " + _transportEncryption);
-
-                    if (_transportEncryption == TransportEncryption.NONE)
-                    {
-                        _protocolEngine.received(_netInputBuffer);
-                    }
-                    else
-                    {
-                        _onTransportEncryptionAction.run();
-                        _netInputBuffer.compact();
-                        readData = doRead();
-                    }
-                    break;
-                }
-            }
+            // compact into new buffer
+            _netInputBuffer = ByteBuffer.allocate(_receiveBufferSize);
+            _netInputBuffer.put(duplicate);
         }
         return readData;
     }
 
-    private boolean doWrite() throws IOException
+    void writeToTransport(ByteBuffer[] buffers) throws IOException
     {
+        long written  = _socketChannel.write(buffers);
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Written " + written + " bytes");
+        }
+    }
 
+    private boolean doWrite() throws IOException
+    {
         ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
         Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
         for (int i = 0; i < bufArray.length; i++)
@@ -507,90 +374,9 @@ public class NonBlockingConnection imple
             bufArray[i] = bufferIterator.next();
         }
 
-        int byteBuffersWritten = 0;
-
-        if(_transportEncryption == TransportEncryption.NONE)
-        {
-
-
-            long written = _socketChannel.write(bufArray);
-            if (LOGGER.isDebugEnabled())
-            {
-                LOGGER.debug("Written " + written + " bytes");
-            }
-
-            for (ByteBuffer buf : bufArray)
-            {
-                if (buf.remaining() == 0)
-                {
-                    byteBuffersWritten++;
-                    _buffers.poll();
-                }
-                else
-                {
-                    break;
-                }
-            }
-
-
-            return bufArray.length == byteBuffersWritten;
-        }
-        else if(_transportEncryption == TransportEncryption.TLS)
+        if (_delegate != null)
         {
-            int remaining = 0;
-            do
-            {
-                if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
-                {
-                    _workDone = true;
-                    final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
-                    _status = _sslEngine.wrap(bufArray, netBuffer);
-                    runSSLEngineTasks(_status);
-
-                    netBuffer.flip();
-                    remaining = netBuffer.remaining();
-                    if (remaining != 0)
-                    {
-                        _encryptedOutput.add(netBuffer);
-                    }
-                    for (ByteBuffer buf : bufArray)
-                    {
-                        if (buf.remaining() == 0)
-                        {
-                            byteBuffersWritten++;
-                            _buffers.poll();
-                        }
-                        else
-                        {
-                            break;
-                        }
-                    }
-                }
-
-            }
-            while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
-            ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
-            long written  = _socketChannel.write(encryptedBuffers);
-            if (LOGGER.isDebugEnabled())
-            {
-                LOGGER.debug("Written " + written + " encrypted bytes");
-            }
-            ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
-            while(iter.hasNext())
-            {
-                ByteBuffer buf = iter.next();
-                if(buf.remaining() == 0)
-                {
-                    iter.remove();
-                }
-                else
-                {
-                    break;
-                }
-            }
-
-            return (bufArray.length == byteBuffersWritten) && _encryptedOutput.isEmpty();
-
+            return _delegate.doWrite(bufArray);
         }
         else
         {
@@ -598,18 +384,31 @@ public class NonBlockingConnection imple
         }
     }
 
-    private boolean runSSLEngineTasks(final SSLEngineResult status)
+    boolean processData(ByteBuffer data) throws IOException
+    {
+        return _delegate.processData(data);
+    }
+
+    protected boolean readIntoBuffer(final ByteBuffer buffer) throws IOException
     {
-        if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
+        boolean readData = false;
+        int read = _socketChannel.read(buffer);
+        if (read > 0)
         {
-            Runnable task;
-            while((task = _sslEngine.getDelegatedTask()) != null)
-            {
-                task.run();
-            }
-            return true;
+            readData = true;
+        }
+        else if (read == -1)
+        {
+            _closed.set(true);
         }
-        return false;
+
+        _partialRead = !buffer.hasRemaining();
+
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Read " + read + " byte(s)");
+        }
+        return readData;
     }
 
     @Override
@@ -626,6 +425,11 @@ public class NonBlockingConnection imple
         }
     }
 
+    public void writeBufferProcessed()
+    {
+        _buffers.poll();
+    }
+
     @Override
     public void flush()
     {
@@ -642,34 +446,30 @@ public class NonBlockingConnection imple
         return "[NonBlockingConnection " + _remoteSocketAddress + "]";
     }
 
-    private boolean looksLikeSSL(final byte[] headerBytes)
-    {
-        return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
-    }
-
-    private boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
+    public NetworkConnectionScheduler getScheduler()
     {
-        return headerBytes[0] == 22 && // SSL Handshake
-               (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
-                (headerBytes[2] == 0 || // SSL 3.0
-                 headerBytes[2] == 1 || // TLS 1.0
-                 headerBytes[2] == 2 || // TLS 1.1
-                 headerBytes[2] == 3)) && // TLS1.2
-               (headerBytes[5] == 1); // client_hello
+        return _scheduler;
     }
 
-    private boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
+    public void processAmqpData(ByteBuffer data)
     {
-        return headerBytes[0] == -128 &&
-               headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
-               (headerBytes[4] == 0 || // SSL 3.0
-                headerBytes[4] == 1 || // TLS 1.0
-                headerBytes[4] == 2 || // TLS 1.1
-                headerBytes[4] == 3);
+        _protocolEngine.received(data);
     }
 
-    public NetworkConnectionScheduler getScheduler()
+    public void setTransportEncryption(TransportEncryption transportEncryption)
     {
-        return _scheduler;
+        switch (transportEncryption)
+        {
+            case TLS:
+                _onTransportEncryptionAction.run();
+                _delegate = new NonBlockingConnectionTLSDelegate(this, _port);
+                break;
+            case NONE:
+                _delegate = new NonBlockingConnectionPlainDelegate(this);
+                break;
+            default:
+                throw new IllegalArgumentException("unknown TransportEncryption " + transportEncryption);
+        }
+        LOGGER.debug("Identified transport encryption as " + transportEncryption);
     }
 }

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java?rev=1685745&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java Tue Jun 16 09:28:46 2015
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.cert.Certificate;
+
+/**
+ * Strategy through which a {@link NonBlockingConnection} performs its transport reads, its
+ * transport writes and the handling of inbound bytes for one transport encryption mode.
+ * Implementations exist for plain, TLS and not-yet-determined connections.
+ */
+public interface NonBlockingConnectionDelegate
+{
+    /**
+     * Reads from the transport and processes whatever was read.
+     *
+     * @return presumably {@code true} if data was read - the implementations pass through the
+     *         result of {@code NonBlockingConnection.readAndProcessData()}; confirm there
+     * @throws IOException if the read fails
+     */
+    boolean doRead() throws IOException;
+    /**
+     * Sends the given buffers towards the peer.
+     *
+     * @param bufferArray buffers to send, in order
+     * @return {@code true} when nothing is left to send; the plain and TLS implementations
+     *         return {@code false} while any supplied buffer (or, for TLS, any encrypted
+     *         output) is still pending
+     * @throws IOException if the write fails
+     */
+    boolean doWrite(ByteBuffer[] bufferArray) throws IOException;
+    /**
+     * Handles bytes that have been read from the transport.
+     *
+     * @param data the bytes received
+     * @return the TLS implementation reports whether any application data was decoded; the
+     *         plain and undecided implementations always return {@code false}
+     * @throws IOException if the data cannot be processed
+     */
+    boolean processData(ByteBuffer data) throws IOException;
+
+    /**
+     * @return the principal of the remote peer, or {@code null} if none is available
+     *         (always {@code null} for the non-TLS implementations)
+     */
+    Principal getPeerPrincipal();
+
+    /**
+     * @return the certificate presented by the remote peer, or {@code null} if none is
+     *         available (always {@code null} for the non-TLS implementations)
+     */
+    Certificate getPeerCertificate();
+
+    /**
+     * @return always {@code false} for the non-TLS implementations; the TLS implementation
+     *         returns {@code true} unless its handshake is waiting for data from the peer.
+     *         Presumably tells the connection that it should be serviced again - confirm
+     *         against the caller.
+     */
+    boolean needsWork();
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1685745&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java Tue Jun 16 09:28:46 2015
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.cert.Certificate;
+
+/**
+ * Delegate for connections without transport encryption: bytes are handed between the
+ * transport and the parent connection unchanged.
+ */
+public class NonBlockingConnectionPlainDelegate implements NonBlockingConnectionDelegate
+{
+    private final NonBlockingConnection _parent;
+
+    /**
+     * @param parent the connection on whose behalf this delegate reads and writes
+     */
+    public NonBlockingConnectionPlainDelegate(NonBlockingConnection parent)
+    {
+        _parent = parent;
+    }
+
+    /** Reading needs no preparation for a plain connection; the parent does all the work. */
+    @Override
+    public boolean doRead() throws IOException
+    {
+        return _parent.readAndProcessData();
+    }
+
+    /**
+     * Writes the buffers to the transport and tells the parent about every leading buffer
+     * that has been completely drained.
+     *
+     * @param bufferArray buffers to write, in order
+     * @return {@code true} if every buffer in the array was written in full
+     */
+    @Override
+    public boolean doWrite(ByteBuffer[] bufferArray) throws IOException
+    {
+        _parent.writeToTransport(bufferArray);
+
+        // Walk the leading run of fully drained buffers; stop at the first one with bytes left.
+        int drained = 0;
+        while (drained < bufferArray.length && !bufferArray[drained].hasRemaining())
+        {
+            _parent.writeBufferProcessed();
+            drained++;
+        }
+
+        return drained == bufferArray.length;
+    }
+
+    /**
+     * Passes the received bytes straight to the parent as AMQP data and then marks the
+     * buffer as fully consumed.
+     *
+     * @param buffer the bytes received from the transport
+     * @return always {@code false}
+     */
+    @Override
+    public boolean processData(ByteBuffer buffer)
+    {
+        _parent.processAmqpData(buffer);
+        final int end = buffer.limit();
+        buffer.position(end);
+        return false;
+    }
+
+    /** A plain connection never needs additional servicing from this delegate. */
+    @Override
+    public boolean needsWork()
+    {
+        return false;
+    }
+
+    /** There is no TLS session, hence no peer principal. */
+    @Override
+    public Principal getPeerPrincipal()
+    {
+        return null;
+    }
+
+    /** There is no TLS session, hence no peer certificate. */
+    @Override
+    public Certificate getPeerCertificate()
+    {
+        return null;
+    }
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1685745&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java Tue Jun 16 09:28:46 2015
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * Delegate for TLS connections. Inbound bytes are unwrapped by an {@link SSLEngine} before
+ * being handed to the parent connection as AMQP data; outbound buffers are wrapped by the
+ * same engine and the resulting encrypted buffers are written to the transport.
+ */
+public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDelegate
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnectionTLSDelegate.class);
+
+    private final SSLEngine _sslEngine;
+    private final NonBlockingConnection _parent;
+    // Result of the most recent wrap/unwrap call; null until the engine has been used once
+    private SSLEngineResult _status;
+    // Encrypted buffers produced by wrap() that have not yet been completely written
+    private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
+    private Principal _principal;
+    private Certificate _peerCertificate;
+    // Set once the peer principal/certificate lookup has been attempted (see checkPeerPrincipal)
+    private boolean _principalChecked;
+
+    /**
+     * @param parent the connection on whose behalf this delegate reads and writes
+     * @param port   the port supplying the SSL context, cipher suite and client-auth settings
+     */
+    public NonBlockingConnectionTLSDelegate(NonBlockingConnection parent, AmqpPort port)
+    {
+        _parent = parent;
+        _sslEngine = createSSLEngine(port);
+    }
+
+    /**
+     * Reads from the transport unless the engine first has handshake data to send
+     * (NEED_WRAP) or the last engine result reported the engine as CLOSED.
+     *
+     * @return the result of the parent's read, or {@code false} if the read was skipped
+     */
+    @Override
+    public boolean doRead() throws IOException
+    {
+        boolean readData = false;
+        if (_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
+        {
+            readData = _parent.readAndProcessData();
+        }
+        return readData;
+    }
+
+    /**
+     * Decrypts the received bytes and passes the plain text to the parent.
+     *
+     * @param buffer encrypted bytes received from the transport
+     * @return {@code true} if any application data was produced
+     */
+    @Override
+    public boolean processData(ByteBuffer buffer) throws IOException
+    {
+        return unwrapAndProcessBuffer(buffer);
+    }
+
+    /**
+     * Encrypts the given buffers, writes all pending encrypted output to the transport and
+     * discards the encrypted buffers that were written in full.
+     *
+     * @param bufferArray plain-text buffers to send, in order
+     * @return {@code true} if every supplied buffer was consumed and no encrypted output
+     *         remains unwritten
+     */
+    @Override
+    public boolean doWrite(ByteBuffer[] bufferArray) throws IOException
+    {
+        int byteBuffersWritten = wrapBufferArray(bufferArray);
+
+        ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
+
+        _parent.writeToTransport(encryptedBuffers);
+
+        // Drop the leading encrypted buffers that have been fully written; keep the rest for next time
+        ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
+        while(iter.hasNext())
+        {
+            ByteBuffer buf = iter.next();
+            if(buf.remaining() == 0)
+            {
+                iter.remove();
+            }
+            else
+            {
+                break;
+            }
+        }
+
+        return (bufferArray.length == byteBuffersWritten) && _encryptedOutput.isEmpty();
+    }
+
+    /**
+     * Repeatedly unwraps from the given buffer, running any delegated engine tasks and
+     * handing each unwrapped chunk to the parent, until a pass neither yields application
+     * data nor runs tasks.
+     *
+     * @return {@code true} if at least one pass produced application data
+     */
+    private boolean unwrapAndProcessBuffer(final ByteBuffer wrappedDataBuffer) throws SSLException
+    {
+        boolean readData = false;
+        int unwrapped;
+        boolean tasksRun;
+        do
+        {
+            // NOTE(review): the reason for the extra 50 bytes of headroom is not evident from this class
+            ByteBuffer appInputBuffer =
+                    ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+            _status = _sslEngine.unwrap(wrappedDataBuffer, appInputBuffer);
+            if (_status.getStatus() == SSLEngineResult.Status.CLOSED)
+            {
+                // KW If SSLEngine changes state to CLOSED, what will ever set _closed to true?
+                LOGGER.debug("SSLEngine closed");
+            }
+
+            tasksRun = runSSLEngineTasks(_status);
+
+            appInputBuffer.flip();
+            unwrapped = appInputBuffer.remaining();
+            if(unwrapped > 0)
+            {
+                readData = true;
+            }
+            // The parent is invoked on every pass, including passes that produced no data
+            _parent.processAmqpData(appInputBuffer);
+        }
+        while(unwrapped > 0 || tasksRun);
+        return readData;
+    }
+
+    /**
+     * Wraps the given buffers into encrypted buffers appended to {@code _encryptedOutput},
+     * looping while the engine keeps producing output and is not waiting for peer data.
+     * The parent is notified once for each leading buffer that has been fully consumed.
+     *
+     * @return the number of leading buffers in {@code bufferArray} that are fully consumed;
+     *         never greater than {@code bufferArray.length}
+     */
+    private int wrapBufferArray(final ByteBuffer[] bufferArray) throws SSLException
+    {
+        int byteBuffersWritten = 0;
+        int remaining = 0;
+        do
+        {
+            if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
+            {
+                final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+                _status = _sslEngine.wrap(bufferArray, netBuffer);
+                runSSLEngineTasks(_status);
+
+                netBuffer.flip();
+                remaining = netBuffer.remaining();
+                if (remaining != 0)
+                {
+                    _encryptedOutput.add(netBuffer);
+                }
+                // Resume after the buffers already reported: rescanning from index 0 on every
+                // pass would count drained buffers again and notify the parent twice for them.
+                while (byteBuffersWritten < bufferArray.length
+                       && bufferArray[byteBuffersWritten].remaining() == 0)
+                {
+                    byteBuffersWritten++;
+                    _parent.writeBufferProcessed();
+                }
+            }
+
+        }
+        while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+        return byteBuffersWritten;
+    }
+
+    /**
+     * Runs all delegated tasks of the engine, on the calling thread, if the given result
+     * asks for it.
+     *
+     * @return {@code true} if the result's handshake status was NEED_TASK
+     */
+    private boolean runSSLEngineTasks(final SSLEngineResult status)
+    {
+        if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
+        {
+            Runnable task;
+            while((task = _sslEngine.getDelegatedTask()) != null)
+            {
+                task.run();
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * @return the peer principal of the TLS session, or {@code null} if the peer was not
+     *         verified when the lookup was made
+     */
+    @Override
+    public Principal getPeerPrincipal()
+    {
+        checkPeerPrincipal();
+        return _principal;
+    }
+
+    /**
+     * @return the first certificate presented by the peer, or {@code null} if there is
+     *         none or the peer was not verified when the lookup was made
+     */
+    @Override
+    public Certificate getPeerCertificate()
+    {
+        checkPeerPrincipal();
+        return _peerCertificate;
+    }
+
+    /**
+     * @return {@code true} unless the handshake is waiting for data from the peer (NEED_UNWRAP)
+     */
+    @Override
+    public boolean needsWork()
+    {
+        return _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
+    }
+
+    /**
+     * Looks up the peer principal and certificate from the TLS session on first use. The
+     * lookup is attempted only once: its outcome, including the "peer unverified" outcome,
+     * is kept for all later calls.
+     */
+    private synchronized void checkPeerPrincipal()
+    {
+        if (!_principalChecked)
+        {
+            try
+            {
+                _principal = _sslEngine.getSession().getPeerPrincipal();
+                final Certificate[] peerCertificates =
+                        _sslEngine.getSession().getPeerCertificates();
+                if (peerCertificates != null && peerCertificates.length > 0)
+                {
+                    _peerCertificate = peerCertificates[0];
+                }
+            }
+            catch (SSLPeerUnverifiedException e)
+            {
+                // No verified peer identity: report "none" rather than failing
+                _principal = null;
+                _peerCertificate = null;
+            }
+
+            _principalChecked = true;
+        }
+    }
+
+    /**
+     * Creates a server-mode engine from the port's SSL context, applies the SSLv3 removal
+     * and cipher suite selection from {@code SSLUtil}, and configures client authentication
+     * as needed or wanted according to the port.
+     */
+    private SSLEngine createSSLEngine(AmqpPort port)
+    {
+        SSLEngine sslEngine = port.getSSLContext().createSSLEngine();
+        sslEngine.setUseClientMode(false);
+        SSLUtil.removeSSLv3Support(sslEngine);
+        SSLUtil.updateEnabledCipherSuites(sslEngine, port.getEnabledCipherSuites(), port.getDisabledCipherSuites());
+
+        // "need" takes precedence over "want"
+        if(port.getNeedClientAuth())
+        {
+            sslEngine.setNeedClientAuth(true);
+        }
+        else if(port.getWantClientAuth())
+        {
+            sslEngine.setWantClientAuth(true);
+        }
+        return sslEngine;
+    }
+
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1685745&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java Tue Jun 16 09:28:46 2015
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.network.TransportEncryption;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.cert.Certificate;
+
+/**
+ * Delegate used while it is not yet known whether the peer speaks TLS or plain AMQP.
+ * It inspects the first bytes received, tells the parent connection which transport
+ * encryption was detected and then hands the data back to the parent for processing.
+ */
+public class NonBlockingConnectionUndecidedDelegate implements NonBlockingConnectionDelegate
+{
+    // Number of leading bytes needed by the SSL/TLS ClientHello checks below (highest index read is 5)
+    private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
+    // NOTE(review): public here but private in the sibling delegates; left as is so that any
+    // existing external access keeps compiling - consider narrowing.
+    public final NonBlockingConnection _parent;
+
+    /**
+     * @param parent the connection whose transport encryption is to be determined
+     */
+    public NonBlockingConnectionUndecidedDelegate(NonBlockingConnection parent)
+    {
+        _parent = parent;
+    }
+
+    @Override
+    public boolean doRead() throws IOException
+    {
+        return _parent.readAndProcessData();
+    }
+
+    /**
+     * Once at least {@value #NUMBER_OF_BYTES_FOR_TLS_CHECK} bytes are available, classifies the
+     * connection as TLS or plain and passes the buffer back to the parent. With fewer bytes
+     * nothing is done and the buffer is left untouched.
+     *
+     * @param buffer the bytes received so far
+     * @return always {@code false}
+     */
+    @Override
+    public boolean processData(ByteBuffer buffer) throws IOException
+    {
+        if (buffer.remaining() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
+        {
+            final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
+            // Peek through a duplicate so that the position of the original buffer does not move
+            ByteBuffer dup = buffer.duplicate();
+            dup.get(headerBytes);
+
+            if (looksLikeSSL(headerBytes))
+            {
+                _parent.setTransportEncryption(TransportEncryption.TLS);
+            }
+            else
+            {
+                _parent.setTransportEncryption(TransportEncryption.NONE);
+            }
+            // The parent has replaced its delegate by now; presumably this call routes the
+            // still unconsumed bytes to that new delegate - confirm in NonBlockingConnection
+            _parent.processData(buffer);
+        }
+        return false;
+    }
+
+    /** Nothing is written before the encryption is known; reports the write as complete. */
+    @Override
+    public boolean doWrite(ByteBuffer[] bufferArray) throws IOException
+    {
+        return true;
+    }
+
+    @Override
+    public Principal getPeerPrincipal()
+    {
+        return null;
+    }
+
+    @Override
+    public Certificate getPeerCertificate()
+    {
+        return null;
+    }
+
+    @Override
+    public boolean needsWork()
+    {
+        return false;
+    }
+
+    /** @return {@code true} if the bytes match either of the ClientHello layouts checked below */
+    private boolean looksLikeSSL(final byte[] headerBytes)
+    {
+        return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
+    }
+
+    /** Checks for a handshake record (type 22, version 3.0 - 3.3) carrying a client_hello. */
+    private boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
+    {
+        return headerBytes[0] == 22 && // SSL Handshake
+                (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
+                        (headerBytes[2] == 0 || // SSL 3.0
+                                headerBytes[2] == 1 || // TLS 1.0
+                                headerBytes[2] == 2 || // TLS 1.1
+                                headerBytes[2] == 3)) && // TLS1.2
+                (headerBytes[5] == 1); // client_hello
+    }
+
+    /** Checks for an SSLv2-format hello: first byte 0x80 and a 3.0 - 3.3 version in bytes 3 and 4. */
+    private boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
+    {
+        return headerBytes[0] == -128 &&
+                headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
+                (headerBytes[4] == 0 || // SSL 3.0
+                        headerBytes[4] == 1 || // TLS 1.0
+                        headerBytes[4] == 2 || // TLS 1.1
+                        headerBytes[4] == 3);
+    }
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Tue Jun 16 09:28:46 2015
@@ -23,7 +23,6 @@ package org.apache.qpid.server.transport
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.StandardSocketOptions;
-import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.EnumSet;
@@ -31,6 +30,7 @@ import java.util.Set;
 
 import javax.net.ssl.SSLContext;
 
+import org.apache.qpid.server.model.port.AmqpPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +45,8 @@ import org.apache.qpid.transport.network
 import org.apache.qpid.transport.network.io.AbstractNetworkTransport;
 import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
 
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
 public class NonBlockingNetworkTransport
 {
 
@@ -54,41 +56,53 @@ public class NonBlockingNetworkTransport
     private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
                                                                     CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
     private final Set<TransportEncryption> _encryptionSet;
-    private final NetworkTransportConfiguration _config;
     private final ProtocolEngineFactory _factory;
-    private final SSLContext _sslContext;
     private final ServerSocketChannel _serverSocket;
     private final int _timeout;
     private final NetworkConnectionScheduler _scheduler;
+    private final AmqpPort _port;
+    private final InetSocketAddress _address;
 
-    public NonBlockingNetworkTransport(final NetworkTransportConfiguration config,
-                                       final MultiVersionProtocolEngineFactory factory,
-                                       final SSLContext sslContext,
+    public NonBlockingNetworkTransport(final MultiVersionProtocolEngineFactory factory,
                                        final EnumSet<TransportEncryption> encryptionSet,
-                                       final NetworkConnectionScheduler scheduler)
+                                       final NetworkConnectionScheduler scheduler,
+                                       final AmqpPort port)
     {
         try
         {
 
-            _config = config;
             _factory = factory;
-            _sslContext = sslContext;
             _timeout = TIMEOUT;
 
-            InetSocketAddress address = config.getAddress();
+            String bindingAddress = port.getBindingAddress();
+            if (WILDCARD_ADDRESS.equals(bindingAddress))
+            {
+                bindingAddress = null;
+            }
+            int portNumber = port.getPort();
+
+            if ( bindingAddress == null )
+            {
+                _address = new InetSocketAddress(portNumber);
+            }
+            else
+            {
+                _address = new InetSocketAddress(bindingAddress, portNumber);
+            }
 
             _serverSocket =  ServerSocketChannel.open();
 
             _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
-            _serverSocket.bind(address);
+            _serverSocket.bind(_address);
             _serverSocket.configureBlocking(false);
             _encryptionSet = encryptionSet;
             _scheduler = scheduler;
+            _port = port;
 
         }
         catch (IOException e)
         {
-            throw new TransportException("Failed to start AMQP on port : " + config, e);
+            throw new TransportException("Failed to start AMQP on port : " + port, e);
         }
 
     }
@@ -108,7 +122,7 @@ public class NonBlockingNetworkTransport
         }
         catch (IOException e)
         {
-            LOGGER.warn("Error closing the server socket for : " +  _config.getAddress().toString(), e);
+            LOGGER.warn("Error closing the server socket for : " +  _address.toString(), e);
         }
     }
 
@@ -117,11 +131,6 @@ public class NonBlockingNetworkTransport
         return _serverSocket.socket().getLocalPort();
     }
 
-    public NetworkTransportConfiguration getConfig()
-    {
-        return _config;
-    }
-
     void acceptSocketChannel(final ServerSocketChannel serverSocketChannel)
     {
         SocketChannel socketChannel = null;
@@ -136,11 +145,11 @@ public class NonBlockingNetworkTransport
 
             if(engine != null)
             {
-                socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+                socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _port.isTcpNoDelay());
                 socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
 
-                final int sendBufferSize = _config.getSendBufferSize();
-                final int receiveBufferSize = _config.getReceiveBufferSize();
+                final int sendBufferSize = _port.getSendBufferSize();
+                final int receiveBufferSize = _port.getReceiveBufferSize();
 
                 socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
                 socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
@@ -157,12 +166,7 @@ public class NonBlockingNetworkTransport
                                                   engine,
                                                   receiveBufferSize,
                                                   _encryptionSet,
-                                                  _sslContext,
-                                                  _config.wantClientAuth(),
-                                                  _config.needClientAuth(),
-                                                  _config.getEnabledCipherSuites(),
-                                                  _config.getDisabledCipherSuites(),
-                                                  new Runnable()
+                                new Runnable()
                                                   {
 
                                                       @Override
@@ -171,7 +175,8 @@ public class NonBlockingNetworkTransport
                                                           engine.encryptedTransport();
                                                       }
                                                   },
-                                                  _scheduler);
+                                                  _scheduler,
+                                                  _port);
 
                 engine.setNetworkConnection(connection, connection.getSender());
                 connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
@@ -208,9 +213,4 @@ public class NonBlockingNetworkTransport
             }
         }
     }
-
-    public int getThreadPoolSize()
-    {
-        return _config.getThreadPoolSize();
-    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Tue Jun 16 09:28:46 2015
@@ -20,42 +20,31 @@
  */
 package org.apache.qpid.server.transport;
 
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
-import java.net.InetSocketAddress;
 import java.util.EnumSet;
-import java.util.Collection;
 import java.util.Set;
 
-import javax.net.ssl.SSLContext;
-
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.network.TransportEncryption;
 
 class TCPandSSLTransport implements AcceptingTransport
 {
     private NonBlockingNetworkTransport _networkTransport;
     private Set<Transport> _transports;
-    private SSLContext _sslContext;
-    private InetSocketAddress _bindingSocketAddress;
     private AmqpPort<?> _port;
     private Set<Protocol> _supported;
     private Protocol _defaultSupportedProtocolReply;
     private NetworkConnectionScheduler _scheduler;
 
     TCPandSSLTransport(final Set<Transport> transports,
-                       final SSLContext sslContext,
                        final AmqpPort<?> port,
                        final Set<Protocol> supported,
                        final Protocol defaultSupportedProtocolReply)
     {
         _transports = transports;
-        _sslContext = sslContext;
         _port = port;
         _supported = supported;
         _defaultSupportedProtocolReply = defaultSupportedProtocolReply;
@@ -64,24 +53,6 @@ class TCPandSSLTransport implements Acce
     @Override
     public void start()
     {
-        String bindingAddress = _port.getBindingAddress();
-        if (WILDCARD_ADDRESS.equals(bindingAddress))
-        {
-            bindingAddress = null;
-        }
-        int port = _port.getPort();
-        if ( bindingAddress == null )
-        {
-            _bindingSocketAddress = new InetSocketAddress(port);
-        }
-        else
-        {
-            _bindingSocketAddress = new InetSocketAddress(bindingAddress, port);
-        }
-
-        final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
-
-
         final MultiVersionProtocolEngineFactory protocolEngineFactory =
                 new MultiVersionProtocolEngineFactory(
                 _port.getParent(Broker.class),
@@ -100,9 +71,9 @@ class TCPandSSLTransport implements Acce
             encryptionSet.add(TransportEncryption.TLS);
         }
 
-        _scheduler = new NetworkConnectionScheduler("Port-"+_port.getName(),settings.getThreadPoolSize());
-        _networkTransport = new NonBlockingNetworkTransport(settings, protocolEngineFactory,
-                                                            _sslContext, encryptionSet, _scheduler);
+        _scheduler = new NetworkConnectionScheduler("Port-"+_port.getName(), _port.getThreadPoolSize());
+        _networkTransport = new NonBlockingNetworkTransport(protocolEngineFactory,
+                                                            encryptionSet, _scheduler, _port);
         _networkTransport.start();
     }
 
@@ -123,71 +94,4 @@ class TCPandSSLTransport implements Acce
             _scheduler.close();
         }
     }
-
-    class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration
-    {
-        public ServerNetworkTransportConfiguration()
-        {
-        }
-
-        @Override
-        public boolean wantClientAuth()
-        {
-            return _port.getWantClientAuth();
-        }
-
-        @Override
-        public Collection<String> getEnabledCipherSuites()
-        {
-            return _port.getEnabledCipherSuites();
-        }
-
-        @Override
-        public Collection<String> getDisabledCipherSuites()
-        {
-            return _port.getDisabledCipherSuites();
-        }
-
-        @Override
-        public boolean needClientAuth()
-        {
-            return _port.getNeedClientAuth();
-        }
-
-        @Override
-        public boolean getTcpNoDelay()
-        {
-            return _port.isTcpNoDelay();
-        }
-
-        @Override
-        public int getSendBufferSize()
-        {
-            return _port.getSendBufferSize();
-        }
-
-        @Override
-        public int getThreadPoolSize()
-        {
-            return Runtime.getRuntime().availableProcessors();
-        }
-
-        @Override
-        public int getReceiveBufferSize()
-        {
-            return _port.getReceiveBufferSize();
-        }
-
-        @Override
-        public InetSocketAddress getAddress()
-        {
-            return _bindingSocketAddress;
-        }
-
-        @Override
-        public String toString()
-        {
-            return _port.toString();
-        }
-    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java Tue Jun 16 09:28:46 2015
@@ -37,6 +37,6 @@ class TCPandSSLTransportProvider impleme
                                               final Set<Protocol> supported,
                                               final Protocol defaultSupportedProtocolReply)
     {
-        return new TCPandSSLTransport(transports, sslContext, port, supported, defaultSupportedProtocolReply);
+        return new TCPandSSLTransport(transports, port, supported, defaultSupportedProtocolReply);
     }
 }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java Tue Jun 16 09:28:46 2015
@@ -19,6 +19,7 @@
 
 package org.apache.qpid.server.transport;
 
+import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngine;
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -45,63 +46,16 @@ public class NetworkConnectionSchedulerT
 
     public void testFairRead() throws IOException, InterruptedException
     {
-        NetworkTransportConfiguration config = new NetworkTransportConfiguration()
-        {
-
-            @Override
-            public boolean getTcpNoDelay()
-            {
-                return true;
-            }
-
-            @Override
-            public int getReceiveBufferSize()
-            {
-                return 1;
-            }
-
-            @Override
-            public int getSendBufferSize()
-            {
-                return 1;
-            }
-
-            @Override
-            public int getThreadPoolSize()
-            {
-                return 1;
-            }
-
-            @Override
-            public InetSocketAddress getAddress()
-            {
-                return new InetSocketAddress(0);
-            }
-
-            @Override
-            public boolean needClientAuth()
-            {
-                return false;
-            }
-
-            @Override
-            public boolean wantClientAuth()
-            {
-                return false;
-            }
+        AmqpPort port = mock(AmqpPort.class);
+        when(port.isTcpNoDelay()).thenReturn(true);
+        when(port.getSendBufferSize()).thenReturn(1);
+        when(port.getReceiveBufferSize()).thenReturn(1);
+        when(port.getPort()).thenReturn(0);
+        when(port.getBindingAddress()).thenReturn("*");
+        when(port.getEnabledCipherSuites()).thenReturn(Collections.emptyList());
+        when(port.getDisabledCipherSuites()).thenReturn(Collections.emptyList());
+        when(port.getThreadPoolSize()).thenReturn(1);
 
-            @Override
-            public Collection<String> getEnabledCipherSuites()
-            {
-                return Collections.emptyList();
-            }
-
-            @Override
-            public Collection<String> getDisabledCipherSuites()
-            {
-                return Collections.emptyList();
-            }
-        };
         MultiVersionProtocolEngineFactory engineFactory = mock(MultiVersionProtocolEngineFactory.class);
         MultiVersionProtocolEngine verboseEngine = mock(MultiVersionProtocolEngine.class);
         MultiVersionProtocolEngine timidEngine = mock(MultiVersionProtocolEngine.class);
@@ -112,14 +66,14 @@ public class NetworkConnectionSchedulerT
 
         final NetworkConnectionScheduler scheduler = new NetworkConnectionScheduler(getName(), 1);
 
-        NonBlockingNetworkTransport transport = new NonBlockingNetworkTransport(config, engineFactory, null, EnumSet.of(TransportEncryption.NONE),
-                                                                                scheduler);
+        NonBlockingNetworkTransport transport = new NonBlockingNetworkTransport(engineFactory, EnumSet.of(TransportEncryption.NONE),
+                                                                                scheduler, port);
 
         transport.start();
-        final int port = transport.getAcceptingPort();
+        final int portNumber = transport.getAcceptingPort();
 
         Socket verboseSocket = new Socket();
-        verboseSocket.connect(new InetSocketAddress(port));
+        verboseSocket.connect(new InetSocketAddress(portNumber));
         final OutputStream verboseOutputStream = verboseSocket.getOutputStream();
         Thread verboseSender = new Thread(new Runnable()
         {
@@ -141,7 +95,7 @@ public class NetworkConnectionSchedulerT
         });
 
         Socket timidSocket = new Socket();
-        timidSocket.connect(new InetSocketAddress(port));
+        timidSocket.connect(new InetSocketAddress(portNumber));
         final OutputStream timidOutputStream = timidSocket.getOutputStream();
         Thread timidSender = new Thread(new Runnable()
         {

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Tue Jun 16 09:28:46 2015
@@ -109,9 +109,10 @@ public class TCPandSSLTransportTest exte
         when(port.getSendBufferSize()).thenReturn(64*1024);
         when(port.getReceiveBufferSize()).thenReturn(64*1024);
         when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
+        when(port.getThreadPoolSize()).thenReturn(1);
+        when(port.getSSLContext()).thenReturn(sslContext);
 
         TCPandSSLTransport transport = new TCPandSSLTransport(new HashSet<>(Arrays.asList(transports)),
-                                                              sslContext,
                                                               port,
                                                               new HashSet<>(Arrays.asList(Protocol.AMQP_0_8,
                                                                                           Protocol.AMQP_0_9,



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org