You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/27 23:13:27 UTC

svn commit: r1687962 [2/4] - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ broker-codegen/src/main/java/org/apache/qpid/server/model/validation/ broker-core/src/main/java/org/apache/qpid/server/connection/ b...

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Sat Jun 27 21:13:25 2015
@@ -48,13 +48,14 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.VirtualHostAlias;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.virtualhost.*;
 
@@ -502,7 +503,7 @@ class RedirectingVirtualHostImpl
     }
 
     @Override
-    public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection)
+    public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
     {
         return false;
     }
@@ -549,13 +550,13 @@ class RedirectingVirtualHostImpl
     }
 
     @Override
-    public void registerConnection(final Connection<?> connection)
+    public void registerConnection(final AMQPConnection<?> connection)
     {
         throwUnsupportedForRedirector();
     }
 
     @Override
-    public void deregisterConnection(final Connection<?> connection)
+    public void deregisterConnection(final AMQPConnection<?> connection)
     {
         throwUnsupportedForRedirector();
     }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java Sat Jun 27 21:13:25 2015
@@ -40,14 +40,14 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class ConnectionVersionValidatorTest extends QpidTestCase
 {
 
     private VirtualHost _virtualHostMock;
-    private AMQConnectionModel _connectionMock;
+    private AMQPConnection _connectionMock;
     private EventLogger _eventLoggerMock;
     private ConnectionVersionValidator _connectionValidator;
 
@@ -57,7 +57,7 @@ public class ConnectionVersionValidatorT
 
         _connectionValidator = new ConnectionVersionValidator();
         _virtualHostMock = mock(VirtualHost.class);
-        _connectionMock = mock(AMQConnectionModel.class);
+        _connectionMock = mock(AMQPConnection.class);
         _eventLoggerMock = mock(EventLogger.class);
         Broker brokerMock = mock(Broker.class);
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Sat Jun 27 21:13:25 2015
@@ -24,15 +24,21 @@ package org.apache.qpid.server.consumer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.lang.reflect.Type;
 import java.net.SocketAddress;
+import java.security.AccessControlException;
 import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -43,20 +49,23 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObjectFactory;
 import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
 import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.protocol.SessionModelListener;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -314,7 +323,7 @@ public class MockConsumer implements Con
     }
 
 
-    private static class MockSessionModel implements AMQSessionModel
+    private static class MockSessionModel implements AMQSessionModel<MockSessionModel>
     {
         private final UUID _id = UUID.randomUUID();
         private Session _modelObject;
@@ -338,9 +347,9 @@ public class MockConsumer implements Con
         }
 
         @Override
-        public AMQConnectionModel getConnectionModel()
+        public AMQPConnection<?> getAMQPConnection()
         {
-            return new MockConnectionModel();
+            return null;
         }
 
         @Override
@@ -499,11 +508,6 @@ public class MockConsumer implements Con
 
         }
 
-        @Override
-        public int compareTo(final Object o)
-        {
-            return 0;
-        }
 
         @Override
         public void transportStateChanged()
@@ -528,9 +532,15 @@ public class MockConsumer implements Con
         {
 
         }
+
+        @Override
+        public int compareTo(final AMQSessionModel o)
+        {
+            return 0;
+        }
     }
 
-    private static class MockConnectionModel implements AMQConnectionModel
+    private static class MockConnectionModel implements AMQPConnection<MockConnectionModel>
     {
 
         @Override
@@ -544,81 +554,112 @@ public class MockConsumer implements Con
         }
 
         @Override
-        public StatisticsCounter getMessageDeliveryStatistics()
+        public void closeAsync(AMQConstant cause, String message)
+        {
+        }
+
+        @Override
+        public void closeSessionAsync(AMQSessionModel<?> session, AMQConstant cause,
+                                      String message)
+        {
+        }
+
+        @Override
+        public long getConnectionId()
+        {
+            return 0;
+        }
+
+        @Override
+        public void block()
         {
-            return null;
         }
 
         @Override
-        public StatisticsCounter getMessageReceiptStatistics()
+        public void unblock()
+        {
+
+        }
+
+        @Override
+        public String getRemoteAddressString()
+        {
+            return "remoteAddress:1234";
+        }
+
+        public SocketAddress getRemoteSocketAddress()
         {
             return null;
         }
 
         @Override
-        public StatisticsCounter getDataDeliveryStatistics()
+        public String getClientId()
         {
             return null;
         }
 
         @Override
-        public StatisticsCounter getDataReceiptStatistics()
+        public String getRemoteContainerName()
         {
             return null;
         }
 
         @Override
-        public void resetStatistics()
+        public void notifyWork()
         {
 
         }
 
         @Override
-        public void closeAsync(AMQConstant cause, String message)
+        public boolean isMessageAssignmentSuspended()
         {
+            return false;
         }
 
         @Override
-        public void closeSessionAsync(AMQSessionModel session, AMQConstant cause,
-                                      String message)
+        public boolean hasSessionWithName(final byte[] name)
         {
+            return false;
         }
 
         @Override
-        public long getConnectionId()
+        public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
         {
-            return 0;
+
         }
 
         @Override
-        public List<AMQSessionModel> getSessionModels()
+        public String getClientVersion()
         {
             return null;
         }
 
         @Override
-        public void block()
+        public boolean isIncoming()
         {
+            return false;
         }
 
         @Override
-        public void unblock()
+        public String getLocalAddress()
         {
+            return null;
         }
 
         @Override
-        public LogSubject getLogSubject()
+        public String getPrincipal()
         {
             return null;
         }
 
         @Override
-        public String getRemoteAddressString()
+        public String getRemoteAddress()
         {
-            return "remoteAddress:1234";
+            return null;
         }
 
-        public SocketAddress getRemoteAddress()
+        @Override
+        public String getRemoteProcessName()
         {
             return null;
         }
@@ -630,130 +671,395 @@ public class MockConsumer implements Con
         }
 
         @Override
-        public String getClientId()
+        public long getSessionCountLimit()
+        {
+            return 0;
+        }
+
+        @Override
+        public Principal getAuthorizedPrincipal()
         {
             return null;
         }
 
         @Override
-        public String getRemoteContainerName()
+        public AmqpPort<?> getPort()
         {
             return null;
         }
 
         @Override
-        public void addSessionListener(final SessionModelListener listener)
+        public long getBytesIn()
+        {
+            return 0;
+        }
+
+        @Override
+        public long getBytesOut()
         {
+            return 0;
+        }
 
+        @Override
+        public long getMessagesIn()
+        {
+            return 0;
         }
 
         @Override
-        public void removeSessionListener(final SessionModelListener listener)
+        public long getMessagesOut()
         {
+            return 0;
+        }
 
+        @Override
+        public long getLastIoTime()
+        {
+            return 0;
         }
 
         @Override
-        public void notifyWork()
+        public int getSessionCount()
         {
+            return 0;
+        }
 
+        @Override
+        public Collection<Session> getSessions()
+        {
+            return null;
         }
 
         @Override
-        public boolean isMessageAssignmentSuspended()
+        public AbstractAMQPConnection<?> getUnderlyingConnection()
+        {
+            return null;
+        }
+
+        @Override
+        public Transport getTransport()
+        {
+            return null;
+        }
+
+        @Override
+        public boolean isConnectionStopped()
         {
             return false;
         }
 
         @Override
