You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/03/16 17:52:33 UTC

svn commit: r1667068 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-core/...

Author: kwall
Date: Mon Mar 16 16:52:32 2015
New Revision: 1667068

URL: http://svn.apache.org/r1667068
Log:
QPID-6429, QPID-6262: [Java Broker] Improve error handling in new NIO code; Remove MINA terminology (session etc.) in 0-8 stack

* Also added an uncaught exception handler in the test framework (QBTC) to log the case where a thread exits abnormally.

Added:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Mon Mar 16 16:52:32 2015
@@ -69,8 +69,6 @@ public interface AMQConnectionModel<T ex
      */
     public LogSubject getLogSubject();
 
-    public boolean isSessionNameUnique(byte[] name);
-
     String getRemoteAddressString();
 
     SocketAddress getRemoteAddress();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java Mon Mar 16 16:52:32 2015
@@ -92,7 +92,6 @@ public class KerberosAuthenticationManag
         }
         catch (SaslException e)
         {
-            e.printStackTrace(System.err);
             return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
         }
     }

Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1667068&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Mon Mar 16 16:52:32 2015
@@ -0,0 +1,114 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class NetworkConnectionScheduler
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
+
+    private final SelectorThread _selectorThread;
+    private final ScheduledThreadPoolExecutor _executor;
+    private final AtomicInteger _running = new AtomicInteger();
+    private final int _poolSize;
+
+    NetworkConnectionScheduler(final SelectorThread selectorThread)
+    {
+        _selectorThread = selectorThread;
+        _poolSize = Runtime.getRuntime().availableProcessors();
+        _executor = new ScheduledThreadPoolExecutor(_poolSize);
+        _executor.prestartAllCoreThreads();
+    }
+
+    public void schedule(final NonBlockingConnection connection)
+    {
+        _executor.submit(new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+                                String currentName = Thread.currentThread().getName();
+                                try
+                                {
+                                    Thread.currentThread().setName(
+                                            SelectorThread.IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString());
+                                    processConnection(connection);
+                                }
+                                finally
+                                {
+                                    Thread.currentThread().setName(currentName);
+                                }
+                            }
+                        });
+    }
+
+    private void processConnection(final NonBlockingConnection connection)
+    {
+        try
+        {
+            _running.incrementAndGet();
+            boolean rerun;
+            do
+            {
+                rerun = false;
+                boolean closed = connection.doWork();
+
+                if (!closed)
+                {
+
+                    if (connection.isStateChanged())
+                    {
+                        if (_running.get() == _poolSize)
+                        {
+                            schedule(connection);
+                        }
+                        else
+                        {
+                            rerun = true;
+                        }
+                    }
+                    else
+                    {
+                        _selectorThread.addConnection(connection);
+                    }
+                }
+
+            } while (rerun);
+        }
+        finally
+        {
+            _running.decrementAndGet();
+        }
+    }
+
+    public void close()
+    {
+        _executor.shutdown();
+    }
+
+
+
+}

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Mon Mar 16 16:52:32 2015
@@ -44,8 +44,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.network.TransportEncryption;
@@ -55,18 +55,12 @@ import org.apache.qpid.util.SystemUtils;
 public class NonBlockingConnection implements NetworkConnection, ByteBufferSender
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+    private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
+
     private final SocketChannel _socketChannel;
-    private final long _timeout;
     private final Ticker _ticker;
+    private final Object _peerPrincipalLock = new Object();
     private final SelectorThread _selector;
-    private int _maxReadIdle;
-    private int _maxWriteIdle;
-    private Principal _principal;
-    private boolean _principalChecked;
-    private final Object _lock = new Object();
-
-    public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
-
     private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
     private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
 
@@ -74,9 +68,14 @@ public class NonBlockingConnection imple
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final ServerProtocolEngine _protocolEngine;
     private final int _receiveBufSize;
-    private final Set<TransportEncryption> _encryptionSet;
-    private final SSLContext _sslContext;
     private final Runnable _onTransportEncryptionAction;
+
+
+    private int _maxReadIdle;
+    private int _maxWriteIdle;
+    private Principal _principal;
+    private boolean _principalChecked;
+
     private ByteBuffer _netInputBuffer;
     private SSLEngine _sslEngine;
 
@@ -90,9 +89,7 @@ public class NonBlockingConnection imple
 
     public NonBlockingConnection(SocketChannel socketChannel,
                                  ServerProtocolEngine delegate,
-                                 int sendBufferSize,
                                  int receiveBufferSize,
-                                 long timeout,
                                  Ticker ticker,
                                  final Set<TransportEncryption> encryptionSet,
                                  final SSLContext sslContext,
@@ -104,14 +101,11 @@ public class NonBlockingConnection imple
                                  final SelectorThread selectorThread)
     {
         _socketChannel = socketChannel;
-        _timeout = timeout;
         _ticker = ticker;
         _selector = selectorThread;
 
         _protocolEngine = delegate;
         _receiveBufSize = receiveBufferSize;
-        _encryptionSet = encryptionSet;
-        _sslContext = sslContext;
         _onTransportEncryptionAction = onTransportEncryptionAction;
 
         delegate.setWorkListener(new Action<ServerProtocolEngine>()
@@ -125,7 +119,7 @@ public class NonBlockingConnection imple
 
         if(encryptionSet.size() == 1)
         {
-            _transportEncryption = _encryptionSet.iterator().next();
+            _transportEncryption = encryptionSet.iterator().next();
             if (_transportEncryption == TransportEncryption.TLS)
             {
                 onTransportEncryptionAction.run();
@@ -134,7 +128,7 @@ public class NonBlockingConnection imple
 
         if(encryptionSet.contains(TransportEncryption.TLS))
         {
-            _sslEngine = _sslContext.createSSLEngine();
+            _sslEngine = sslContext.createSSLEngine();
             _sslEngine.setUseClientMode(false);
             SSLUtil.removeSSLv3Support(_sslEngine);
             SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites);
@@ -150,26 +144,16 @@ public class NonBlockingConnection imple
             _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2));
         }
 
-        try
-        {
-            _remoteSocketAddress = _socketChannel.getRemoteAddress().toString();
-            _socketChannel.configureBlocking(false);
-        }
-        catch (IOException e)
-        {
-            throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
-        }
-
-
+        _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
     }
 
 
