You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/27 23:13:27 UTC

svn commit: r1687962 [1/4] - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ broker-codegen/src/main/java/org/apache/qpid/server/model/validation/ broker-core/src/main/java/org/apache/qpid/server/connection/ b...

Author: rgodfrey
Date: Sat Jun 27 21:13:25 2015
New Revision: 1687962

URL: http://svn.apache.org/r1687962
Log:
QPID-6612 : Refactor various connection related classes

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java   (with props)
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java   (with props)
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java   (with props)
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java   (with props)
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java   (with props)
Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
    qpid/java/trunk/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionVersionValidator.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractMessageLogger.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SiteSpecificTrustStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
    qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java
    qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
    qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostLoggerRestTest.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java Sat Jun 27 21:13:25 2015
@@ -47,13 +47,14 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.VirtualHostAlias;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.virtualhost.*;
 
@@ -500,7 +501,7 @@ public class BDBHAReplicaVirtualHostImpl
     }
 
     @Override
-    public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection)
+    public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
     {
         return false;
     }
@@ -547,13 +548,13 @@ public class BDBHAReplicaVirtualHostImpl
     }
 
     @Override
-    public void registerConnection(final Connection<?> connection)
+    public void registerConnection(final AMQPConnection<?> connection)
     {
         throwUnsupportedForReplica();
     }
 
     @Override
-    public void deregisterConnection(final Connection<?> connection)
+    public void deregisterConnection(final AMQPConnection<?> connection)
     {
         throwUnsupportedForReplica();
     }

Modified: qpid/java/trunk/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java (original)
+++ qpid/java/trunk/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java Sat Jun 27 21:13:25 2015
@@ -275,7 +275,7 @@ public class AttributeAnnotationValidato
                 return true;
             }
         }
-        if(typeElement.getKind()==ElementKind.ENUM)
+        if(typeElement != null && typeElement.getKind()==ElementKind.ENUM)
         {
             return true;
         }
@@ -290,11 +290,14 @@ public class AttributeAnnotationValidato
 
         final TypeElement managedAttributeTypeValueElement =
                 elementUtils.getTypeElement(ManagedAttributeValueTypeValidator.MANAGED_ATTRIBUTE_VALUE_TYPE_CLASS_NAME);