-        public ProtocolEngine getProtocolEngine()
+        public String getVirtualHostName()
         {
             return null;
         }
 
         @Override
-        public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+        public VirtualHost<?, ?, ?> getVirtualHost()
+        {
+            return null;
+        }
+
+        @Override
+        public void addDeleteTask(final Action task)
         {
 
         }
 
         @Override
-        public String getClientVersion()
+        public void removeDeleteTask(final Action task)
+        {
+
+        }
+
+
+        @Override
+        public UUID getId()
         {
             return null;
         }
 
         @Override
-        public String getClientProduct()
+        public String getName()
         {
             return null;
         }
 
         @Override
-        public Principal getAuthorizedPrincipal()
+        public String getDescription()
         {
             return null;
         }
 
         @Override
-        public long getSessionCountLimit()
+        public String getType()
+        {
+            return null;
+        }
+
+        @Override
+        public Map<String, String> getContext()
+        {
+            return null;
+        }
+
+        @Override
+        public <T> T getContextValue(final Class<T> clazz, final String propertyName)
+        {
+            return null;
+        }
+
+        @Override
+        public <T> T getContextValue(final Class<T> clazz, final Type t, final String propertyName)
+        {
+            return null;
+        }
+
+        @Override
+        public Set<String> getContextKeys(final boolean excludeSystem)
+        {
+            return null;
+        }
+
+        @Override
+        public String getLastUpdatedBy()
+        {
+            return null;
+        }
+
+        @Override
+        public long getLastUpdatedTime()
         {
             return 0;
         }
 
         @Override
-        public long getLastIoTime()
+        public String getCreatedBy()
+        {
+            return null;
+        }
+
+        @Override
+        public long getCreatedTime()
         {
             return 0;
         }
 
         @Override
-        public AmqpPort<?> getPort()
+        public org.apache.qpid.server.model.State getDesiredState()
         {
             return null;
         }
 
         @Override
-        public Transport getTransport()
+        public org.apache.qpid.server.model.State getState()
         {
             return null;
         }
 
         @Override
-        public void stop()
+        public void addChangeListener(final ConfigurationChangeListener listener)
         {
+
         }
 
         @Override
-        public boolean isStopped()
+        public boolean removeChangeListener(final ConfigurationChangeListener listener)
         {
             return false;
         }
 
         @Override
-        public String getVirtualHostName()
+        public <T extends ConfiguredObject> T getParent(final Class<T> clazz)
         {
             return null;
         }
 
         @Override
-        public VirtualHost<?, ?, ?> getVirtualHost()
+        public boolean isDurable()
+        {
+            return false;
+        }
+
+        @Override
+        public LifetimePolicy getLifetimePolicy()
         {
             return null;
         }
 
         @Override
-        public void addDeleteTask(final Action task)
+        public Collection<String> getAttributeNames()
         {
+            return null;
+        }
 
+        @Override
+        public Object getAttribute(final String name)
+        {
+            return null;
         }
 
         @Override
-        public void removeDeleteTask(final Action task)
+        public Map<String, Object> getActualAttributes()
+        {
+            return null;
+        }
+
+        @Override
+        public Object setAttribute(final String name, final Object expected, final Object desired)
+                throws IllegalStateException, AccessControlException, IllegalArgumentException
+        {
+            return null;
+        }
+
+        @Override
+        public Map<String, Number> getStatistics()
+        {
+            return null;
+        }
+
+        @Override
+        public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
+        {
+            return null;
+        }
+
+        @Override
+        public <C extends ConfiguredObject> C getChildById(final Class<C> clazz, final UUID id)
+        {
+            return null;
+        }
+
+        @Override
+        public <C extends ConfiguredObject> C getChildByName(final Class<C> clazz, final String name)
+        {
+            return null;
+        }
+
+        @Override
+        public <C extends ConfiguredObject> C createChild(final Class<C> childClass,
+                                                          final Map<String, Object> attributes,
+                                                          final ConfiguredObject... otherParents)
+        {
+            return null;
+        }
+
+        @Override
+        public <C extends ConfiguredObject> ListenableFuture<C> createChildAsync(final Class<C> childClass,
+                                                                                 final Map<String, Object> attributes,
+                                                                                 final ConfiguredObject... otherParents)
+        {
+            return null;
+        }
+
+        @Override
+        public void setAttributes(final Map<String, Object> attributes)
+                throws IllegalStateException, AccessControlException, IllegalArgumentException
         {
 
         }
 
+        @Override
+        public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes)
+                throws IllegalStateException, AccessControlException, IllegalArgumentException
+        {
+            return null;
+        }
+
+        @Override
+        public Class<? extends ConfiguredObject> getCategoryClass()
+        {
+            return null;
+        }
 
+        @Override
+        public Class<? extends ConfiguredObject> getTypeClass()
+        {
+            return null;
+        }
+
+        @Override
+        public boolean managesChildStorage()
+        {
+            return false;
+        }
+
+        @Override
+        public <C extends ConfiguredObject<C>> C findConfiguredObject(final Class<C> clazz, final String name)
+        {
+            return null;
+        }
+
+        @Override
+        public ConfiguredObjectRecord asObjectRecord()
+        {
+            return null;
+        }
+
+        @Override
+        public void open()
+        {
+
+        }
+
+        @Override
+        public ListenableFuture<Void> openAsync()
+        {
+            return null;
+        }
+
+        @Override
+        public void close()
+        {
+
+        }
+
+        @Override
+        public ListenableFuture<Void> closeAsync()
+        {
+            return null;
+        }
+
+        @Override
+        public ListenableFuture<Void> deleteAsync()
+        {
+            return null;
+        }
+
+        @Override
+        public TaskExecutor getTaskExecutor()
+        {
+            return null;
+        }
+
+        @Override
+        public TaskExecutor getChildExecutor()
+        {
+            return null;
+        }
+
+        @Override
+        public ConfiguredObjectFactory getObjectFactory()
+        {
+            return null;
+        }
+
+        @Override
+        public Model getModel()
+        {
+            return null;
+        }
+
+        @Override
+        public void delete()
+        {
+
+        }
+
+        @Override
+        public void decryptSecrets()
+        {
+
+        }
     }
 }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java Sat Jun 27 21:13:25 2015
@@ -27,8 +27,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Arrays;
-
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.filter.Filter;
 import ch.qos.logback.core.spi.FilterReply;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java Sat Jun 27 21:13:25 2015
@@ -21,13 +21,13 @@
 package org.apache.qpid.server.logging.actors;
 
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public abstract class BaseConnectionActorTestCase extends BaseActorTestCase
 {
-    private AMQConnectionModel _session;
+    private AMQPConnection<?> _connection;
     private VirtualHostImpl _virtualHost;
 
     @Override
@@ -35,7 +35,7 @@ public abstract class BaseConnectionActo
     {
         super.setUp();
         BrokerTestHelper.setUp();
-        _session = BrokerTestHelper.createConnection();
+        _connection = BrokerTestHelper.createConnection();
         _virtualHost = BrokerTestHelper.createVirtualHost("test");
     }
 
@@ -53,9 +53,9 @@ public abstract class BaseConnectionActo
             {
                 _virtualHost.close();
             }
-            if (_session != null)
+            if (_connection != null)
             {
-                _session.closeAsync(AMQConstant.CONNECTION_FORCED, "");
+                _connection.closeAsync(AMQConstant.CONNECTION_FORCED, "");
             }
         }
         finally