-    public Ticker getTicker()
+    Ticker getTicker()
     {
         return _ticker;
     }
 
-    public SocketChannel getSocketChannel()
+    SocketChannel getSocketChannel()
     {
         return _socketChannel;
     }
@@ -189,7 +173,7 @@ public class NonBlockingConnection imple
         if(_closed.compareAndSet(false,true))
         {
             _protocolEngine.notifyWork();
-            getSelector().wakeup();
+            _selector.wakeup();
         }
     }
 
@@ -216,7 +200,7 @@ public class NonBlockingConnection imple
     @Override
     public Principal getPeerPrincipal()
     {
-        synchronized (_lock)
+        synchronized (_peerPrincipalLock)
         {
             if(!_principalChecked)
             {
@@ -301,7 +285,7 @@ public class NonBlockingConnection imple
                 // tell all consumer targets that it is okay to accept more
                 _protocolEngine.setMessageAssignmentSuspended(false);
             }
-            catch (IOException e)
+            catch (IOException | ConnectionScopedRuntimeException e)
             {
                 LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
                 LOGGER.debug("Closing " + _remoteSocketAddress);
@@ -359,22 +343,7 @@ public class NonBlockingConnection imple
 
     }
 
-    public SelectorThread getSelector()
-    {
-        return _selector;
-    }
-
-    public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
-    {
-        return headerBytes[0] == -128 &&
-               headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
-               (headerBytes[4] == 0 || // SSL 3.0
-                headerBytes[4] == 1 || // TLS 1.0
-                headerBytes[4] == 2 || // TLS 1.1
-                headerBytes[4] == 3);
-    }
-
-    public boolean doRead() throws IOException
+    private boolean doRead() throws IOException
     {
         boolean readData = false;
         if(_transportEncryption == TransportEncryption.NONE)
@@ -496,7 +465,7 @@ public class NonBlockingConnection imple
         return readData;
     }
 
-    public boolean doWrite() throws IOException
+    private boolean doWrite() throws IOException
     {
 
         ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
@@ -589,18 +558,7 @@ public class NonBlockingConnection imple
         }
     }
 
-    public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
-    {
-        return headerBytes[0] == 22 && // SSL Handshake
-               (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
-                (headerBytes[2] == 0 || // SSL 3.0
-                 headerBytes[2] == 1 || // TLS 1.0
-                 headerBytes[2] == 2 || // TLS 1.1
-                 headerBytes[2] == 3)) && // TLS1.2
-               (headerBytes[5] == 1); // client_hello
-    }
-
-    public boolean runSSLEngineTasks(final SSLEngineResult status)
+    private boolean runSSLEngineTasks(final SSLEngineResult status)
     {
         if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
         {
@@ -614,15 +572,11 @@ public class NonBlockingConnection imple
         return false;
     }
 
-    public boolean looksLikeSSL(final byte[] headerBytes)
-    {
-        return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
-    }
-
     @Override
     public void send(final ByteBuffer msg)
     {
-        assert Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX) : "Send called by unexpected thread " + Thread.currentThread().getName();
+        assert _selector.isIOThread() : "Send called by unexpected thread " + Thread.currentThread().getName();
+
 
         if (_closed.get())
         {
@@ -631,7 +585,7 @@ public class NonBlockingConnection imple
         else
         {
             _buffers.add(msg);
-            _protocolEngine.notifyWork();
+            _protocolEngine.notifyWork();  // TODO now redundant
         }
     }
 
@@ -639,4 +593,36 @@ public class NonBlockingConnection imple
     public void flush()
     {
     }
+
+    @Override
+    public String toString()
+    {
+        return "[NonBlockingConnection " + _remoteSocketAddress + "]";
+    }
+
+    private boolean looksLikeSSL(final byte[] headerBytes)
+    {
+        return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
+    }
+
+    private boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
+    {
+        return headerBytes[0] == 22 && // SSL Handshake
+               (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
+                (headerBytes[2] == 0 || // SSL 3.0
+                 headerBytes[2] == 1 || // TLS 1.0
+                 headerBytes[2] == 2 || // TLS 1.1
+                 headerBytes[2] == 3)) && // TLS1.2
+               (headerBytes[5] == 1); // client_hello
+    }
+
+    private boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
+    {
+        return headerBytes[0] == -128 &&
+               headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
+               (headerBytes[4] == 0 || // SSL 3.0
+                headerBytes[4] == 1 || // TLS 1.0
+                headerBytes[4] == 2 || // TLS 1.1
+                headerBytes[4] == 3);
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Mon Mar 16 16:52:32 2015
@@ -25,14 +25,17 @@ import java.net.InetSocketAddress;
 import java.net.StandardSocketOptions;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.EnumSet;
 import java.util.Set;
 
 import javax.net.ssl.SSLContext;
 
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.configuration.CommonProperties;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.protocol.ServerProtocolEngine;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.TransportException;
@@ -43,51 +46,24 @@ import org.apache.qpid.transport.network
 public class NonBlockingNetworkTransport
 {
 
-    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
     private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
                                                           CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
     private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
                                                                    CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
-    private SelectorThread _selector;
-
-
-    private  Set<TransportEncryption> _encryptionSet;
-    private volatile boolean _closed = false;
-    private NetworkTransportConfiguration _config;
-    private ProtocolEngineFactory _factory;
-    private SSLContext _sslContext;
-    private ServerSocketChannel _serverSocket;
-    private int _timeout;
-
-    public void close()
-    {
-        if(_selector != null)
-        {
-            try
-            {
-                if (_serverSocket != null)
-                {
-                    _selector.cancelAcceptingSocket(_serverSocket);
-                    _serverSocket.close();
-                }
-            }
-            catch (IOException e)
-            {
-                // TODO
-                e.printStackTrace();
-            }
-            finally
-            {
+    private final Set<TransportEncryption> _encryptionSet;
+    private final NetworkTransportConfiguration _config;
+    private final ProtocolEngineFactory _factory;
+    private final SSLContext _sslContext;
+    private final ServerSocketChannel _serverSocket;
+    private final int _timeout;
 
-                _selector.close();
-            }
-        }
-    }
+    private SelectorThread _selector;
 
-    public void accept(NetworkTransportConfiguration config,
-                       ProtocolEngineFactory factory,
-                       SSLContext sslContext,
-                       final Set<TransportEncryption> encryptionSet)
+    public NonBlockingNetworkTransport(final NetworkTransportConfiguration config,
+                                       final MultiVersionProtocolEngineFactory factory,
+                                       final SSLContext sslContext,
+                                       final EnumSet<TransportEncryption> encryptionSet)
     {
         try
         {
@@ -106,80 +82,138 @@ public class NonBlockingNetworkTransport
             _serverSocket.configureBlocking(false);
             _encryptionSet = encryptionSet;
 
-            _selector = new SelectorThread(config.getAddress().toString(), this);
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Failed to start AMQP on port : " + config, e);
+        }
+
+    }
+
+    public void start()
+    {
+        try
+        {
+            _selector = new SelectorThread(this);
             _selector.start();
             _selector.addAcceptingSocket(_serverSocket);
         }
         catch (IOException e)
         {
-            throw new TransportException("Failed to start AMQP on port : " + config, e);
+            throw new TransportException("Failed to start", e);
         }
+    }
 
 
+    public void close()
+    {
+        if(_selector != null)
+        {
+            _selector.cancelAcceptingSocket(_serverSocket);
+            try
+            {
+                _serverSocket.close();
+            }
+            catch (IOException e)
+            {
+                LOGGER.warn("Error closing the server socket for : " +  _config.getAddress().toString(), e);
+            }
+            finally
+            {
+                _selector.close();
+                _selector = null;
+            }
+        }
     }
 
     public int getAcceptingPort()
     {
-        return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort();
+        return _serverSocket.socket().getLocalPort();
     }
 
-    public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException
+    public NetworkTransportConfiguration getConfig()
     {
-        final ServerProtocolEngine engine =
-                (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
-                                                                          .getRemoteSocketAddress());
+        return _config;
+    }
 
-        if(engine != null)
+    void acceptSocketChannel(final ServerSocketChannel serverSocketChannel)
+    {
+        SocketChannel socketChannel = null;
+        boolean success = false;
+        try
         {
-            socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
-            socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
-
-            final Integer sendBufferSize = _config.getSendBufferSize();
-            final Integer receiveBufferSize = _config.getReceiveBufferSize();
+            socketChannel = serverSocketChannel.accept();
 
-            socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
-            socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+            final ServerProtocolEngine engine =
+                    (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
+                                                                              .getRemoteSocketAddress());
 
+            if(engine != null)
+            {
+                socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+                socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
 
-            final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+                final int sendBufferSize = _config.getSendBufferSize();
+                final int receiveBufferSize = _config.getReceiveBufferSize();
 
-            NonBlockingConnection connection =
-                    new NonBlockingConnection(socketChannel,
-                                              engine,
-                                              sendBufferSize,
-                                              receiveBufferSize,
-                                              _timeout,
-                                              ticker,
-                                              _encryptionSet,
-                                              _sslContext,
-                                              _config.wantClientAuth(),
-                                              _config.needClientAuth(),
-                                              _config.getEnabledCipherSuites(),
-                                              _config.getDisabledCipherSuites(),
-                                              new Runnable()
-                                              {
+                socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+                socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
 
-                                                  @Override
-                                                  public void run()
+                socketChannel.configureBlocking(false);
+
+                final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, _timeout);
+
+                NonBlockingConnection connection =
+                        new NonBlockingConnection(socketChannel,
+                                                  engine,
+                                                  receiveBufferSize,
+                                                  ticker,
+                                                  _encryptionSet,
+                                                  _sslContext,
+                                                  _config.wantClientAuth(),
+                                                  _config.needClientAuth(),
+                                                  _config.getEnabledCipherSuites(),
+                                                  _config.getDisabledCipherSuites(),
+                                                  new Runnable()
                                                   {
-                                                      engine.encryptedTransport();
-                                                  }
-                                              },
-                                              _selector);
 
-            engine.setNetworkConnection(connection, connection.getSender());
-            connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+                                                      @Override
+                                                      public void run()
+                                                      {
+                                                          engine.encryptedTransport();
+                                                      }
+                                                  },
+                                                  _selector);
+
+                engine.setNetworkConnection(connection, connection.getSender());
+                connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
 
-            ticker.setConnection(connection);
+                ticker.setConnection(connection);
 
-            connection.start();
+                connection.start();
 
-            _selector.addConnection(connection);
+                _selector.addConnection(connection);
 
+                success = true;
+            }
+        }
+        catch (IOException e)
+        {
+            LOGGER.error("Failed to process incoming socket", e);
         }
-        else
+        finally
         {
-            socketChannel.close();
+            if (!success && socketChannel != null)
+            {
+                try
+                {
+                    socketChannel.close();
+                }
+                catch (IOException e)
+                {
+                    LOGGER.debug("Failed to close socket " + socketChannel, e);
+                }
+            }
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Mon Mar 16 16:52:32 2015
@@ -32,43 +32,40 @@ import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.thread.LoggingUncaughtExceptionHandler;
-
 
 public class SelectorThread extends Thread
 {
-    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
 
-    public static final String IO_THREAD_NAME_PREFIX  = "NCS-";
+    static final String IO_THREAD_NAME_PREFIX  = "IO-";
     private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
+
+    /**
+     * Queue of connections that are not currently scheduled and not registered with the selector.
+     * These need to go back into the Selector.
+     */
     private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
+
+    /** Set of connections that are currently being selected upon */
     private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
+
     private final Selector _selector;
     private final AtomicBoolean _closed = new AtomicBoolean();
-    private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
+    private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(this);
     private final NonBlockingNetworkTransport _transport;
+    private long _nextTimeout;
 
-    SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport)
+    SelectorThread(final NonBlockingNetworkTransport nonBlockingNetworkTransport) throws IOException
     {
-        super("SelectorThread-"+name);
+        super("SelectorThread-" + nonBlockingNetworkTransport.getConfig().getAddress().toString());
+
         _transport = nonBlockingNetworkTransport;
-        try
-        {
-            _selector = Selector.open();
-        }
-        catch (IOException e)
-        {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-        }
+        _selector = Selector.open();
     }
 
     public void addAcceptingSocket(final ServerSocketChannel socketChannel)
@@ -83,10 +80,10 @@ public class SelectorThread extends Thre
                             {
                                 socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
                             }
-                            catch (ClosedChannelException e)
+                            catch (IllegalStateException | ClosedChannelException e)
                             {
-                                // TODO
-                                e.printStackTrace();
+                                // TODO Communicate condition back to model object to make it go into the ERROR state
+                                LOGGER.error("Failed to register selector on accepting port", e);
                             }
                         }
                     });
@@ -114,91 +111,38 @@ public class SelectorThread extends Thre
     public void run()
     {
 
-        long nextTimeout = 0;
+        _nextTimeout = 0;
 
         try
         {
             while (!_closed.get())
             {
 
-                _selector.select(nextTimeout);
-
-                while(_tasks.peek() != null)
+                try
                 {
-                    Runnable task = _tasks.poll();
-                    task.run();
+                    _selector.select(_nextTimeout);
                 }
-
-                List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
-
-
-                Set<SelectionKey> selectionKeys = _selector.selectedKeys();
-                for (SelectionKey key : selectionKeys)
+                catch (IOException e)
                 {
-                    if(key.isAcceptable())
-                    {
-                        // todo - should we schedule this rather than running in this thread?
-                        SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept();
-                        _transport.acceptSocketChannel(acceptedChannel);
-                    }
-                    else
-                    {
-                        NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
-
-                        key.channel().register(_selector, 0);
-
-                        toBeScheduled.add(connection);
-                        _unscheduledConnections.remove(connection);
-                    }
-
+                    // TODO Inform the model object
+                    LOGGER.error("Failed to select for " + _transport.getConfig().getAddress().toString(),e );
+                    break;
                 }
-                selectionKeys.clear();
-
-                while (_unregisteredConnections.peek() != null)
-                {
-                    NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
-                    _unscheduledConnections.add(unregisteredConnection);
 
+                runTasks();
 
-                    final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
-                                    | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
-                    unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+                List<NonBlockingConnection> toBeScheduled = processSelectionKeys();
 
-                }
-
-                long currentTime = System.currentTimeMillis();
-                Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
-                nextTimeout = Integer.MAX_VALUE;
-                while (iterator.hasNext())
-                {
-                    NonBlockingConnection connection = iterator.next();
-
-                    int period = connection.getTicker().getTimeToNextTick(currentTime);
+                toBeScheduled.addAll(reregisterUnregisteredConnections());
 
-                    if (period <= 0 || connection.isStateChanged())
-                    {
-                        toBeScheduled.add(connection);
-                        connection.getSocketChannel().register(_selector, 0).cancel();
-                        iterator.remove();
-                    }
-                    else
-                    {
-                        nextTimeout = Math.min(period, nextTimeout);
-                    }
-                }
+                toBeScheduled.addAll(processUnscheduledConnections());
 
                 for (NonBlockingConnection connection : toBeScheduled)
                 {
                     _scheduler.schedule(connection);
                 }
-
             }
         }
-        catch (IOException e)
-        {
-            //TODO
-            e.printStackTrace();
-        }
         finally
         {
             try
@@ -207,114 +151,144 @@ public class SelectorThread extends Thre
             }
             catch (IOException e)
             {
-                e.printStackTrace();
+                LOGGER.debug("Failed to close selector", e);
             }
         }
 
-
-
-
     }
 