-        for(AnnotationMirror annotation : typeElement.getAnnotationMirrors())
+        if(typeElement != null)
         {
-            if(annotation.getAnnotationType().asElement().equals(managedAttributeTypeValueElement))
+            for (AnnotationMirror annotation : typeElement.getAnnotationMirrors())
             {
-                return true;
+                if (annotation.getAnnotationType().asElement().equals(managedAttributeTypeValueElement))
+                {
+                    return true;
+                }
             }
         }
         if(typeUtils.isSameType(type,elementUtils.getTypeElement("java.lang.Object").asType()))

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java Sat Jun 27 21:13:25 2015
@@ -23,14 +23,14 @@ package org.apache.qpid.server.connectio
 import java.net.SocketAddress;
 
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.security.auth.SocketConnectionPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
 
 public class ConnectionPrincipal implements SocketConnectionPrincipal
 {
-    private final AMQConnectionModel _connection;
+    private final AMQPConnection<?> _connection;
 
-    public ConnectionPrincipal(final AMQConnectionModel connection)
+    public ConnectionPrincipal(final AMQPConnection<?> connection)
     {
         _connection = connection;
     }
@@ -44,10 +44,10 @@ public class ConnectionPrincipal impleme
     @Override
     public SocketAddress getRemoteAddress()
     {
-        return _connection.getRemoteAddress();
+        return _connection.getRemoteSocketAddress();
     }
 
-    public AMQConnectionModel getConnection()
+    public AMQPConnection<?> getConnection()
     {
         return _connection;
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionVersionValidator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionVersionValidator.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionVersionValidator.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionVersionValidator.java Sat Jun 27 21:13:25 2015
@@ -38,7 +38,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.plugin.ConnectionValidator;
 import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
 
 
 @PluggableService
@@ -71,7 +71,7 @@ public class ConnectionVersionValidator
     }
 
     @Override
-    public boolean validateConnectionCreation(final AMQConnectionModel<?, ?> connection,
+    public boolean validateConnectionCreation(final AMQPConnection<?> connection,
                                               final VirtualHost<?, ?, ?> virtualHost)
     {
         String connectionVersion = connection.getClientVersion();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Sat Jun 27 21:13:25 2015
@@ -88,7 +88,7 @@ public abstract class AbstractConsumerTa
     @Override
     public final boolean isSuspended()
     {
-        return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended();
+        return getSessionModel().getAMQPConnection().isMessageAssignmentSuspended() || doIsSuspended();
     }
 
     protected abstract boolean doIsSuspended();
@@ -185,7 +185,7 @@ public abstract class AbstractConsumerTa
     public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
         _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
-        getSessionModel().getConnectionModel().notifyWork();
+        getSessionModel().getAMQPConnection().notifyWork();
         return entry.getMessage().getSize();
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractMessageLogger.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractMessageLogger.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractMessageLogger.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractMessageLogger.java Sat Jun 27 21:13:25 2015
@@ -24,11 +24,11 @@ package org.apache.qpid.server.logging;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.logging.subjects.LogSubjectFormat;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal;
 import org.apache.qpid.server.security.auth.TaskPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
 
 import javax.security.auth.Subject;
 import java.security.AccessController;
@@ -145,7 +145,7 @@ public abstract class AbstractMessageLog
         return "["+taskPrincipal.getName()+"] ";
     }
 
-    protected String generateConnectionMessage(final AMQConnectionModel connection)
+    protected String generateConnectionMessage(final AMQPConnection<?> connection)
     {
         if (connection.getAuthorizedPrincipal() != null)
         {
@@ -190,7 +190,7 @@ public abstract class AbstractMessageLog
 
     protected String generateSessionMessage(final AMQSessionModel session)
     {
-        AMQConnectionModel connection = session.getConnectionModel();
+        AMQPConnection<?> connection = session.getAMQPConnection();
         return "[" + MessageFormat.format(CHANNEL_FORMAT, connection == null ? -1L : connection.getConnectionId(),
                                           (connection == null || connection.getAuthorizedPrincipal() == null)
                                                   ? "?"

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Sat Jun 27 21:13:25 2015
@@ -20,8 +20,8 @@
  */
 package org.apache.qpid.server.logging.subjects;
 
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
 
@@ -43,7 +43,7 @@ public class ChannelLogSubject extends A
          * 3 - Virtualhost
          * 4 - Channel ID
          */
-        AMQConnectionModel connection = session.getConnectionModel();
+        AMQPConnection connection = session.getAMQPConnection();
         setLogStringWithFormat(CHANNEL_FORMAT,
                                connection == null ? -1L : connection.getConnectionId(),
                                (connection == null || connection.getAuthorizedPrincipal() == null) ? "?" : connection.getAuthorizedPrincipal().getName(),

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java Sat Jun 27 21:13:25 2015
@@ -21,7 +21,8 @@
 package org.apache.qpid.server.logging.subjects;
 
 import java.text.MessageFormat;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+
+import org.apache.qpid.server.transport.AMQPConnection;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
@@ -32,11 +33,11 @@ public class ConnectionLogSubject extend
 {
 
     // The Session this Actor is representing
-    private AMQConnectionModel _session;
+    private AMQPConnection<?> _connection;
 
-    public ConnectionLogSubject(AMQConnectionModel session)
+    public ConnectionLogSubject(AMQPConnection<?> connection)
     {
-        _session = session;
+        _connection = connection;
     }
 
     // Used to stop re-creating the _logString when we reach our final format
@@ -55,9 +56,9 @@ public class ConnectionLogSubject extend
     {
         if (!_upToDate)
         {
-            if (_session.getAuthorizedPrincipal() != null)
+            if (_connection.getAuthorizedPrincipal() != null)
             {
-                if (_session.getVirtualHostName() != null)
+                if (_connection.getVirtualHostName() != null)
                 {
                     /**
                      * LOG FORMAT used by the AMQPConnectorActor follows
@@ -70,10 +71,10 @@ public class ConnectionLogSubject extend
                      * 0 - Connection ID 1 - User ID 2 - IP 3 - Virtualhost
                      */
                     setLogString("[" + MessageFormat.format(CONNECTION_FORMAT,
-                                                            _session.getConnectionId(),
-                                                            _session.getAuthorizedPrincipal().getName(),
-                                                            _session.getRemoteAddressString(),
-                                                            _session.getVirtualHostName())
+                                                            _connection.getConnectionId(),
+                                                            _connection.getAuthorizedPrincipal().getName(),
+                                                            _connection.getRemoteAddressString(),
+                                                            _connection.getVirtualHostName())
                                  + "] ");
 
                     _upToDate = true;
@@ -81,9 +82,9 @@ public class ConnectionLogSubject extend
                 else
                 {
                     setLogString("[" + MessageFormat.format(USER_FORMAT,
-                                                            _session.getConnectionId(),
-                                                            _session.getAuthorizedPrincipal().getName(),
-                                                            _session.getRemoteAddressString())
+                                                            _connection.getConnectionId(),
+                                                            _connection.getAuthorizedPrincipal().getName(),
+                                                            _connection.getRemoteAddressString())
                                  + "] ");
 
                 }
@@ -91,8 +92,8 @@ public class ConnectionLogSubject extend
             else
             {
                     setLogString("[" + MessageFormat.format(SOCKET_FORMAT,
-                                                            _session.getConnectionId(),
-                                                            _session.getRemoteAddressString())
+                                                            _connection.getConnectionId(),
+                                                            _connection.getRemoteAddressString())
                                  + "] ");
             }
         }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Sat Jun 27 21:13:25 2015
@@ -43,7 +43,7 @@ public interface MessageSource extends T
 
     void removeConsumerRegistrationListener(ConsumerRegistrationListener<? super MessageSource> listener);
 
-    boolean verifySessionAccess(AMQSessionModel<?,?> session);
+    boolean verifySessionAccess(AMQSessionModel<?> session);
 
     interface ConsumerRegistrationListener<Q extends MessageSource>
     {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java Sat Jun 27 21:13:25 2015
@@ -22,7 +22,7 @@ package org.apache.qpid.server.model;
 
 import java.util.Collection;
 
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
 
 @ManagedObject( creatable = false )
 public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
@@ -85,7 +85,7 @@ public interface Connection<X extends Co
     VirtualHost getVirtualHost();
 
     @DerivedAttribute
-    Port getPort();
+    Port<?> getPort();
 
     @ManagedStatistic
     long getBytesIn();
@@ -109,5 +109,5 @@ public interface Connection<X extends Co
     Collection<Session> getSessions();
 
 
-    AMQConnectionModel<?,?> getUnderlyingConnection();
+    AbstractAMQPConnection<?> getUnderlyingConnection();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Sat Jun 27 21:13:25 2015
@@ -30,6 +30,7 @@ import java.util.UUID;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.virtualhost.VirtualHostConnectionListener;
 
 @ManagedObject( defaultType = "ProvidedStore", description = VirtualHost.CLASS_DESCRIPTION)
@@ -166,7 +167,7 @@ public interface VirtualHost<X extends V
     Collection<String> getExchangeTypeNames();
 
     @ManagedOperation(nonModifying = true)
-    Collection<Connection<?>> getConnections();
+    Collection<? extends Connection<?>> getConnections();
 
     @ManagedOperation(nonModifying = true)
     Connection<?> getConnection(@Param(name="name") String name);
@@ -179,8 +180,8 @@ public interface VirtualHost<X extends V
 
     Principal getPrincipal();
 
-    void registerConnection(Connection<?> connection);
-    void deregisterConnection(Connection<?> connection);
+    void registerConnection(AMQPConnection<?> connection);
+    void deregisterConnection(AMQPConnection<?> connection);
 
     public static interface Transaction
     {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java Sat Jun 27 21:13:25 2015
@@ -25,7 +25,6 @@ import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Sat Jun 27 21:13:25 2015
@@ -39,14 +39,15 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.StateTransition;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.util.Action;
 
-final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter>
+public final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter>
 {
     // Attributes
     private final AMQSessionModel _session;
 
-    public SessionAdapter(final ConnectionAdapter connectionAdapter,
+    public SessionAdapter(final AbstractAMQPConnection<?> connectionAdapter,
                           final AMQSessionModel session)
     {
         super(parentsMap(connectionAdapter), createAttributes(session));

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java Sat Jun 27 21:13:25 2015
@@ -21,9 +21,9 @@
 package org.apache.qpid.server.plugin;
 
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
 
 public interface ConnectionValidator extends Pluggable
 {
-    boolean validateConnectionCreation(AMQConnectionModel<?, ?> connection, final VirtualHost<?, ?, ?> virtualHost);
+    boolean validateConnectionCreation(AMQPConnection<?> connection, final VirtualHost<?, ?, ?> virtualHost);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Sat Jun 27 21:13:25 2015
@@ -29,6 +29,7 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.Deletable;
 import org.apache.qpid.transport.network.Ticker;
 
@@ -37,11 +38,11 @@ import org.apache.qpid.transport.network
  * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
  * when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}.
  */
-public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQConnectionModel<C,T>> extends Comparable<AMQSessionModel>, Deletable<T>
+public interface AMQSessionModel<T extends AMQSessionModel<T>> extends Comparable<AMQSessionModel>, Deletable<T>
 {
     public UUID getId();
 
-    public C getConnectionModel();
+    public AMQPConnection<?> getAMQPConnection();
 
     public String getClientID();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sat Jun 27 21:13:25 2015
@@ -84,7 +84,6 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.TypedContent;
 import org.apache.qpid.server.plugin.MessageFilterFactory;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.access.Operation;
@@ -93,6 +92,7 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -342,7 +342,7 @@ public abstract class AbstractQueue<X ex
 
         Subject activeSubject = Subject.getSubject(AccessController.getContext());
         Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
-        AMQSessionModel<?,?> sessionModel;
+        AMQSessionModel<?> sessionModel;
         if(sessionPrincipals.isEmpty())
         {
             sessionModel = null;
@@ -360,14 +360,14 @@ public abstract class AbstractQueue<X ex
             {
 
                 case PRINCIPAL:
-                    _exclusiveOwner = sessionModel.getConnectionModel().getAuthorizedPrincipal();
+                    _exclusiveOwner = sessionModel.getAMQPConnection().getAuthorizedPrincipal();
                     break;
                 case CONTAINER:
-                    _exclusiveOwner = sessionModel.getConnectionModel().getRemoteContainerName();
+                    _exclusiveOwner = sessionModel.getAMQPConnection().getRemoteContainerName();
                     break;
                 case CONNECTION:
-                    _exclusiveOwner = sessionModel.getConnectionModel();
-                    addExclusivityConstraint(sessionModel.getConnectionModel());
+                    _exclusiveOwner = sessionModel.getAMQPConnection();
+                    addExclusivityConstraint(sessionModel.getAMQPConnection());
                     break;
                 case SESSION:
                     _exclusiveOwner = sessionModel;
@@ -405,7 +405,7 @@ public abstract class AbstractQueue<X ex
         {
             if(sessionModel != null)
             {
-                addLifetimeConstraint(sessionModel.getConnectionModel());
+                addLifetimeConstraint(sessionModel.getAMQPConnection());
             }
             else
             {
@@ -717,12 +717,12 @@ public abstract class AbstractQueue<X ex
             case CONNECTION:
                 if(exclusiveOwner == null)
                 {
-                    exclusiveOwner = target.getSessionModel().getConnectionModel();
-                    addExclusivityConstraint(target.getSessionModel().getConnectionModel());
+                    exclusiveOwner = target.getSessionModel().getAMQPConnection();
+                    addExclusivityConstraint(target.getSessionModel().getAMQPConnection());
                 }
                 else
                 {
-                    if(exclusiveOwner != target.getSessionModel().getConnectionModel())
+                    if(exclusiveOwner != target.getSessionModel().getAMQPConnection())
                     {
                         throw new ConsumerAccessRefused();
                     }
@@ -751,11 +751,11 @@ public abstract class AbstractQueue<X ex
             case PRINCIPAL:
                 if(exclusiveOwner == null)
                 {
-                    exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+                    exclusiveOwner = target.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
                 }
                 else
                 {
-                    if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+                    if(!exclusiveOwner.equals(target.getSessionModel().getAMQPConnection().getAuthorizedPrincipal()))
                     {
                         throw new ConsumerAccessRefused();
                     }
@@ -764,11 +764,11 @@ public abstract class AbstractQueue<X ex
             case CONTAINER:
                 if(exclusiveOwner == null)
                 {
-                    exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName();
+                    exclusiveOwner = target.getSessionModel().getAMQPConnection().getRemoteContainerName();
                 }
                 else
                 {
-                    if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName()))
+                    if(!exclusiveOwner.equals(target.getSessionModel().getAMQPConnection().getRemoteContainerName()))
                     {
                         throw new ConsumerAccessRefused();
                     }
@@ -2718,7 +2718,7 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+    public boolean verifySessionAccess(final AMQSessionModel<?> session)
     {
         boolean allowed;
         switch(_exclusive)
@@ -2730,13 +2730,13 @@ public abstract class AbstractQueue<X ex
                 allowed = _exclusiveOwner == null || _exclusiveOwner == session;
                 break;
             case CONNECTION:
-                allowed = _exclusiveOwner == null || _exclusiveOwner == session.getConnectionModel();
+                allowed = _exclusiveOwner == null || _exclusiveOwner == session.getAMQPConnection();
                 break;
             case PRINCIPAL:
-                allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getAuthorizedPrincipal());
+                allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getAMQPConnection().getAuthorizedPrincipal());
                 break;
             case CONTAINER:
-                allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getRemoteContainerName());
+                allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getAMQPConnection().getRemoteContainerName());
                 break;
             case LINK:
                 allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session;
@@ -2822,7 +2822,7 @@ public abstract class AbstractQueue<X ex
                 _exclusiveOwner = session;
                 break;
             case LINK:
-                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel();
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection();
         }
     }
 
@@ -2833,14 +2833,14 @@ public abstract class AbstractQueue<X ex
             case NONE:
             case CONTAINER:
             case PRINCIPAL:
-                AMQConnectionModel con = null;
+                AMQPConnection con = null;
                 for(ConsumerImpl c : getConsumers())
                 {
                     if(con == null)
                     {
-                        con = c.getSessionModel().getConnectionModel();
+                        con = c.getSessionModel().getAMQPConnection();
                     }
-                    else if(!con.equals(c.getSessionModel().getConnectionModel()))
+                    else if(!con.equals(c.getSessionModel().getAMQPConnection()))
                     {
                         throw new ExistingConsumerPreventsExclusive();
                     }
@@ -2848,10 +2848,10 @@ public abstract class AbstractQueue<X ex
                 _exclusiveOwner = con;
                 break;
             case SESSION:
-                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel();
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getAMQPConnection();
                 break;
             case LINK:
-                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel();
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection();
         }
     }
 
@@ -2866,9 +2866,9 @@ public abstract class AbstractQueue<X ex
                 {
                     if(containerID == null)
                     {
-                        containerID = c.getSessionModel().getConnectionModel().getRemoteContainerName();
+                        containerID = c.getSessionModel().getAMQPConnection().getRemoteContainerName();
                     }
-                    else if(!containerID.equals(c.getSessionModel().getConnectionModel().getRemoteContainerName()))
+                    else if(!containerID.equals(c.getSessionModel().getAMQPConnection().getRemoteContainerName()))
                     {
                         throw new ExistingConsumerPreventsExclusive();
                     }
@@ -2876,13 +2876,13 @@ public abstract class AbstractQueue<X ex
                 _exclusiveOwner = containerID;
                 break;
             case CONNECTION:
-                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getRemoteContainerName();
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQPConnection)_exclusiveOwner).getRemoteContainerName();
                 break;
             case SESSION:
-                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getRemoteContainerName();
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getAMQPConnection().getRemoteContainerName();
                 break;
             case LINK:
-                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getRemoteContainerName();
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection().getRemoteContainerName();
         }
     }
 
@@ -2897,9 +2897,9 @@ public abstract class AbstractQueue<X ex
                 {
                     if(principal == null)
                     {
-                        principal = c.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+                        principal = c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
                     }
-                    else if(!principal.equals(c.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+                    else if(!principal.equals(c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal()))
                     {
                         throw new ExistingConsumerPreventsExclusive();
                     }
@@ -2907,13 +2907,13 @@ public abstract class AbstractQueue<X ex
                 _exclusiveOwner = principal;
                 break;
             case CONNECTION:
-                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getAuthorizedPrincipal();
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQPConnection)_exclusiveOwner).getAuthorizedPrincipal();
                 break;
             case SESSION:
-                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getAuthorizedPrincipal();
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getAMQPConnection().getAuthorizedPrincipal();
                 break;
             case LINK:
-                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
         }
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java Sat Jun 27 21:13:25 2015
@@ -62,7 +62,6 @@ import org.apache.qpid.server.model.Virt
 import org.apache.qpid.server.model.VirtualHostLogger;
 import org.apache.qpid.server.model.VirtualHostLoggerFilter;
 import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.queue.QueueConsumer;
 import org.apache.qpid.server.security.access.ObjectProperties;
 import org.apache.qpid.server.security.access.ObjectProperties.Property;
@@ -71,6 +70,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.access.OperationLoggingDetails;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.TaskPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
 
 public class SecurityManager
 {
@@ -251,7 +251,7 @@ public class SecurityManager
         }
     }
 
-    public void authoriseCreateConnection(final AMQConnectionModel connection)
+    public void authoriseCreateConnection(final AMQPConnection<?> connection)
     {
         String virtualHostName = connection.getVirtualHostName();
         ObjectProperties properties = new ObjectProperties(virtualHostName);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SiteSpecificTrustStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SiteSpecificTrustStore.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SiteSpecificTrustStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SiteSpecificTrustStore.java Sat Jun 27 21:13:25 2015
@@ -20,10 +20,6 @@
  */
 package org.apache.qpid.server.security;
 
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.qpid.server.model.DerivedAttribute;
 import org.apache.qpid.server.model.ManagedAttribute;
 import org.apache.qpid.server.model.ManagedObject;

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1687962&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Sat Jun 27 21:13:25 2015
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.net.SocketAddress;
+import java.security.Principal;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.Deletable;
+
+public interface AMQPConnection<C extends AMQPConnection<C>> extends Connection<C>, Deletable<C>
+{
+    boolean isMessageAssignmentSuspended();
+
+    long getConnectionId();
+
+    Principal getAuthorizedPrincipal();
+
+    String getRemoteAddressString();
+
+    String getVirtualHostName();
+
+    void notifyWork();
+
+    String getRemoteContainerName();
+
+    boolean isConnectionStopped();
+
+    void registerMessageReceived(long size, long arrivalTime);
+
+    void registerMessageDelivered(long size);
+
+    void closeSessionAsync(AMQSessionModel<?> session, AMQConstant cause, String message);
+
+    SocketAddress getRemoteSocketAddress();
+
+    void block();
+
+    void unblock();
+
+    void setScheduler(NetworkConnectionScheduler networkConnectionScheduler);
+
+    boolean hasSessionWithName(byte[] name);
+
+    void closeAsync(AMQConstant connectionForced, String reason);
+
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1687962&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Sat Jun 27 21:13:25 2015
@@ -0,0 +1,517 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.net.SocketAddress;
+import java.security.AccessController;
+import java.security.Principal;
+import java.security.PrivilegedAction;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.auth.Subject;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.adapter.SessionAdapter;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C>>
+        extends AbstractConfiguredObject<C>
+        implements ProtocolEngine, AMQPConnection<C>
+
+{
+    private final Broker<?> _broker;
+    private final NetworkConnection _network;
+    private final AmqpPort<?> _port;
+    private final Transport _transport;
+    private final long _connectionId;
+    private final AggregateTicker _aggregateTicker;
+    private final Subject _subject = new Subject();
+    private final List<Action<? super C>> _connectionCloseTaskList =
+            new CopyOnWriteArrayList<>();
+
+    private final Action<? super AMQPConnection<C>> _underlyingConnectionDeleteTask;
+
+    private String _clientProduct;
+    private String _clientVersion;
+    private String _remoteProcessPid;
+    private String _clientId;
+
+    private volatile boolean _stopped;
+    private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+    private final AtomicBoolean _underlyingClosed = new AtomicBoolean();
+
+
+    public AbstractAMQPConnection(Broker<?> broker,
+                                  NetworkConnection network,
+                                  AmqpPort<?> port,
+                                  Transport transport,
+                                  long connectionId,
+                                  final AggregateTicker aggregateTicker)
+    {
+        super(parentsMap(port),createAttributes(connectionId, network));
+
+        _broker = broker;
+        _network = network;
+        _port = port;
+        _transport = transport;
+        _connectionId = connectionId;
+        _aggregateTicker = aggregateTicker;
+        _subject.getPrincipals().add(new ConnectionPrincipal(this));
+        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
+        _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
+        _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
+        _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
+
+
+        // Used to allow the protocol layers to tell the model they have been deleted
+        _underlyingConnectionDeleteTask = new Action<AMQPConnection<?>>()
+        {
+            @Override
+            public void performAction(final AMQPConnection<?> object)
+            {
+                removeDeleteTask(this);
+                _underlyingClosed.set(true);
+                deleteAsync();
+            }
+        };
+        addDeleteTask(_underlyingConnectionDeleteTask);
+
+        setState(State.ACTIVE);
+
+    }
+
+    private static Map<String, Object> createAttributes(long connectionId, NetworkConnection network)
+    {
+        Map<String,Object> attributes = new HashMap<>();
+        attributes.put(ID, UUID.randomUUID());
+        attributes.put(NAME, "[" + connectionId + "] " + String.valueOf(network.getRemoteAddress()).replaceAll("/", ""));
+        attributes.put(DURABLE, false);
+        return attributes;
+    }
+
+    public final Broker<?> getBroker()
+    {
+        return _broker;
+    }
+
+    public final NetworkConnection getNetwork()
+    {
+        return _network;
+    }
+
+    public final AmqpPort<?> getPort()
+    {
+        return _port;
+    }
+
+    public final Transport getTransport()
+    {
+        return _transport;
+    }
+
+    @Override
+    public final AggregateTicker getAggregateTicker()
+    {
+        return _aggregateTicker;
+    }
+
+
+    public long getLastIoTime()
+    {
+        return Math.max(getLastReadTime(), getLastWriteTime());
+    }
+
+    public final long getConnectionId()
+    {
+        return _connectionId;
+    }
+
+    public final StatisticsCounter getMessageDeliveryStatistics()
+    {
+        return _messagesDelivered;
+    }
+
+    public String getRemoteAddressString()
+    {
+        return String.valueOf(_network.getRemoteAddress());
+    }
+
+    public final void stopConnection()
+    {
+        _stopped = true;
+    }
+
+    public final ProtocolEngine getProtocolEngine()
+    {
+        return this;
+    }
+
+    public boolean isConnectionStopped()
+    {
+        return _stopped;
+    }
+
+    public final String getVirtualHostName()
+    {
+        return getVirtualHost() == null ? null : getVirtualHost().getName();
+    }
+
+    public String getClientVersion()
+    {
+        return _clientVersion;
+    }
+
+    public String getRemoteProcessPid()
+    {
+        return _remoteProcessPid;
+    }
+
+    public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+    {
+        ((NonBlockingConnection)_network).changeScheduler(networkConnectionScheduler);
+    }
+
+    public String getClientProduct()
+    {
+        return _clientProduct;
+    }
+
+    public void addDeleteTask(final Action<? super C> task)
+    {
+        _connectionCloseTaskList.add(task);
+    }
+
+    public void removeDeleteTask(final Action<? super C> task)
+    {
+        _connectionCloseTaskList.remove(task);
+    }
+
+
+    protected void performDeleteTasks()
+    {
+        if(runningAsSubject())
+        {
+            for (Action<? super C> task : _connectionCloseTaskList)
+            {
+                task.performAction((C)this);
+            }
+        }
+        else
+        {
+            runAsSubject(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    performDeleteTasks();
+                    return null;
+                }
+            });
+        }
+    }
+
+    public String getClientId()
+    {
+        return _clientId;
+    }
+
+    public final StatisticsCounter getDataReceiptStatistics()
+    {
+        return _dataReceived;
+    }
+
+    public final StatisticsCounter getDataDeliveryStatistics()
+    {
+        return _dataDelivered;
+    }
+
+    public final SocketAddress getRemoteSocketAddress()
+    {
+        return _network.getRemoteAddress();
+    }
+
+    public void registerMessageDelivered(long messageSize)
+    {
+        _messagesDelivered.registerEvent(1L);
+        _dataDelivered.registerEvent(messageSize);
+        ((VirtualHostImpl<?,?,?>)getVirtualHost()).registerMessageDelivered(messageSize);
+    }
+
+    public void registerMessageReceived(long messageSize, long timestamp)
+    {
+        _messagesReceived.registerEvent(1L, timestamp);
+        _dataReceived.registerEvent(messageSize, timestamp);
+        ((VirtualHostImpl<?,?,?>)getVirtualHost()).registerMessageReceived(messageSize, timestamp);
+    }
+
+    public final void resetStatistics()
+    {
+        _messagesDelivered.reset();
+        _dataDelivered.reset();
+        _messagesReceived.reset();
+        _dataReceived.reset();
+    }
+
+    public final StatisticsCounter getMessageReceiptStatistics()
+    {
+        return _messagesReceived;
+    }
+
+    public void setClientProduct(final String clientProduct)
+    {
+        _clientProduct = clientProduct;
+    }
+
+    public void setClientVersion(final String clientVersion)
+    {
+        _clientVersion = clientVersion;
+    }
+
+    public void setRemoteProcessPid(final String remoteProcessPid)
+    {
+        _remoteProcessPid = remoteProcessPid;
+    }
+
+    public void setClientId(final String clientId)
+    {
+        _clientId = clientId;
+    }
+
+    private <T> T runAsSubject(PrivilegedAction<T> action)
+    {
+        return Subject.doAs(_subject, action);
+    }
+
+    private boolean runningAsSubject()
+    {
+        return _subject.equals(Subject.getSubject(AccessController.getContext()));
+    }
+
+    @Override
+    public Subject getSubject()
+    {
+        return _subject;
+    }
+
+    public void sessionAdded(final AMQSessionModel<?> session)
+    {
+        SessionAdapter adapter = new SessionAdapter(this, session);
+        adapter.create();
+        childAdded(adapter);
+
+    }
+
+    public void sessionRemoved(final AMQSessionModel<?> session)
+    {
+    }
+
+    public void virtualHostAssociated()
+    {
+        getVirtualHost().registerConnection(this);
+    }
+
+
+
+    @Override
+    public boolean isIncoming()
+    {
+        return true;
+    }
+
+    @Override
+    public String getLocalAddress()
+    {
+        return null;
+    }
+
+    @Override
+    public String getPrincipal()
+    {
+        final Principal authorizedPrincipal = getAuthorizedPrincipal();
+        return authorizedPrincipal == null ? null : authorizedPrincipal.getName();
+    }
+
+    @Override
+    public String getRemoteAddress()
+    {
+        return getRemoteAddressString();
+    }
+
+    @Override
+    public String getRemoteProcessName()
+    {
+        return null;
+    }
+
+    public Collection<Session> getSessions()
+    {
+        return getChildren(Session.class);
+    }
+
+    @SuppressWarnings("unused")
+    @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
+    private ListenableFuture<Void> doDelete()
+    {
+        if (_underlyingClosed.get())
+        {
+            deleted();
+            return Futures.immediateFuture(null);
+        }
+        else
+        {
+            final SettableFuture<Void> returnVal = SettableFuture.create();
+            asyncCloseUnderlying().addListener(
+                    new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                deleted();
+                                setState(State.DELETED);
+                            }
+                            finally
+                            {
+                                returnVal.set(null);
+                            }
+                        }
+                    }, getTaskExecutor().getExecutor()
+                                              );
+            return returnVal;
+        }
+    }
+
+    @Override
+    protected ListenableFuture<Void> beforeClose()
+    {
+        if (_underlyingClosed.get())
+        {
+            return Futures.immediateFuture(null);
+        }
+        else
+        {
+
+            return asyncCloseUnderlying();
+        }
+
+    }
+
+    private ListenableFuture<Void> asyncCloseUnderlying()
+    {
+        final SettableFuture<Void> closeFuture = SettableFuture.create();
+        addDeleteTask(new Action<AMQPConnection<?>>()
+        {
+            @Override
+            public void performAction(final AMQPConnection<?> object)
+            {
+                closeFuture.set(null);
+            }
+        });
+        removeDeleteTask(_underlyingConnectionDeleteTask);
+
+        closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+        return closeFuture;
+    }
+
+    @Override
+    protected void onClose()
+    {
+    }
+
+    @Override
+    public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+    {
+        if(childClass == Session.class)
+        {
+            throw new IllegalStateException();
+        }
+        else
+        {
+            throw new IllegalArgumentException("Cannot create a child of class " + childClass.getSimpleName());
+        }
+
+    }
+
+    @Override
+    public long getBytesIn()
+    {
+        return getDataReceiptStatistics().getTotal();
+    }
+
+    @Override
+    public long getBytesOut()
+    {
+        return getDataDeliveryStatistics().getTotal();
+    }
+
+    @Override
+    public long getMessagesIn()
+    {
+        return getMessageReceiptStatistics().getTotal();
+    }
+
+    @Override
+    public long getMessagesOut()
+    {
+        return getMessageDeliveryStatistics().getTotal();
+    }
+
+    public abstract List<? extends AMQSessionModel<?>> getSessionModels();
+
+    @Override
+    public int getSessionCount()
+    {
+        return getSessionModels().size();
+    }
+
+    @Override
+    public AbstractAMQPConnection<?> getUnderlyingConnection()
+    {
+        return this;
+    }
+
+
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Sat Jun 27 21:13:25 2015
@@ -104,26 +104,6 @@ public class MultiVersionProtocolEngine
         return _delegate.isMessageAssignmentSuspended();
     }
 
-    public SocketAddress getRemoteAddress()
-    {
-        return _delegate.getRemoteAddress();
-    }
-
-    public SocketAddress getLocalAddress()
-    {
-        return _delegate.getLocalAddress();
-    }
-
-    public long getWrittenBytes()
-    {
-        return _delegate.getWrittenBytes();
-    }
-
-    public long getReadBytes()
-    {
-        return _delegate.getReadBytes();
-    }
-
     public void closed()
     {
         _logger.debug("Closed");
@@ -165,7 +145,7 @@ public class MultiVersionProtocolEngine
 
     public long getConnectionId()
     {
-        return _delegate.getConnectionId();
+        return _id;
     }
 
     @Override
@@ -188,7 +168,7 @@ public class MultiVersionProtocolEngine
 
     private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
 
-    public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
+    public void setNetworkConnection(NetworkConnection network)
     {
         _network = network;
         SocketAddress address = _network.getLocalAddress();
@@ -200,7 +180,7 @@ public class MultiVersionProtocolEngine
         {
             throw new IllegalArgumentException("Unsupported socket address class: " + address);
         }
-        _sender = sender;
+        _sender = network.getSender();
     }
 
     @Override
@@ -297,26 +277,6 @@ public class MultiVersionProtocolEngine
 
         }
 
-        public SocketAddress getRemoteAddress()
-        {
-            return _network.getRemoteAddress();
-        }
-
-        public SocketAddress getLocalAddress()
-        {
-            return _network.getLocalAddress();
-        }
-
-        public long getWrittenBytes()
-        {
-            return 0;
-        }
-
-        public long getReadBytes()
-        {
-            return 0;
-        }
-
         public void received(ByteBuffer msg)
         {
             _logger.error("Error processing incoming data, could not negotiate a common protocol");
@@ -343,11 +303,6 @@ public class MultiVersionProtocolEngine
 
         }
 
-        public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
-        {
-
-        }
-
         @Override
         public long getLastReadTime()
         {
@@ -360,11 +315,6 @@ public class MultiVersionProtocolEngine
             return 0;
         }
 
-        public long getConnectionId()
-        {
-            return _id;
-        }
-
         @Override
         public Subject getSubject()
         {
@@ -396,26 +346,6 @@ public class MultiVersionProtocolEngine
         private long _lastReadTime = System.currentTimeMillis();
         private final AtomicBoolean _hasWork = new AtomicBoolean();
 
-        public SocketAddress getRemoteAddress()
-        {
-            return _network.getRemoteAddress();
-        }
-
-        public SocketAddress getLocalAddress()
-        {
-            return _network.getLocalAddress();
-        }
-
-        public long getWrittenBytes()
-        {
-            return 0;
-        }
-
-        public long getReadBytes()
-        {
-            return 0;
-        }
-
         @Override
         public void setMessageAssignmentSuspended(final boolean value)
         {
@@ -585,11 +515,6 @@ public class MultiVersionProtocolEngine
         }
 
 
-        public long getConnectionId()
-        {
-            return _id;
-        }
-
         @Override
         public Subject getSubject()
         {
@@ -614,7 +539,7 @@ public class MultiVersionProtocolEngine
                 _delegate = new ClosedDelegateProtocolEngine();
                 if(_logger.isDebugEnabled())
                 {
-                    _logger.debug("Connection from  " + getRemoteAddress() + " was closed before any protocol version was established.");
+                    _logger.debug("Connection from  " + _network.getRemoteAddress() + " was closed before any protocol version was established.");
                 }
             }
             catch(Exception e)
@@ -654,11 +579,6 @@ public class MultiVersionProtocolEngine
             }
         }
 
-        public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
-        {
-
-        }
-
         @Override
         public long getLastReadTime()
         {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Sat Jun 27 21:13:25 2015
@@ -50,7 +50,7 @@ public class NonBlockingNetworkTransport
     private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
                                                                     CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
     private final Set<TransportEncryption> _encryptionSet;
-    private final ProtocolEngineFactory _factory;
+    private final MultiVersionProtocolEngineFactory _factory;
     private final ServerSocketChannel _serverSocket;
     private final int _timeout;
     private final NetworkConnectionScheduler _scheduler;
@@ -133,7 +133,7 @@ public class NonBlockingNetworkTransport
         {
             socketChannel = serverSocketChannel.accept();
 
-            final ProtocolEngine engine =
+            final MultiVersionProtocolEngine engine =
                     _factory.newProtocolEngine(socketChannel.socket().getRemoteSocketAddress());
 
             if(engine != null)
@@ -171,7 +171,7 @@ public class NonBlockingNetworkTransport
                                                   _scheduler,
                                                   _port);
 
-                engine.setNetworkConnection(connection, connection.getSender());
+                engine.setNetworkConnection(connection);
                 connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
 
                 idleTimeoutTicker.setConnection(connection);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Sat Jun 27 21:13:25 2015
@@ -20,15 +20,11 @@
  */
 package org.apache.qpid.server.transport;
 
-import java.net.SocketAddress;
-
 import javax.security.auth.Subject;
 
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.ByteBufferReceiver;
-import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.TransportActivity;
 
 /**
@@ -37,17 +33,6 @@ import org.apache.qpid.transport.network
  */
 public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity
 {
-   // Returns the remote address of the NetworkDriver
-   SocketAddress getRemoteAddress();
-
-   // Returns the local address of the NetworkDriver
-   SocketAddress getLocalAddress();
-
-   // Returns number of bytes written
-   long getWrittenBytes();
-
-   // Returns number of bytes read
-   long getReadBytes();
 
    // Called by the NetworkDriver when the socket has been closed for reading
    void closed();
@@ -59,13 +44,6 @@ public interface ProtocolEngine extends
    // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
    void readerIdle();
 
-   void setNetworkConnection(NetworkConnection network, ByteBufferSender sender);
-
-   /**
-    * Gets the connection ID associated with this ProtocolEngine
-    */
-   long getConnectionId();
-
    Subject getSubject();
 
    boolean isTransportBlockedForWriting();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Sat Jun 27 21:13:25 2015
@@ -124,7 +124,7 @@ public abstract class AbstractSystemMess
     }
 
     @Override
-    public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+    public boolean verifySessionAccess(final AMQSessionModel<?> session)
     {
         return true;
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Sat Jun 27 21:13:25 2015
@@ -71,7 +71,6 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.plugin.ConnectionValidator;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
 import org.apache.qpid.server.plugin.SystemNodeCreator;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -89,6 +88,7 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.MessageStoreProvider;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.LocalTransaction;
@@ -103,7 +103,7 @@ public abstract class AbstractVirtualHos
     private final Collection<ConnectionValidator> _connectionValidators = new ArrayList<>();
 
 
-    private final Set<Connection<?>> _connections = newSetFromMap(new ConcurrentHashMap<Connection<?>, Boolean>());
+    private final Set<AMQPConnection<?>> _connections = newSetFromMap(new ConcurrentHashMap<AMQPConnection<?>, Boolean>());
     private final Set<VirtualHostConnectionListener> _connectionAssociationListeners = new CopyOnWriteArraySet<>();
 
     private static enum BlockingType { STORE, FILESYSTEM };
@@ -461,7 +461,7 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
-    public Collection<Connection<?>> getConnections()
+    public Collection<AMQPConnection<?>> getConnections()
     {
         return _connections;
     }
@@ -530,7 +530,7 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
-    public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection)
+    public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
     {
         getSecurityManager().authoriseCreateConnection(connection);
         for(ConnectionValidator validator : _connectionValidators)
@@ -894,13 +894,13 @@ public abstract class AbstractVirtualHos
         }
         for(Connection conn : _connections)
         {
-            conn.getUnderlyingConnection().stop();
+            conn.getUnderlyingConnection().stopConnection();
         }
 
         List<ListenableFuture<Void>> connectionCloseFutures = new ArrayList<>();
         while (!_connections.isEmpty())
         {
-            Iterator<Connection<?>> itr = _connections.iterator();
+            Iterator<AMQPConnection<?>> itr = _connections.iterator();
             while(itr.hasNext())
             {
                 Connection<?> connection = itr.next();
@@ -1148,7 +1148,7 @@ public abstract class AbstractVirtualHos
                 {
                     _logger.debug("Checking for long running open transactions on connection " + connection);
                 }
-                for (AMQSessionModel<?,?> session : connection.getUnderlyingConnection().getSessionModels())
+                for (AMQSessionModel<?> session : connection.getUnderlyingConnection().getSessionModels())
                 {
                     if (_logger.isDebugEnabled())
                     {
@@ -1687,34 +1687,60 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
-    public void registerConnection(final Connection<?> connection)
+    public void registerConnection(final AMQPConnection<?> connection)
     {
-        _connections.add(connection);
+        doSync(registerConnectionAsync(connection));
+    }
 
-        AMQConnectionModel<?,?> underlyingConnection = connection.getUnderlyingConnection();
-        if(_blocked.get())
+    public ListenableFuture<Void> registerConnectionAsync(final AMQPConnection<?> connection)
+    {
+        return doOnConfigThread(new Callable<ListenableFuture<Void>>()
         {
-            underlyingConnection.block();
-        }
+            @Override
+            public ListenableFuture<Void> call() throws Exception
+            {
+                _connections.add(connection);
 
-        underlyingConnection.setScheduler(_networkConnectionScheduler);
+                if(_blocked.get())
+                {
+                    connection.block();
+                }
+
+                connection.setScheduler(_networkConnectionScheduler);
+
+                for (VirtualHostConnectionListener listener : _connectionAssociationListeners)
+                {
+                    listener.connectionAssociated(connection);
+                }
+                return Futures.immediateFuture(null);
+            }
+        });
 
-        for (VirtualHostConnectionListener listener : _connectionAssociationListeners)
-        {
-            listener.connectionAssociated(connection);
-        }
     }
 
     @Override
-    public void deregisterConnection(final Connection<?> connection)
+    public void deregisterConnection(final AMQPConnection<?> connection)
+    {
+        doSync(deregisterConnectionAsync(connection));
+    }
+
+    public ListenableFuture<Void> deregisterConnectionAsync(final AMQPConnection<?> connection)
     {
-        if (_connections.remove(connection))
+        return doOnConfigThread(new Callable<ListenableFuture<Void>>()
         {
-            for (VirtualHostConnectionListener listener : _connectionAssociationListeners)
+            @Override
+            public ListenableFuture<Void> call() throws Exception
             {
-                listener.connectionRemoved(connection);
+                if (_connections.remove(connection))
+                {
+                    for (VirtualHostConnectionListener listener : _connectionAssociationListeners)
+                    {
+                        listener.connectionRemoved(connection);
+                    }
+                }
+                return Futures.immediateFuture(null);
             }
-        }
+        });
     }
 
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Sat Jun 27 21:13:25 2015
@@ -32,13 +32,14 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.NoFactoryForTypeException;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.txn.DtxRegistry;
 
 public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AMQQueue<?>, E extends ExchangeImpl<?> >
@@ -106,7 +107,7 @@ public interface VirtualHostImpl< X exte
 
     EventLogger getEventLogger();
 
-    boolean authoriseCreateConnection(AMQConnectionModel<?, ?> connection);
+    boolean authoriseCreateConnection(AMQPConnection<?> connection);
 
     String getLocalAddress(String routingAddress);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org