@@ -65,9 +65,9 @@ public abstract class BaseConnectionActo
         }
     }
 
-    public AMQConnectionModel getConnection()
+    public AMQPConnection<?> getConnection()
     {
-        return _session;
+        return _connection;
     }
 
 }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java Sat Jun 27 21:13:25 2015
@@ -38,7 +38,7 @@ public class ChannelLogSubjectTest exten
         super.setUp();
 
         AMQSessionModel session = mock(AMQSessionModel.class);
-        when(session.getConnectionModel()).thenReturn(getConnection());
+        when(session.getAMQPConnection()).thenReturn(getConnection());
         when(session.getChannelId()).thenReturn(_channelID);
         _subject = new ChannelLogSubject(session);
     }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java Sat Jun 27 21:13:25 2015
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.server.logging.subjects;
 
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
 
 import java.security.Principal;
 
@@ -38,7 +38,7 @@ public class ConnectionLogSubjectTest ex
     private static final String IP_STRING = "127.0.0.1:1";
     private static final String VHOST = "test";
 
-    private AMQConnectionModel _connection;
+    private AMQPConnection _connection;
 
     @Override
     public void setUp() throws Exception
@@ -48,7 +48,7 @@ public class ConnectionLogSubjectTest ex
         final Principal principal = mock(Principal.class);
         when(principal.getName()).thenReturn(USER);
 
-        _connection = mock(AMQConnectionModel.class);
+        _connection = mock(AMQPConnection.class);
         when(_connection.getConnectionId()).thenReturn(CONNECTION_ID);
         when(_connection.getAuthorizedPrincipal()).thenReturn(principal);
         when(_connection.getRemoteAddressString()).thenReturn("/"+IP_STRING);
@@ -66,7 +66,7 @@ public class ConnectionLogSubjectTest ex
         verifyConnection(CONNECTION_ID, USER, IP_STRING, VHOST, message);
     }
 
-    public AMQConnectionModel getConnection()
+    public AMQPConnection getConnection()
     {
         return _connection;
     }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Sat Jun 27 21:13:25 2015
@@ -53,12 +53,12 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.model.adapter.ConnectionAdapter;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
@@ -230,10 +230,10 @@ public class VirtualHostTest extends Qpi
         VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName);
         assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
 
-        AMQConnectionModel connection = createMockProtocolConnection(virtualHost);
+        AbstractAMQPConnection connection = createMockProtocolConnection(virtualHost);
         assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getConnectionCount());
 
-        Connection modelConnection = mock(Connection.class);
+        AMQPConnection modelConnection = mock(AMQPConnection.class);
         when(modelConnection.getUnderlyingConnection()).thenReturn(connection);
         when(modelConnection.closeAsync()).thenReturn(Futures.immediateFuture(null));
         virtualHost.registerConnection(modelConnection);
@@ -257,12 +257,12 @@ public class VirtualHostTest extends Qpi
         VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName);
         assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
 
-        AMQConnectionModel connection = createMockProtocolConnection(virtualHost);
+        AbstractAMQPConnection connection = createMockProtocolConnection(virtualHost);
         assertEquals("Unexpected number of connections before connection registered",
                 0,
                 virtualHost.getConnectionCount());
 
-        Connection modelConnection = mock(Connection.class);
+        AMQPConnection modelConnection = mock(AMQPConnection.class);
         when(modelConnection.getUnderlyingConnection()).thenReturn(connection);
         virtualHost.registerConnection(modelConnection);
 
@@ -397,9 +397,9 @@ public class VirtualHostTest extends Qpi
         return host;
     }
 