-    public void addConnection(final NonBlockingConnection connection)
+    private List<NonBlockingConnection> processUnscheduledConnections()
     {
-        _unregisteredConnections.add(connection);
-        _selector.wakeup();
+        List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
 
-    }
+        long currentTime = System.currentTimeMillis();
+        Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
+        _nextTimeout = Integer.MAX_VALUE;
+        while (iterator.hasNext())
+        {
+            NonBlockingConnection connection = iterator.next();
 
-    public void wakeup()
-    {
-        _selector.wakeup();
-    }
+            int period = connection.getTicker().getTimeToNextTick(currentTime);
 
-    public void close()
-    {
-        _closed.set(true);
-        _selector.wakeup();
-        _scheduler.close();
+            if (period <= 0 || connection.isStateChanged())
+            {
+                toBeScheduled.add(connection);
+                try
+                {
+                    LOGGER.debug("KWDEBUG# Setting interest to zero (PUC) {}", connection);
+                    // Cancel the key: this connection is returned to the caller for scheduling
+                    SelectionKey register = connection.getSocketChannel().register(_selector, 0);
+                    register.cancel();
+                }
+                catch (ClosedChannelException e)
+                {
+                    LOGGER.debug("Failed to register with selector for connection " + connection +
+                                 ". Connection is probably being closed by peer.", e);
+                }
+                iterator.remove();
+            }
+            else
+            {
+                _nextTimeout = Math.min(period, _nextTimeout);
+            }
+        }
+
+        return toBeScheduled;
     }
 