-    private AMQConnectionModel createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost)
+    private AbstractAMQPConnection createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost)
     {
-        final AMQConnectionModel connection = mock(AMQConnectionModel.class);
+        final AbstractAMQPConnection connection = mock(AbstractAMQPConnection.class);
         final List<Action<?>> tasks = new ArrayList<>();
         final ArgumentCaptor<Action> deleteTaskCaptor = ArgumentCaptor.forClass(Action.class);
         Answer answer = new Answer()

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java Sat Jun 27 21:13:25 2015
@@ -59,13 +59,13 @@ import org.apache.qpid.server.model.Virt
 import org.apache.qpid.server.model.VirtualHostLogger;
 import org.apache.qpid.server.model.VirtualHostLoggerFilter;
 import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.queue.QueueConsumer;
 import org.apache.qpid.server.security.access.ObjectProperties;
 import org.apache.qpid.server.security.access.ObjectProperties.Property;
 import org.apache.qpid.server.security.access.ObjectType;
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.security.access.OperationLoggingDetails;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class SecurityManagerTest extends QpidTestCase
@@ -190,7 +190,7 @@ public class SecurityManagerTest extends
 
     public void testAuthoriseCreateConnection()
     {
-        AMQConnectionModel<?,?> connection = mock(AMQConnectionModel.class);
+        AMQPConnection<?> connection = mock(AMQPConnection.class);
         when(connection.getVirtualHostName()).thenReturn(TEST_VIRTUAL_HOST);
 
         ObjectProperties properties = new ObjectProperties();

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java Sat Jun 27 21:13:25 2015
@@ -43,12 +43,12 @@ import org.apache.qpid.server.model.Syst
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
 import org.apache.qpid.server.virtualhost.QueueExistsException;
 import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
@@ -181,34 +181,34 @@ public class BrokerTestHelper
         return createVirtualHost(attributes, broker, defaultVHN);
     }
 
-    public static AMQSessionModel<?,?> createSession(int channelId, AMQConnectionModel<?,?> connection)
+    public static AMQSessionModel<?> createSession(int channelId, AMQPConnection<?> connection)
     {
         @SuppressWarnings("rawtypes")
         AMQSessionModel session = mock(AMQSessionModel.class);
-        when(session.getConnectionModel()).thenReturn(connection);
+        when(session.getAMQPConnection()).thenReturn(connection);
         when(session.getChannelId()).thenReturn(channelId);
         return session;
     }
 
-    public static AMQSessionModel<?,?> createSession(int channelId) throws Exception
+    public static AMQSessionModel<?> createSession(int channelId) throws Exception
     {
-        AMQConnectionModel<?,?> session = createConnection();
+        AMQPConnection<?> session = createConnection();
         return createSession(channelId, session);
     }
 
-    public static AMQSessionModel<?,?> createSession() throws Exception
+    public static AMQSessionModel<?> createSession() throws Exception
     {
         return createSession(1);
     }
 
-    public static AMQConnectionModel<?,?> createConnection() throws Exception
+    public static AMQPConnection<?> createConnection() throws Exception
     {
         return createConnection("test");
     }
 
-    public static AMQConnectionModel<?,?> createConnection(String hostName) throws Exception
+    public static AMQPConnection<?> createConnection(String hostName) throws Exception
     {
-        return mock(AMQConnectionModel.class);
+        return mock(AMQPConnection.class);
     }
 
     public static ExchangeImpl<?> createExchange(String hostName, final boolean durable, final EventLogger eventLogger) throws Exception

Modified: qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java (original)
+++ qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java Sat Jun 27 21:13:25 2015
@@ -175,7 +175,7 @@ public class DefaultAccessControl implem
         Set<ConnectionPrincipal> principals = subject.getPrincipals(ConnectionPrincipal.class);
         if(!principals.isEmpty())
         {
-            SocketAddress address = principals.iterator().next().getConnection().getRemoteAddress();
+            SocketAddress address = principals.iterator().next().getConnection().getRemoteSocketAddress();
             if(address instanceof InetSocketAddress)
             {
                 addressOfClient = ((InetSocketAddress) address).getAddress();

Modified: qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java (original)
+++ qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java Sat Jun 27 21:13:25 2015
@@ -35,7 +35,6 @@ import org.apache.qpid.server.connection
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.logging.UnitTestMessageLogger;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.security.Result;
 import org.apache.qpid.server.security.access.ObjectProperties;
 import org.apache.qpid.server.security.access.ObjectType;
@@ -44,6 +43,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.access.config.Rule;
 import org.apache.qpid.server.security.access.config.RuleSet;
 import org.apache.qpid.server.security.auth.TestPrincipalUtils;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 /**
@@ -239,8 +239,8 @@ public class DefaultAccessControlTest ex
         final InetAddress inetAddress = InetAddress.getLocalHost();
         final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1);
 
-        AMQConnectionModel connectionModel = mock(AMQConnectionModel.class);
-        when(connectionModel.getRemoteAddress()).thenReturn(inetSocketAddress);
+        AMQPConnection connectionModel = mock(AMQPConnection.class);
+        when(connectionModel.getRemoteSocketAddress()).thenReturn(inetSocketAddress);
 
         subject.getPrincipals().add(new ConnectionPrincipal(connectionModel));
 
@@ -269,8 +269,8 @@ public class DefaultAccessControlTest ex
         final InetAddress inetAddress = InetAddress.getLocalHost();
         final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1);
 
-        AMQConnectionModel connectionModel = mock(AMQConnectionModel.class);
-        when(connectionModel.getRemoteAddress()).thenReturn(inetSocketAddress);
+        AMQPConnection connectionModel = mock(AMQPConnection.class);
+        when(connectionModel.getRemoteSocketAddress()).thenReturn(inetSocketAddress);
 
         subject.getPrincipals().add(new ConnectionPrincipal(connectionModel));
 

Added: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1687962&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Sat Jun 27 21:13:25 2015
@@ -0,0 +1,371 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.PrivilegedAction;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.Subject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Constant;
+import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+
+/**
+ * AMQP 0-10 connection: owns a {@link ServerConnection} and feeds bytes read from the
+ * network to it through an {@link InputHandler}.
+ */
+public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10>
+{
+    private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
+    // Receives the raw bytes passed to received(ByteBuffer).
+    private final InputHandler _inputHandler;
+
+
+    private final NetworkConnection _network;
+    // The 0-10 connection that most public methods of this class delegate to.
+    private ServerConnection _connection;
+
+    // Creation time; received() compares it with the read time while no principal is authorized.
+    private long _createTime = System.currentTimeMillis();
+    private volatile long _lastReadTime = _createTime;
+    private volatile long _lastWriteTime = _createTime;
+    private volatile boolean _transportBlockedForWriting;
+
+    // Thread that suspended message assignment, or null when assignment is not suspended.
+    private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
+    // Set by notifyWork(), reset by clearWork(), reported by hasWork().
+    private final AtomicBoolean _stateChanged = new AtomicBoolean();
+    // Optional listener invoked from notifyWork().
+    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
+
+
+    /**
+     * Creates the connection together with its underlying {@link ServerConnection}, connection
+     * delegate and input handler, then, running as this connection's subject, attaches the
+     * network sender and logs the connection-open messages.
+     *
+     * @param broker          passed to the superclass, the ServerConnection and the delegate
+     * @param network         the network connection whose addresses and sender are used
+     * @param port            the port; supplies the authentication provider's subject creator
+     * @param transport       the transport; its secure flag selects the subject creator
+     * @param id              the connection id, also given to the ServerConnection
+     * @param aggregateTicker passed to the superclass
+     */
+    public AMQPConnection_0_10(final Broker<?> broker,
+                               NetworkConnection network,
+                               final AmqpPort<?> port,
+                               final Transport transport,
+                               final long id,
+                               final AggregateTicker aggregateTicker)
+    {
+        super(broker, network, port, transport, id, aggregateTicker);
+
+        _connection = new ServerConnection(id, broker, port, transport);
+        // NOTE(review): 'this' is handed out before the constructor has finished.
+        _connection.setAmqpConnection(this);
+        SocketAddress address = network.getLocalAddress();
+        // Host name of the local address; stays null unless it is an InetSocketAddress.
+        String fqdn = null;
+
+        if (address instanceof InetSocketAddress)
+        {
+            fqdn = ((InetSocketAddress) address).getHostName();
+        }
+        SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
+        ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, fqdn, subjectCreator);
+
+        _connection.setConnectionDelegate(connDelegate);
+        _connection.setRemoteAddress(network.getRemoteAddress());
+        _connection.setLocalAddress(network.getLocalAddress());
+
+        _inputHandler = new InputHandler(new ServerAssembler(_connection));
+        _network = network;
+
+        Subject.doAs(getSubject(), new PrivilegedAction<Object>()
+        {
+            @Override
+            public Object run()
+            {
+                _connection.getEventLogger().message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
+
+                _connection.setNetworkConnection(_network);
+                // Outgoing frames go through wrapSender so that the last-write time is tracked.
+                ServerDisassembler disassembler = new ServerDisassembler(wrapSender(_network.getSender()), Constant.MIN_MAX_FRAME_SIZE);
+                _connection.setSender(disassembler);
+                _connection.addFrameSizeObserver(disassembler);
+                // FIXME Two log messages to maintain compatibility with earlier protocol versions
+                _connection.getEventLogger().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false));
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Reports whether message assignment is suspended from the calling thread's point of view.
+     *
+     * @return {@code true} if some thread other than the caller has suspended assignment;
+     *         {@code false} if nothing is suspended or the caller itself is the suspender
+     */
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        // Read the reference exactly once: a second get() could observe a concurrent reset
+        // to null and wrongly report "suspended" because null != currentThread().
+        final Thread lock = _messageAssignmentSuspended.get();
+        return lock != null && lock != Thread.currentThread();
+    }
+
+    /**
+     * Suspends or resumes message assignment for this connection.
+     * <p>
+     * When suspending, the calling thread is recorded as the suspender and each consumer's
+     * send lock is taken and released in turn. When resuming, the marker is cleared and each
+     * consumer target is asked to notify its current state.
+     *
+     * @param messageAssignmentSuspended {@code true} to suspend, {@code false} to resume
+     */
+    @Override
+    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+    {
+        final Thread suspender = messageAssignmentSuspended ? Thread.currentThread() : null;
+        _messageAssignmentSuspended.set(suspender);
+
+        for (AMQSessionModel<?> sessionModel : _connection.getSessionModels())
+        {
+            for (Consumer<?> modelConsumer : sessionModel.getConsumers())
+            {
+                final ConsumerImpl impl = (ConsumerImpl) modelConsumer;
+                if (messageAssignmentSuspended)
+                {
+                    // Passing through the send lock guarantees that any delivery already in
+                    // progress on this consumer has finished before we return.
+                    impl.getSendLock();
+                    impl.releaseSendLock();
+                }
+                else
+                {
+                    impl.getTarget().notifyCurrentState();
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Wraps a sender so that every {@code send} updates the last-write time before the
+     * buffer is forwarded; {@code flush} and {@code close} are forwarded unchanged.
+     *
+     * @param sender the sender to delegate to
+     * @return the wrapping sender
+     */
+    private ByteBufferSender wrapSender(final ByteBufferSender sender)
+    {
+        return new ByteBufferSender()
+        {
+            @Override
+            public void send(ByteBuffer msg)
+            {
+                _lastWriteTime = System.currentTimeMillis();
+                sender.send(msg);
+            }
+
+            @Override
+            public void flush()
+            {
+                sender.flush();
+
+            }
+
+            @Override
+            public void close()
+            {
+                sender.close();
+            }
+        };
+    }
+
+    /** Returns the time in milliseconds recorded by the most recent {@code received} call, or the creation time if there has been none. */
+    @Override
+    public long getLastReadTime()
+    {
+        return _lastReadTime;
+    }
+
+    /** Returns the time in milliseconds of the most recent send through the wrapped sender, or the creation time if there has been none. */
+    @Override
+    public long getLastWriteTime()
+    {
+        return _lastWriteTime;
+    }
+
+    /**
+     * Handles bytes read from the network, running as the connection's authorized subject.
+     * <p>
+     * Records the read time. If no principal has been authorized and more than the port's
+     * {@code CONNECTION_MAXIMUM_AUTHENTICATION_DELAY} has elapsed since this object was
+     * created, a warning and an idle-close message are logged and the network connection is
+     * closed. The buffer is then passed to the input handler and the underlying connection
+     * is told that the read is complete.
+     *
+     * @param buf the bytes received
+     */
+    public void received(final ByteBuffer buf)
+    {
+        Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>()
+        {
+            @Override
+            public Object run()
+            {
+                _lastReadTime = System.currentTimeMillis();
+                if (_connection.getAuthorizedPrincipal() == null)
+                {
+                    // Look the limit up once so the comparison and the log message always agree.
+                    final long maxAuthenticationDelay =
+                            _connection.getPort().getContextValue(Long.class,
+                                                                  Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
+                    if ((_lastReadTime - _createTime) > maxAuthenticationDelay)
+                    {
+                        _logger.warn("Connection has taken more than "
+                                     + maxAuthenticationDelay
+                                     + "ms to establish identity.  Closing as possible DoS.");
+                        _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+                        _network.close();
+                    }
+                }
+                // NOTE(review): the buffer is still processed after the network has been
+                // closed above - confirm that this is intended.
+                _inputHandler.received(buf);
+                _connection.receivedComplete();
+                return null;
+            }
+        });
+    }
+
+    /** Intentionally does nothing in this implementation. */
+    @Override
+    public void encryptedTransport()
+    {
+    }
+
+    /** Asks the underlying connection to perform a heartbeat. */
+    public void writerIdle()
+    {
+        _connection.doHeartBeat();
+    }
+
+    /**
+     * Logs an idle-close message and closes the network connection, running as the
+     * connection's authorized subject.
+     */
+    public void readerIdle()
+    {
+        Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+                    _network.close();
+                    return null;
+                }
+            });
+
+    }
+
+    /**
+     * Returns the string form of the network connection's remote address.
+     * NOTE(review): throws NullPointerException if the remote address is null - confirm it never is.
+     */
+    public String getAddress()
+    {
+        return _network.getRemoteAddress().toString();
+    }
+
+    /** Notifies the input handler that the connection has been closed. */
+    @Override
+    public void closed()
+    {
+        _inputHandler.closed();
+    }
+
+    /**
+     * Runs the superclass delete tasks; adds no behaviour of its own.
+     * NOTE(review): presumably re-declared so that classes in this package can call the
+     * protected method - confirm before removing.
+     */
+    @Override
+    protected void performDeleteTasks()
+    {
+        super.performDeleteTasks();
+    }
+
+    /** Returns the flag last stored by {@code setTransportBlockedForWriting}. */
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _transportBlockedForWriting;
+    }
+
+    /**
+     * Records whether the transport is blocked for writing and tells the underlying
+     * connection that the transport state has changed.
+     *
+     * @param blocked the new blocked state
+     */
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _transportBlockedForWriting = blocked;
+        _connection.transportStateChanged();
+    }
+
+    /** Delegates pending-work processing to the underlying connection. */
+    @Override
+    public void processPending()
+    {
+        _connection.processPending();
+    }
+
+    /** Returns {@code true} if {@code notifyWork} has been called since the last {@code clearWork}. */
+    @Override
+    public boolean hasWork()
+    {
+        return _stateChanged.get();
+    }
+
+    /**
+     * Marks this connection as having work and, if a work listener is registered,
+     * invokes it with this connection.
+     */
+    @Override
+    public void notifyWork()
+    {
+        _stateChanged.set(true);
+
+        final Action<ProtocolEngine> workListener = _workListener.get();
+        if (workListener == null)
+        {
+            return;
+        }
+        workListener.performAction(this);
+    }
+
+    /** Resets the flag reported by {@code hasWork}. */
+    public void clearWork()
+    {
+        _stateChanged.set(false);
+    }
+
+    /**
+     * Sets the listener that {@code notifyWork} invokes.
+     *
+     * @param listener the listener to register; {@code null} means none is invoked
+     */
+    public void setWorkListener(final Action<ProtocolEngine> listener)
+    {
+        _workListener.set(listener);
+    }
+
+    /** Delegates to the underlying connection's {@code hasSessionWithName}. */
+    public boolean hasSessionWithName(final byte[] name)
+    {
+        return _connection.hasSessionWithName(name);
+    }
+
+    /** Delegates to the underlying connection's {@code closeAsync} with the given cause and message. */
+    public void closeAsync(final AMQConstant cause, final String message)
+    {
+        _connection.closeAsync(cause, message);
+    }
+
+    /** Returns the underlying connection's authorized principal; {@code received} treats {@code null} as not yet authorized. */
+    public Principal getAuthorizedPrincipal()
+    {
+        return _connection.getAuthorizedPrincipal();
+    }
+
+    /**
+     * Delegates to the underlying connection's {@code closeSessionAsync}.
+     *
+     * @param session the session to close; it is cast to {@link ServerSession}, so any other
+     *                implementation causes a ClassCastException
+     * @param cause   the reason passed through to the underlying connection
+     * @param message the message passed through to the underlying connection
+     */
+    public void closeSessionAsync(final AMQSessionModel<?> session,
+                                  final AMQConstant cause, final String message)
+    {
+        _connection.closeSessionAsync((ServerSession)session, cause, message);
+    }
+
+    /** Delegates to the underlying connection's {@code block}. */
+    public void block()
+    {
+        _connection.block();
+    }
+
+    /** Returns the remote container name reported by the underlying connection. */
+    public String getRemoteContainerName()
+    {
+        return _connection.getRemoteContainerName();
+    }
+
+    /** Returns the virtual host reported by the underlying connection. */
+    public VirtualHost<?, ?, ?> getVirtualHost()
+    {
+        return _connection.getVirtualHost();
+    }
+
+    /** Returns the session list reported by the underlying connection. */
+    public List<ServerSession> getSessionModels()
+    {
+        return _connection.getSessionModels();
+    }
+
+    /** Delegates to the underlying connection's {@code unblock}. */
+    public void unblock()
+    {
+        _connection.unblock();
+    }
+
+    /** Returns the log subject reported by the underlying connection. */
+    public LogSubject getLogSubject()
+    {
+        return _connection.getLogSubject();
+    }
+
+    /** Returns the session count limit reported by the underlying connection. */
+    public long getSessionCountLimit()
+    {
+        return _connection.getSessionCountLimit();
+    }
+
+}

Propchange: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Sat Jun 27 21:13:25 2015
@@ -107,7 +107,7 @@ public class ConsumerTarget_0_10 extends
     @Override
     public boolean doIsSuspended()
     {
-        return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
+        return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getAMQPConnection().isConnectionStopped(); // TODO check for Session suspension
     }
 
     public boolean close()
@@ -559,10 +559,10 @@ public class ConsumerTarget_0_10 extends
         switch(flowMode)
         {
             case CREDIT:
-                _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
+                _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getAmqpConnection());
                 break;
             case WINDOW:
-                _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
+                _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getAmqpConnection());
                 break;
             default:
                 // this should never happen, as 0-10 is finalised and so the enum should never change

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Sat Jun 27 21:13:25 2015
@@ -20,9 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
@@ -30,8 +27,6 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
@@ -72,25 +67,11 @@ public class ProtocolEngineCreator_0_10
                                             Transport transport,
                                             long id, final AggregateTicker aggregateTicker)
     {
-        String fqdn = null;
-        SocketAddress address = network.getLocalAddress();
-        if (address instanceof InetSocketAddress)
-        {
-            fqdn = ((InetSocketAddress) address).getHostName();
-        }
-        SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
-        ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, fqdn, subjectCreator);
-
-        ServerConnection conn = new ServerConnection(id, broker, port, transport);
-
-        conn.setConnectionDelegate(connDelegate);
-        conn.setRemoteAddress(network.getRemoteAddress());
-        conn.setLocalAddress(network.getLocalAddress());
-
-        ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network, aggregateTicker);
-        conn.setProtocolEngine(protocolEngine);
 