-    private class NetworkConnectionScheduler
+    private List<NonBlockingConnection> reregisterUnregisteredConnections()
     {
-        private final ScheduledThreadPoolExecutor _executor;
-        private final AtomicInteger _running = new AtomicInteger();
-        private final int _poolSize;
+        List<NonBlockingConnection> unregisterableConnections = new ArrayList<>();
 
-        private NetworkConnectionScheduler()
+        while (_unregisteredConnections.peek() != null)
         {
-            _poolSize = Runtime.getRuntime().availableProcessors();
-            _executor = new ScheduledThreadPoolExecutor(_poolSize);
-            _executor.prestartAllCoreThreads();
-        }
+            NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
 
-        public void processConnection(final NonBlockingConnection connection)
-        {
+            final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
+                            | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
             try
             {
-                _running.incrementAndGet();
-                boolean rerun;
-                do
-                {
-                    rerun = false;
-                    boolean closed = connection.doWork();
+                LOGGER.debug("KWDEBUG# Registering {}", unregisteredConnection);
+                unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+                // Track as selected-upon only once registered; a failed one is handed back for scheduling
+                _unscheduledConnections.add(unregisteredConnection);
+            }
+            catch (ClosedChannelException e)
+            {
+                unregisterableConnections.add(unregisteredConnection);
+            }
+        }
 
-                    if (!closed)
-                    {
+        return unregisterableConnections;
+    }
 
-                        if (connection.isStateChanged())
-                        {
-                            if (_running.get() == _poolSize)
-                            {
-                                schedule(connection);
-                            }
-                            else
-                            {
-                                rerun = true;
-                            }
-                        }
-                        else
-                        {
-                            SelectorThread.this.addConnection(connection);
-                        }
-                    }
+    private List<NonBlockingConnection> processSelectionKeys()
+    {
+        List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
 
-                } while (rerun);
+        Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+        for (SelectionKey key : selectionKeys)
+        {
+            if(key.isAcceptable())
+            {
+                // todo - should we schedule this rather than running in this thread?
+                _transport.acceptSocketChannel((ServerSocketChannel)key.channel());
             }
-            finally
+            else
             {
-                _running.decrementAndGet();
+                NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+
+                try
+                {
+                    LOGGER.debug("KWDEBUG# Setting interest to zero (PSK) {}", connection);
+                    // Zero the interest set; the connection is returned to the caller for scheduling
+                    key.channel().register(_selector, 0);
+                }
+                catch (ClosedChannelException e)
+                {
+                    // Ignore - we will schedule the connection anyway
+                }
+
+                toBeScheduled.add(connection);
+                _unscheduledConnections.remove(connection);
             }
-        }
 
-        public void schedule(final NonBlockingConnection connection)
-        {
-            _executor.submit(new Runnable()
-                            {
-                                @Override
-                                public void run()
-                                {
-                                    String currentName = Thread.currentThread().getName();
-                                    try
-                                    {
-                                        Thread.currentThread().setName(
-                                                IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString());
-                                        processConnection(connection);
-                                    }
-                                    finally
-                                    {
-                                        Thread.currentThread().setName(currentName);
-                                    }
-                                }
-                            });
         }
+        selectionKeys.clear();
+
+        return toBeScheduled;
+    }
 