-        return protocolEngine;
+        final AMQPConnection_0_10 protocolEngine_0_10 =
+                new AMQPConnection_0_10(broker, network, port, transport, id, aggregateTicker);
+        protocolEngine_0_10.create();
+        return protocolEngine_0_10;
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Sat Jun 27 21:13:25 2015
@@ -34,7 +34,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -44,23 +43,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.adapter.ConnectionAdapter;
 import org.apache.qpid.server.protocol.ConnectionClosingTicker;
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.SessionModelListener;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
-import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -75,7 +67,7 @@ import org.apache.qpid.transport.Option;
 import org.apache.qpid.transport.ProtocolEvent;
 import org.apache.qpid.transport.Session;
 
-public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>,
+public class ServerConnection extends Connection implements //AMQConnectionModel<ServerConnection, ServerSession>,
                                                             LogSubject, AuthorizationHolder
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class);
@@ -83,9 +75,7 @@ public class ServerConnection extends Co
     private final Broker<?> _broker;
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
 
-    private final Subject _authorizedSubject = new Subject();
     private Principal _authorizedPrincipal = null;
-    private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
     private final long _connectionId;
     private final Object _reference = new Object();
     private VirtualHostImpl<?,?,?> _virtualHost;
@@ -94,23 +84,15 @@ public class ServerConnection extends Co
     private boolean _blocking;
     private final Transport _transport;
 
-    private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList =
-            new CopyOnWriteArrayList<Action<? super ServerConnection>>();
-
     private final Queue<Action<? super ServerConnection>> _asyncTaskList =
             new ConcurrentLinkedQueue<>();
 
-    private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
-            new CopyOnWriteArrayList<SessionModelListener>();
-
-    private volatile boolean _stopped;
     private int _messageCompressionThreshold;
     private final int _maxMessageSize;
 
-    private ProtocolEngine_0_10 _protocolEngine;
+    private AMQPConnection_0_10 _amqpConnection;
     private boolean _ignoreFutureInput;
     private boolean _ignoreAllButConnectionCloseOk;
-    private ConnectionAdapter _adapter;
 
     public ServerConnection(final long connectionId,
                             Broker<?> broker,
@@ -118,7 +100,6 @@ public class ServerConnection extends Co
                             final Transport transport)
     {
         _connectionId = connectionId;
-        _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
         _broker = broker;
 
         _port = port;
@@ -127,13 +108,6 @@ public class ServerConnection extends Co
         int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
         _maxMessageSize = (maxMessageSize > 0) ? maxMessageSize : Integer.MAX_VALUE;
 
-
-        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
-        _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
-        _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
-        _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
-        _adapter = new ConnectionAdapter(this);
-        _adapter.create();
     }
 
     public Object getReference()
@@ -177,19 +151,19 @@ public class ServerConnection extends Co
                     true,
                     true));
 
-            _adapter.virtualHostAssociated();
+            _amqpConnection.virtualHostAssociated();
         }
 
         if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING)
         {
             if(_virtualHost != null)
             {
-                _virtualHost.deregisterConnection(_adapter);
+                _virtualHost.deregisterConnection(_amqpConnection);
             }
         }
         if(state == State.CLOSING)
         {
-            getProtocolEngine().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, getNetworkConnection()));
+            getAmqpConnection().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, getNetworkConnection()));
         }
         if (state == State.CLOSED)
         {
@@ -211,26 +185,14 @@ public class ServerConnection extends Co
         return (ServerConnectionDelegate) super.getConnectionDelegate();
     }
 
-    public void setConnectionDelegate(ServerConnectionDelegate delegate)
-    {
-        super.setConnectionDelegate(delegate);
-    }
-
-    @Override
-    public ProtocolEngine getProtocolEngine()
-    {
-        return _protocolEngine;
-    }
-
-    @Override
-    public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+    public AMQPConnection_0_10 getAmqpConnection()
     {
-        _protocolEngine.setScheduler(networkConnectionScheduler);
+        return _amqpConnection;
     }
 
-    public void setProtocolEngine(final ProtocolEngine_0_10 serverProtocolEngine)
+    public void setAmqpConnection(final AMQPConnection_0_10 serverProtocolEngine)
     {
-        _protocolEngine = serverProtocolEngine;
+        _amqpConnection = serverProtocolEngine;
     }
 
     public VirtualHostImpl<?,?,?> getVirtualHost()
@@ -249,39 +211,19 @@ public class ServerConnection extends Co
         {
             _messageCompressionThreshold = Integer.MAX_VALUE;
         }
-        _authorizedSubject.getPrincipals().add(_virtualHost.getPrincipal());
+        _amqpConnection.getSubject().getPrincipals().add(_virtualHost.getPrincipal());
     }
 
-    @Override
-    public String getVirtualHostName()
-    {
-        return _virtualHost == null ? null : _virtualHost.getName();
-    }
-
-    @Override
     public AmqpPort<?> getPort()
     {
         return _port;
     }
 
-    @Override
     public Transport getTransport()
     {
         return _transport;
     }
 