-        public void close()
+    private void runTasks()
+    {
+        while(_tasks.peek() != null)
         {
-            _executor.shutdown();
+            Runnable task = _tasks.poll();
+            task.run();
         }
+    }
+
+    public void addConnection(final NonBlockingConnection connection)
+    {
+        _unregisteredConnections.add(connection);
+        _selector.wakeup();
+
+    }
 
+    public void wakeup()
+    {
+        _selector.wakeup();
+    }
 
+    public void close()
+    {
+        _closed.set(true);
+        _selector.wakeup();
+        _scheduler.close();
+    }
 
+    boolean isIOThread()
+    {
+        return Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX);
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Mon Mar 16 16:52:32 2015
@@ -79,7 +79,8 @@ class TCPandSSLTransport implements Acce
         }
 
         final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
-        _networkTransport = new NonBlockingNetworkTransport();
+
+
         final MultiVersionProtocolEngineFactory protocolEngineFactory =
                 new MultiVersionProtocolEngineFactory(
                 _port.getParent(Broker.class),
@@ -97,7 +98,9 @@ class TCPandSSLTransport implements Acce
         {
             encryptionSet.add(TransportEncryption.TLS);
         }
-        _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet);
+
+        _networkTransport = new NonBlockingNetworkTransport(settings, protocolEngineFactory, _sslContext, encryptionSet);
+        _networkTransport.start();
     }
 
     public int getAcceptingPort()

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Mon Mar 16 16:52:32 2015
@@ -592,12 +592,6 @@ public class MockConsumer implements Con
         }
 
         @Override
-        public boolean isSessionNameUnique(byte[] name)
-        {
-            return false;
-        }
-
-        @Override
         public String getRemoteAddressString()
         {
             return "remoteAddress:1234";

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Mar 16 16:52:32 2015
@@ -540,11 +540,6 @@ public class ServerConnection extends Co
         return _connectionId;
     }
 
-    public boolean isSessionNameUnique(byte[] name)
-    {
-        return !super.hasSessionWithName(name);
-    }
-
     public String getRemoteAddressString()
     {
         return String.valueOf(getRemoteAddress());

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Mon Mar 16 16:52:32 2015
@@ -371,12 +371,17 @@ public class ServerConnectionDelegate ex
         while(connections.hasNext())
         {
             final AMQConnectionModel amqConnectionModel = connections.next();
-            final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
-                    ? ""
-                    : amqConnectionModel.getAuthorizedPrincipal().getName();
-            if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name))
+            if (amqConnectionModel instanceof ServerConnection)
             {
-                return false;
+                ServerConnection otherConnection = (ServerConnection)amqConnectionModel;
+
+                final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
+                        ? ""
+                        : amqConnectionModel.getAuthorizedPrincipal().getName();
+                if (userId.equals(userName) && otherConnection.hasSessionWithName(name))
+                {
+                    return false;
+                }
             }
         }
         return true;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Mar 16 16:52:32 2015
@@ -857,9 +857,6 @@ public class AMQChannel
         return false;
     }
 
-    /**
-     * Called from the protocol session to close this channel and clean up. T
-     */
     @Override
     public void close()
     {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Mar 16 16:52:32 2015
@@ -93,7 +93,6 @@ import org.apache.qpid.transport.SenderC
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.util.BytesDataOutput;
 
 public class AMQProtocolEngine implements ServerProtocolEngine,
                                           AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
@@ -117,9 +116,8 @@ public class AMQProtocolEngine implement
     // to save boxing the channelId and looking up in a map... cache in an array the low numbered
     // channels.  This value must be of the form 2^x - 1.
     private static final int CHANNEL_CACHE_SIZE = 0xff;
-    private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
-    public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
-    public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
+    private static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
+    private static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
     private static final long AWAIT_CLOSED_TIMEOUT = 60000;
     private final AmqpPort<?> _port;
     private final long _creationTime;
@@ -156,10 +154,8 @@ public class AMQProtocolEngine implement
 
     private volatile boolean _closed;
 
-    // maximum number of channels this session should have
     private long _maxNoOfChannels;
 
-    /* AMQP Version for this session */
     private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
     private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
     private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList =
@@ -172,7 +168,7 @@ public class AMQProtocolEngine implement
     private ProtocolOutputConverter _protocolOutputConverter;
     private final Subject _authorizedSubject = new Subject();
 
-    private final long _connectionID;
+    private final long _connectionId;
     private Object _reference = new Object();
 
     private LogSubject _logSubject;
@@ -190,7 +186,7 @@ public class AMQProtocolEngine implement
     private ByteBufferSender _sender;
 
     private volatile boolean _deferFlush;
-    private long _lastReceivedTime = System.currentTimeMillis();  // TODO consider if this is what we want?
+    private long _lastReceivedTime = System.currentTimeMillis();
     private boolean _blocking;
 
     private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
@@ -247,7 +243,7 @@ public class AMQProtocolEngine implement
         _transport = transport;
         _maxNoOfChannels = broker.getConnection_sessionCountLimit();
         _decoder = new BrokerDecoder(this);
-        _connectionID = connectionId;
+        _connectionId = connectionId;
         _logSubject = new ConnectionLogSubject(this);
         _binaryDataLimit = _broker.getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH)
                 ? _broker.getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH)
@@ -272,11 +268,11 @@ public class AMQProtocolEngine implement
                 return null;
             }
         });