-    @Override
-    public void stop()
-    {
-        _stopped = true;
-    }
-
-    @Override
-    public boolean isStopped()
-    {
-        return _stopped;
-    }
-
     public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message)
     {
         addAsyncTask(new Action<ServerConnection>()
@@ -345,7 +287,7 @@ public class ServerConnection extends Co
         Subject subject;
         if (event.isConnectionControl())
         {
-            subject = _authorizedSubject;
+            subject = _amqpConnection.getSubject();
         }
         else
         {
@@ -356,7 +298,7 @@ public class ServerConnection extends Co
             }
             else
             {
-                subject = _authorizedSubject;
+                subject = _amqpConnection.getSubject();
             }
         }
 
@@ -446,10 +388,7 @@ public class ServerConnection extends Co
 
     protected void performDeleteTasks()
     {
-        for(Action<? super ServerConnection> task : _connectionCloseTaskList)
-        {
-            task.performAction(this);
-        }
+        _amqpConnection.performDeleteTasks();
     }
 
     public synchronized void block()
@@ -480,7 +419,7 @@ public class ServerConnection extends Co
     public synchronized void registerSession(final Session ssn)
     {
         super.registerSession(ssn);
-        sessionAdded((ServerSession)ssn);
+        _amqpConnection.sessionAdded((ServerSession) ssn);
         if(_blocking)
         {
             ((ServerSession)ssn).block();
@@ -490,13 +429,13 @@ public class ServerConnection extends Co
     @Override
     public synchronized void removeSession(final Session ssn)
     {
-        sessionRemoved((ServerSession)ssn);
+        _amqpConnection.sessionRemoved((ServerSession) ssn);
         super.removeSession(ssn);
     }
 
     public List<ServerSession> getSessionModels()
     {
-        List<ServerSession> sessions = new ArrayList<ServerSession>();
+        List<ServerSession> sessions = new ArrayList<>();
         for (Session ssn : getChannels())
         {
             sessions.add((ServerSession) ssn);
@@ -504,54 +443,13 @@ public class ServerConnection extends Co
         return sessions;
     }
 
-    public void registerMessageDelivered(long messageSize)
-    {
-        _messagesDelivered.registerEvent(1L);
-        _dataDelivered.registerEvent(messageSize);
-        _virtualHost.registerMessageDelivered(messageSize);
-    }
-
-    public void registerMessageReceived(long messageSize, long timestamp)
-    {
-        _messagesReceived.registerEvent(1L, timestamp);
-        _dataReceived.registerEvent(messageSize, timestamp);
-        _virtualHost.registerMessageReceived(messageSize, timestamp);
-    }
-
-    public StatisticsCounter getMessageReceiptStatistics()
-    {
-        return _messagesReceived;
-    }
-
-    public StatisticsCounter getDataReceiptStatistics()
-    {
-        return _dataReceived;
-    }
-
-    public StatisticsCounter getMessageDeliveryStatistics()
-    {
-        return _messagesDelivered;
-    }
-
-    public StatisticsCounter getDataDeliveryStatistics()
-    {
-        return _dataDelivered;
-    }
-
-    public void resetStatistics()
-    {
-        _messagesDelivered.reset();
-        _dataDelivered.reset();
-        _messagesReceived.reset();
-        _dataReceived.reset();
-    }
 
     /**
      * @return authorizedSubject
      */
     public Subject getAuthorizedSubject()
     {
-        return _authorizedSubject;
+        return _amqpConnection.getSubject();
     }
 
     /**
@@ -568,7 +466,7 @@ public class ServerConnection extends Co
         }
         else
         {
-            _authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals());
+            getAuthorizedSubject().getPrincipals().addAll(authorizedSubject.getPrincipals());
 
             _authorizedPrincipal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(authorizedSubject);
         }
@@ -586,13 +484,7 @@ public class ServerConnection extends Co
 
     public String getRemoteAddressString()
     {
-        return String.valueOf(getRemoteAddress());
-    }
-
-    @Override
-    public String getRemoteProcessPid()
-    {
-        return getConnectionDelegate().getRemoteProcessPid();
+        return String.valueOf(getRemoteSocketAddress());
     }
 
     @Override
@@ -636,59 +528,22 @@ public class ServerConnection extends Co
         super.send(event);
     }
 
-    public long getLastIoTime()
-    {
-        return _lastIoTime.longValue();
-    }
-
-    @Override
     public String getClientId()
     {
         return getConnectionDelegate().getClientId();
     }
 
-    @Override
     public String getRemoteContainerName()
     {
         return getConnectionDelegate().getClientId();
     }
 
-    @Override
-    public void addSessionListener(final SessionModelListener listener)
-    {
-        _sessionListeners.add(listener);
-    }
-
-    @Override
-    public void removeSessionListener(final SessionModelListener listener)
-    {
-        _sessionListeners.remove(listener);
-    }
-
-    private void sessionAdded(final AMQSessionModel<?,?> session)
-    {
-        for(SessionModelListener l : _sessionListeners)
-        {
-            l.sessionAdded(session);
-        }
-    }
 
-    private void sessionRemoved(final AMQSessionModel<?,?> session)
-    {
-        for(SessionModelListener l : _sessionListeners)
-        {
-            l.sessionRemoved(session);
-        }
-    }
-
-
-    @Override
     public String getClientVersion()
     {
         return getConnectionDelegate().getClientVersion();
     }
 
-    @Override
     public String getClientProduct()
     {
         return getConnectionDelegate().getClientProduct();
@@ -721,24 +576,12 @@ public class ServerConnection extends Co
         super.doHeartBeat();
     }
 
-    @Override
-    public void addDeleteTask(final Action<? super ServerConnection> task)
-    {
-        _connectionCloseTaskList.add(task);
-    }
-
     private void addAsyncTask(final Action<ServerConnection> action)
     {
         _asyncTaskList.add(action);
         notifyWork();
     }
 
-    @Override
-    public void removeDeleteTask(final Action<? super ServerConnection> task)
-    {
-        _connectionCloseTaskList.remove(task);
-    }
-
     public int getMessageCompressionThreshold()
     {
         return _messageCompressionThreshold;
@@ -757,26 +600,18 @@ public class ServerConnection extends Co
         }
     }
 
-    @Override
     public void notifyWork()
     {
-        _protocolEngine.notifyWork();
-    }
-
-
-    @Override
-    public boolean isMessageAssignmentSuspended()
-    {
-        return _protocolEngine.isMessageAssignmentSuspended();
+        _amqpConnection.notifyWork();
     }
 
     public void processPending()
     {
-        List<? extends AMQSessionModel<?,?>> sessionsWithPending = new ArrayList<>(getSessionModels());
+        List<? extends AMQSessionModel<?>> sessionsWithPending = new ArrayList<>(getSessionModels());
         while(!sessionsWithPending.isEmpty())
         {
-            final Iterator<? extends AMQSessionModel<?, ?>> iter = sessionsWithPending.iterator();
-            AMQSessionModel<?, ?> session;
+            final Iterator<? extends AMQSessionModel<?>> iter = sessionsWithPending.iterator();
+            AMQSessionModel<?> session;
             while(iter.hasNext())
             {
                 session = iter.next();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Sat Jun 27 21:13:25 2015
@@ -45,10 +45,10 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.*;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -211,7 +211,7 @@ public class ServerConnectionDelegate ex
             sconn.setVirtualHost(vhost);
             try
             {
-                if(!vhost.authoriseCreateConnection(sconn))
+                if(!vhost.authoriseCreateConnection(sconn.getAmqpConnection()))
                 {
                     sconn.setState(Connection.State.CLOSING);
                     sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized");
@@ -367,23 +367,19 @@ public class ServerConnectionDelegate ex
         final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
         final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName();
 
-        final Iterator<org.apache.qpid.server.model.Connection<?>> connections =
+        final Iterator<? extends org.apache.qpid.server.model.Connection<?>> connections =
                         ((ServerConnection)conn).getVirtualHost().getConnections().iterator();
         while(connections.hasNext())
         {
             final org.apache.qpid.server.model.Connection<?> modelConnnection = connections.next();
-            final AMQConnectionModel amqConnectionModel = modelConnnection.getUnderlyingConnection();
-            if (amqConnectionModel instanceof ServerConnection)
-            {
-                ServerConnection otherConnection = (ServerConnection)amqConnectionModel;
+            final AMQPConnection<?> amqConnectionModel = modelConnnection.getUnderlyingConnection();
 
-                final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
-                        ? ""
-                        : amqConnectionModel.getAuthorizedPrincipal().getName();
-                if (userId.equals(userName) && otherConnection.hasSessionWithName(name))
-                {
-                    return false;
-                }
+            final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
+                    ? ""
+                    : amqConnectionModel.getAuthorizedPrincipal().getName();
+            if (userId.equals(userName) && amqConnectionModel.hasSessionWithName(name))
+            {
+                return false;
             }
         }
         return true;
@@ -402,10 +398,20 @@ public class ServerConnectionDelegate ex
                 _compressionSupported = Boolean.parseBoolean(String.valueOf(compressionSupported));
 
             }
+            final AMQPConnection_0_10 protocolEngine = ((ServerConnection) conn).getAmqpConnection();
+            protocolEngine.setClientId(getStringClientProperty(ConnectionStartProperties.CLIENT_ID_0_10));
+            protocolEngine.setClientProduct(getStringClientProperty(ConnectionStartProperties.PRODUCT));
+            protocolEngine.setClientVersion(getStringClientProperty(ConnectionStartProperties.VERSION_0_10));
+            protocolEngine.setRemoteProcessPid(getStringClientProperty(ConnectionStartProperties.PID));
         }
         super.connectionStartOk(conn, ok);
     }
 
+    private String getStringClientProperty(final String name)
+    {
+        return (_clientProperties == null || _clientProperties.get(name) == null) ? null : String.valueOf(_clientProperties.get(name));
+    }
+
     public Map<String,Object> getClientProperties()
     {
         return _clientProperties;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sat Jun 27 21:13:25 2015
@@ -75,6 +75,7 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.FutureResult;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -113,7 +114,7 @@ import org.apache.qpid.transport.network
 
 public class ServerSession extends Session
         implements AuthorizationHolder,
-                   AMQSessionModel<ServerSession,ServerConnection>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
+                   AMQSessionModel<ServerSession>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
                    Deletable<ServerSession>
 
 {
@@ -192,7 +193,7 @@ public class ServerSession extends Sessi
             @Override
             public void doTimeoutAction(String reason)
             {
-                getConnectionModel().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
+                getAMQPConnection().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
             }
         }, getVirtualHost());
 
@@ -275,7 +276,7 @@ public class ServerSession extends Sessi
                                      message.getInitialRoutingAddress(),
                                      instanceProperties, _transaction, _checkCapacityAction
                                     );
-        getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
+        getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
         incrementOutstandingTxnsIfNecessary();
         incrementUncommittedMessageSize(message.getStoredMessage());
         return enqueues;
@@ -321,7 +322,7 @@ public class ServerSession extends Sessi
     public void sendMessage(MessageTransfer xfr,
                             Runnable postIdSettingAction)
     {
-        getConnectionModel().registerMessageDelivered(xfr.getBodySize());
+        getAMQPConnection().registerMessageDelivered(xfr.getBodySize());
         invoke(xfr, postIdSettingAction);
     }
 
@@ -787,9 +788,10 @@ public class ServerSession extends Sessi
         return _id;
     }
 
-    public ServerConnection getConnectionModel()
+    @Override
+    public AMQPConnection<?> getAMQPConnection()
     {
-        return getConnection();
+        return getConnection().getAmqpConnection();
     }
 
     public String getClientID()
@@ -897,7 +899,7 @@ public class ServerSession extends Sessi
                             ? getConnection().getConnectionId()
                             : -1;
 
-        String remoteAddress = String.valueOf(getConnection().getRemoteAddress());
+        String remoteAddress = String.valueOf(getConnection().getRemoteSocketAddress());
         return "[" +
                MessageFormat.format(CHANNEL_FORMAT,
                                     connectionId,
@@ -1188,13 +1190,13 @@ public class ServerSession extends Sessi
     @Override
     public void addTicker(final Ticker ticker)
     {
-        getConnection().getProtocolEngine().getAggregateTicker().addTicker(ticker);
+        getConnection().getAmqpConnection().getAggregateTicker().addTicker(ticker);
     }
 
     @Override
     public void removeTicker(final Ticker ticker)
     {
-        getConnection().getProtocolEngine().getAggregateTicker().removeTicker(ticker);
+        getConnection().getAmqpConnection().getAggregateTicker().removeTicker(ticker);
     }
 
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sat Jun 27 21:13:25 2015
@@ -255,7 +255,7 @@ public class ServerSessionDelegate exten
                 }
                 else
                 {
-                    ProtocolEngine protocolEngine = getServerConnection(session).getProtocolEngine();
+                    ProtocolEngine protocolEngine = getServerConnection(session).getAmqpConnection();
                     FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, protocolEngine);
 
                     FilterManager filterManager = null;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Sat Jun 27 21:13:25 2015
@@ -25,7 +25,8 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import javax.security.auth.Subject;
+
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
 import org.apache.qpid.server.model.Broker;
@@ -80,15 +81,17 @@ public class ServerSessionTest extends Q
         AmqpPort amqpPort = createMockPort(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
 
         ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP);
-        final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class);
-        connection.setProtocolEngine(protocolEngine);
+        final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
+        Subject subject = new Subject();
+        when(protocolEngine.getSubject()).thenReturn(subject);
+        connection.setAmqpConnection(protocolEngine);
         connection.setVirtualHost(_virtualHost);
         ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
                 new Binary(getName().getBytes()), 0);
 
         // create a session with the same name but on a different connection
         ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP);
-        connection2.setProtocolEngine(protocolEngine);
+        connection2.setAmqpConnection(protocolEngine);
         connection2.setVirtualHost(_virtualHost);
         ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
                 new Binary(getName().getBytes()), 0);
@@ -105,8 +108,11 @@ public class ServerSessionTest extends Q
         AmqpPort port = createMockPort(1024);
 
         ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP);
-        final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class);
-        connection.setProtocolEngine(protocolEngine);
+        final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
+        Subject subject = new Subject();
+        when(protocolEngine.getSubject()).thenReturn(subject);
+
+        connection.setAmqpConnection(protocolEngine);
         connection.setVirtualHost(_virtualHost);
         final List<Method> invokedMethods = new ArrayList<>();
         ServerSession session = new ServerSession(connection, new ServerSessionDelegate(),



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org