-        
-        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
-        _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
-        _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
-        _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
+
+        _messagesDelivered = new StatisticsCounter("messages-delivered-" + _connectionId);
+        _dataDelivered = new StatisticsCounter("data-delivered-" + _connectionId);
+        _messagesReceived = new StatisticsCounter("messages-received-" + _connectionId);
+        _dataReceived = new StatisticsCounter("data-received-" + _connectionId);
         _creationTime = System.currentTimeMillis();
     }
 
@@ -323,11 +319,6 @@ public class AMQProtocolEngine implement
         _sender = sender;
     }
 
-    public long getSessionID()
-    {
-        return _connectionID;
-    }
-
     public void setMaxFrameSize(int frameMax)
     {
         _maxFrameSize = frameMax;
@@ -368,7 +359,7 @@ public class AMQProtocolEngine implement
                                  + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
                                  + "ms to establish identity.  Closing as possible DoS.");
                     getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
-                    closeProtocolSession();
+                    closeNetworkConnection();
                 }
                 _lastReceivedTime = arrivalTime;
                 _lastIoTime = arrivalTime;
@@ -382,37 +373,37 @@ public class AMQProtocolEngine implement
                 catch (ConnectionScopedRuntimeException e)
                 {
                     _logger.error("Unexpected exception", e);
-                    closeProtocolSession();
+                    closeNetworkConnection();
                 }
                 catch (AMQProtocolVersionException e)
                 {
                     _logger.error("Unexpected protocol version", e);
-                    closeProtocolSession();
+                    closeNetworkConnection();
                 }
                 catch (SenderClosedException e)
                 {
                     _logger.debug("Sender was closed abruptly, closing network.", e);
-                    closeProtocolSession();
+                    closeNetworkConnection();
                 }
                 catch (SenderException e)
                 {
                     _logger.info("Unexpected exception on send, closing network.", e);
-                    closeProtocolSession();
+                    closeNetworkConnection();
                 }
                 catch (TransportException e)
                 {
                     _logger.error("Unexpected transport exception", e);
-                    closeProtocolSession();
+                    closeNetworkConnection();
                 }
                 catch (AMQFrameDecodingException e)
                 {
                     _logger.error("Frame decoding", e);
-                    closeProtocolSession();
+                    closeNetworkConnection();
                 }
                 catch (IOException e)
                 {
                     _logger.error("I/O Exception", e);
-                    closeProtocolSession();
+                    closeNetworkConnection();
                 }
                 catch (StoreException e)
                 {
@@ -484,7 +475,6 @@ public class AMQProtocolEngine implement
 
             ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
 
-            // This sets the protocol version (and hence framing classes) for this session.
             setProtocolVersion(pv);
 
             StringBuilder mechanismBuilder = new StringBuilder();
@@ -538,15 +528,8 @@ public class AMQProtocolEngine implement
     }
 
 
-    private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
-    private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
 
-    /**
-     * Convenience method that writes a frame to the protocol session. Equivalent to calling
-     * getProtocolSession().write().
-     *
-     * @param frame the frame to write
-     */
+
     public synchronized void writeFrame(AMQDataBlock frame)
     {
         if(_logger.isDebugEnabled())
@@ -730,12 +713,7 @@ public class AMQProtocolEngine implement
         sessionRemoved(session);
     }
 
-    /**
-     * Initialise heartbeats on the session.
-     *
-     * @param delay delay in seconds (not ms)
-     */
-    public void initHeartbeats(int delay)
+    private void initHeartbeats(int delay)
     {
         if (delay > 0)
         {
@@ -749,9 +727,6 @@ public class AMQProtocolEngine implement
         }
     }
 
-    /**
-     * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
-     */
     private void closeAllChannels()
     {
         try
@@ -792,7 +767,7 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public void closeSession(final boolean connectionDropped)
+    private void closeConnectionInternal(final boolean connectionDropped)
     {
 
         if(runningAsSubject())
@@ -823,7 +798,7 @@ public class AMQProtocolEngine implement
                 @Override
                 public Object run()
                 {
-                    closeSession(connectionDropped);
+                    closeConnectionInternal(connectionDropped);
                     return null;
                 }
             });
@@ -921,7 +896,7 @@ public class AMQProtocolEngine implement
             try
             {
                 markChannelAwaitingCloseOk(channelId);
-                closeSession(false);
+                closeConnectionInternal(false);
             }
             finally
             {
@@ -931,7 +906,7 @@ public class AMQProtocolEngine implement
                 }
                 finally
                 {
-                    closeProtocolSession();
+                    closeNetworkConnection();
                 }
             }
         }
@@ -941,7 +916,7 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public void closeProtocolSession()
+    public void closeNetworkConnection()
     {
         _network.close();
     }
@@ -951,19 +926,7 @@ public class AMQProtocolEngine implement
         return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
     }
 
-    /** @return an object that can be used to identity */
-    public Object getKey()
-    {
-        return getRemoteAddress();
-    }
-
-    /**
-     * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
-     * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
-     *
-     * @return a String FQDN
-     */
-    public String getLocalFQDN()
+    private String getLocalFQDN()
     {
         SocketAddress address = _network.getLocalAddress();
         if (address instanceof InetSocketAddress)
@@ -1149,11 +1112,11 @@ public class AMQProtocolEngine implement
         {
             try
             {
-                closeSession(true);
+                closeConnectionInternal(true);
             }
             finally
             {
-                closeProtocolSession();
+                closeNetworkConnection();
             }
         }
         catch (ConnectionScopedRuntimeException | TransportException e)
@@ -1311,7 +1274,7 @@ public class AMQProtocolEngine implement
 
     public long getConnectionId()
     {
-        return getSessionID();
+        return _connectionId;
     }
 
     public String getAddress()
@@ -1452,12 +1415,6 @@ public class AMQProtocolEngine implement
         _dataReceived.reset();
     }
 
-    public boolean isSessionNameUnique(byte[] name)
-    {
-        // 0-8/0-9/0-9-1 sessions don't have names
-        return true;
-    }
-
     public String getRemoteAddressString()
     {
         return String.valueOf(getRemoteAddress());
@@ -1640,19 +1597,20 @@ public class AMQProtocolEngine implement
         }
         try
         {
-            closeSession(false);
+            closeConnectionInternal(false);
+
+            MethodRegistry methodRegistry = getMethodRegistry();
+            ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
+            writeFrame(responseBody.generateFrame(0));
         }
         catch (Exception e)
         {
-            _logger.error("Error closing protocol session: " + e, e);
+            _logger.error("Error closing connection for " + getRemoteAddressString(), e);
+        }
+        finally
+        {
+            closeNetworkConnection();
         }
-
-        MethodRegistry methodRegistry = getMethodRegistry();
-        ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
-        writeFrame(responseBody.generateFrame(0));
-
-        closeProtocolSession();
-
     }
 
     @Override
@@ -1667,12 +1625,17 @@ public class AMQProtocolEngine implement
 
         try
         {
-            closeSession(false);
+            closeConnectionInternal(false);
         }
         catch (Exception e)
         {
-            _logger.error("Error closing protocol session: " + e, e);
+            _logger.error("Error closing connection: " + getRemoteAddressString(), e);
         }
+        finally
+        {
+            closeNetworkConnection();
+        }
+
     }
 
     @Override
@@ -1692,7 +1655,7 @@ public class AMQProtocolEngine implement
         SaslServer ss = getSaslServer();
         if (ss == null)
         {
-            closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in session",0 );
+            closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in connection",0 );
         }
         MethodRegistry methodRegistry = getMethodRegistry();
         SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Mon Mar 16 16:52:32 2015
@@ -377,7 +377,7 @@ public abstract class ConsumerTarget_0_8
     {
         String subscriber = "[channel=" + _channel +
                             ", consumerTag=" + _consumerTag +
-                            ", session=" + getProtocolSession().getKey()  ;
+                            ", session=" + getConnection().getRemoteAddressString();
 
         return subscriber + "]";
     }
@@ -450,7 +450,7 @@ public abstract class ConsumerTarget_0_8
         return _consumerTag;
     }
 
-    public AMQProtocolEngine getProtocolSession()
+    private AMQProtocolEngine getConnection()
     {
         return _channel.getConnection();
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Mon Mar 16 16:52:32 2015
@@ -365,7 +365,7 @@ public class ProtocolOutputConverterImpl
             return _underlyingBody.writePayload(sender);
         }
 
-        public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+        public void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession)
             throws AMQException
         {
             throw new AMQException("This block should never be dispatched!");

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Mon Mar 16 16:52:32 2015
@@ -218,7 +218,7 @@ public class InternalTestProtocolSession
         }
     }
 
-    public void closeProtocolSession()
+    public void closeNetworkConnection()
     {
         // Override as we don't have a real IOSession to close.
         //  The alternative is to fully implement the TestIOSession to return a CloseFuture from close();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java Mon Mar 16 16:52:32 2015
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.test.utils.QpidTestCase;
 
-/** Test class to test MBean operations for AMQMinaProtocolSession. */
 public class MaxChannelsTest extends QpidTestCase
 {
     private AMQProtocolEngine _session;
@@ -62,7 +61,6 @@ public class MaxChannelsTest extends Qpi
         try
         {
             _session.getVirtualHost().close();
-            _session.closeSession(false);
         }
         finally
         {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Mon Mar 16 16:52:32 2015
@@ -324,12 +324,6 @@ public class Connection_1_0 implements C
     }
 
     @Override
-    public boolean isSessionNameUnique(byte[] name)
-    {
-        return true;  // TODO
-    }
-
-    @Override
     public String getRemoteAddressString()
     {
         return String.valueOf(_conn.getRemoteAddress());

Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java Mon Mar 16 16:52:32 2015
@@ -212,7 +212,7 @@ public class HttpManagement extends Abst
                 if(port.getState() != State.ACTIVE)
                 {
 
-                    // TODO - RG
+                    // TODO - RG - probably does nothing
                     port.startAsync();
                 }
                 Connector connector = null;

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java Mon Mar 16 16:52:32 2015
@@ -49,6 +49,21 @@ public class InternalBrokerHolder implem
     @Override
     public void start(BrokerOptions options) throws Exception
     {
+        if (Thread.getDefaultUncaughtExceptionHandler() == null) // install only when no default handler is registered yet, so an existing one is never overwritten
+        {
+            Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
+            {
+                @Override
+                public void uncaughtException(final Thread t, final Throwable e)
+                {
+                    System.err.println("Thread terminated due to uncaught exception"); // println keeps the stack trace below on its own line
+                    e.printStackTrace();
+
+                    LOGGER.error("Uncaught exception from thread " + t.getName(), e);
+                }
+            });
+        }
+
         LOGGER.info("Starting internal broker (same JVM)");
 
         _broker = new Broker(new Action<Integer>()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org