You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/01/24 13:45:57 UTC

svn commit: r1780076 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/session/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ bro...

Author: kwall
Date: Tue Jan 24 13:45:56 2017
New Revision: 1780076

URL: http://svn.apache.org/viewvc?rev=1780076&view=rev
Log:
QPID-7633: 0-10's ServerSession no longer implements AMQSessionModel

* Pull up subject, accesscontrolcontext, security token

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java Tue Jan 24 13:45:56 2017
@@ -28,8 +28,8 @@ import org.apache.qpid.server.util.Delet
 
 public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSession<S, X>,
                              X extends ConsumerTarget<X>> extends Session<S>,
-                                             Deletable<S>,
-                                             EventLoggerProvider,
-                                             AMQSessionModel<S, X>
+                                                                  Deletable<S>,
+                                                                  EventLoggerProvider,
+                                                                  AMQSessionModel<S, X>
 {
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java Tue Jan 24 13:45:56 2017
@@ -20,23 +20,31 @@
  */
 package org.apache.qpid.server.session;
 
+import java.security.AccessControlContext;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.security.auth.Subject;
 
 import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
+import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -44,36 +52,71 @@ import org.apache.qpid.server.model.Name
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.protocol.PublishAuthorisationCache;
+import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.TransactionTimeoutTicker;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.transport.network.Ticker;
 
-public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>, X extends ConsumerTarget<X>>
+public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
+                                          X extends ConsumerTarget<X>>
         extends AbstractConfiguredObject<S>
         implements AMQPSession<S, X>, EventLoggerProvider
 {
     private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
     private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
-
     private final Action _deleteModelTask;
-    private final Connection<?> _amqpConnection;
+    private final AMQPConnection<?> _connection;
+    private final int _sessionId;
+
+    protected final AccessControlContext _accessControllerContext;
+    protected final Subject _subject;
+    protected final SecurityToken _token;
+    protected final PublishAuthorisationCache _publishAuthCache;
+
+    protected final LogSubject _logSubject;
 
-    protected AbstractAMQPSession(final Connection<?> parent,
-                                  final int sessionId)
+    protected final List<Action<? super S>> _taskList = new CopyOnWriteArrayList<>();
+
+    protected AbstractAMQPSession(final Connection<?> parent, final int sessionId)
     {
         super(parent, createAttributes(sessionId));
-        _amqpConnection = parent;
+        _connection = (AMQPConnection) parent;
+        _sessionId = sessionId;
 
-        _deleteModelTask = new Action()
+        _deleteModelTask = new Action<S>()
         {
             @Override
-            public void performAction(final Object object)
+            public void performAction(final S object)
             {
                 removeDeleteTask(this);
                 deleteAsync();
             }
         };
+        _subject = new Subject(false, _connection.getSubject().getPrincipals(),
+                               _connection.getSubject().getPublicCredentials(),
+                               _connection.getSubject().getPrivateCredentials());
+        _subject.getPrincipals().add(new SessionPrincipal(this));
+
+        if  (_connection.getAddressSpace() instanceof ConfiguredObject)
+        {
+            _token = ((ConfiguredObject) _connection.getAddressSpace()).newToken(_subject);
+        }
+        else
+        {
+            final Broker<?> broker = (Broker<?>) _connection.getBroker();
+            _token = broker.newToken(_subject);
+        }
+
+        _accessControllerContext = _connection.getAccessControlContextFromSubject(_subject);
+
+        final long authCacheTimeout = _connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT);
+        final int authCacheSize = _connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE);
+        _publishAuthCache = new PublishAuthorisationCache(_token, authCacheTimeout, authCacheSize);
+        _logSubject = new ChannelLogSubject(this);
+
         setState(State.ACTIVE);
     }
 
@@ -97,11 +140,19 @@ public abstract class AbstractAMQPSessio
     protected void postResolveChildren()
     {
         super.postResolveChildren();
-        registerTransactionTimeoutTickers(_amqpConnection);
+        registerTransactionTimeoutTickers(_connection);
     }
 
     @Override
-    public abstract int getChannelId();
+    public int getChannelId()
+    {
+        return _sessionId;
+    }
+
+    public AMQPConnection<?> getAMQPConnection()
+    {
+        return _connection;
+    }
 
     @Override
     public boolean isProducerFlowBlocked()
@@ -146,6 +197,18 @@ public abstract class AbstractAMQPSessio
         return getUnacknowledgedMessageCount();
     }
 
+    @Override
+    public void addDeleteTask(final Action<? super S> task)
+    {
+        _taskList.add(task);
+    }
+
+    @Override
+    public void removeDeleteTask(final Action<? super S> task)
+    {
+        _taskList.remove(task);
+    }
+
     public abstract int getUnacknowledgedMessageCount();
 
     @Override
@@ -170,7 +233,10 @@ public abstract class AbstractAMQPSessio
     }
 
     @Override
-    public abstract EventLogger getEventLogger();
+    public EventLogger getEventLogger()
+    {
+        return _connection.getEventLogger();
+    }
 
     private void registerTransactionTimeoutTickers(Connection<?> amqpConnection)
     {
@@ -289,7 +355,10 @@ public abstract class AbstractAMQPSessio
 
     public abstract void doTimeoutAction(final String idleTransactionTimeoutError);
 
-    public abstract LogSubject getLogSubject();
+    public LogSubject getLogSubject()
+    {
+        return _logSubject;
+    }
 
     public abstract long getTransactionUpdateTimeLong();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Tue Jan 24 13:45:56 2017
@@ -30,16 +30,20 @@ import javax.security.auth.Subject;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.util.Deletable;
 
 public interface AMQPConnection<C extends AMQPConnection<C>> extends Connection<C>, Deletable<C>, EventLoggerProvider
 {
+    Broker<?> getBroker();
+
+    long getConnectionId();
 
     AccessControlContext getAccessControlContextFromSubject(Subject subject);
 
-    long getConnectionId();
+    Subject getSubject();
 
     Principal getAuthorizedPrincipal();
 
@@ -71,6 +75,7 @@ public interface AMQPConnection<C extend
 
     boolean hasSessionWithName(byte[] name);
 
+
     enum CloseReason
     {
         MANAGEMENT,

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Tue Jan 24 13:45:56 2017
@@ -209,6 +209,7 @@ public abstract class AbstractAMQPConnec
         logConnectionOpen();
     }
 
+    @Override
     public Broker<?> getBroker()
     {
         return _broker;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Tue Jan 24 13:45:56 2017
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
 
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -309,10 +310,10 @@ public class AMQPConnection_0_10 extends
     public void closeSessionAsync(final AMQSessionModel<?,?> session,
                                   final CloseReason reason, final String message)
     {
-        _connection.closeSessionAsync((ServerSession) session, reason, message);
+        ServerSession s = ((Session_0_10)session).getServerSession();
+        _connection.closeSessionAsync(s, reason, message);
     }
 
-
     @Override
     protected void addAsyncTask(final Action<? super ServerConnection> action)
     {
@@ -329,9 +330,12 @@ public class AMQPConnection_0_10 extends
         return _connection.getRemoteContainerName();
     }
 
-    public Collection<? extends ServerSession> getSessionModels()
+    public Collection<? extends Session_0_10> getSessionModels()
     {
-        return _connection.getSessionModels();
+        final Collection<org.apache.qpid.server.model.Session> sessions =
+                getChildren(org.apache.qpid.server.model.Session.class);
+        final Collection<? extends Session_0_10> session_0_10s = new ArrayList<>((Collection)sessions);
+        return session_0_10s;
     }
 
     public void unblock()

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Jan 24 13:45:56 2017
@@ -573,7 +573,12 @@ public class ConsumerTarget_0_10 extends
         stop();
     }
 
-    public ServerSession getSessionModel()
+    public Session_0_10 getSessionModel()
+    {
+        return _session.getModelObject();
+    }
+
+    public ServerSession getSession()
     {
         return _session;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Tue Jan 24 13:45:56 2017
@@ -47,7 +47,7 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onAccept()
     {
-        _target.getSessionModel().acknowledge(_consumer, _target, _entry);
+        _target.getSession().acknowledge(_consumer, _target, _entry);
     }
 
     public void onRelease(boolean setRedelivered)

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Tue Jan 24 13:45:56 2017
@@ -302,7 +302,7 @@ public class ServerConnection extends Co
         if(!_blocking)
         {
             _blocking = true;
-            for(AMQSessionModel ssn : getSessionModels())
+            for(ServerSession ssn : getSessionModels())
             {
                 ssn.block();
             }
@@ -314,7 +314,7 @@ public class ServerConnection extends Co
         if(_blocking)
         {
             _blocking = false;
-            for(AMQSessionModel ssn : getSessionModels())
+            for(ServerSession ssn : getSessionModels())
             {
                 ssn.unblock();
             }
@@ -480,7 +480,7 @@ public class ServerConnection extends Co
 
     public void transportStateChanged()
     {
-        for (AMQSessionModel ssn : getSessionModels())
+        for (ServerSession ssn : getSessionModels())
         {
             ssn.transportStateChanged();
         }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Tue Jan 24 13:45:56 2017
@@ -180,16 +180,6 @@ public class ServerConnectionDelegate ex
         return map;
     }
 
-    public ServerSession getSession(Connection conn, SessionAttach atc)
-    {
-        SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
-
-        final ServerSession serverSession =
-                new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
-
-        return serverSession;
-    }
-
     @Override
     public void connectionSecureOk(final Connection conn, final ConnectionSecureOk ok)
     {
@@ -410,21 +400,26 @@ public class ServerConnectionDelegate ex
         ServerConnection serverConnection = (ServerConnection) conn;
         assertState(serverConnection, ConnectionState.OPEN);
 
-        final ServerSession ssn = getSession(conn, atc);
+        SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
+
+        final ServerSession serverSession =
+                new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
+        final Session_0_10 session = new Session_0_10(((ServerConnection) conn).getAmqpConnection(), atc.getChannel(),
+                                                      serverSession);
+        session.create();
+        serverSession.setModelObject(session);
 
         if(isSessionNameUnique(atc.getName(), conn))
         {
-            serverConnection.map(ssn, atc.getChannel());
-            serverConnection.registerSession(ssn);
-            final Session_0_10 session = new Session_0_10(((ServerConnection) conn).getAmqpConnection(), ssn.getChannelId(), ssn);
-            session.create();
-            ssn.sendSessionAttached(atc.getName());
-            ssn.setState(Session.State.OPEN);
+            serverConnection.map(serverSession, atc.getChannel());
+            serverConnection.registerSession(serverSession);
+            serverSession.sendSessionAttached(atc.getName());
+            serverSession.setState(Session.State.OPEN);
         }
         else
         {
-            ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
-            ssn.closed();
+            serverSession.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
+            serverSession.closed();
         }
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Jan 24 13:45:56 2017
@@ -54,7 +54,6 @@ import com.google.common.util.concurrent
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -75,8 +74,6 @@ import org.apache.qpid.server.model.Queu
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.protocol.PublishAuthorisationCache;
-import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.StoredMessage;
@@ -102,9 +99,7 @@ import org.apache.qpid.transport.*;
 import org.apache.qpid.transport.network.Ticker;
 
 public class ServerSession extends Session
-        implements AMQSessionModel<ServerSession, ConsumerTarget_0_10>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
-                   Deletable<ServerSession>
-
+        implements LogSubject, AsyncAutoCommitTransaction.FutureRecorder
 {
     private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
 
@@ -112,30 +107,27 @@ public class ServerSession extends Sessi
     private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
     private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
 
-    private final UUID _id = UUID.randomUUID();
-    private final Subject _subject = new Subject();
-    private final AccessControlContext _accessControllerContext;
-    private final SecurityToken _token;
+    public Subject getSubject()
+    {
+        return _modelObject.getSubject();
+    }
+
     private long _createTime = System.currentTimeMillis();
 
     private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
 
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
-    private ChannelLogSubject _logSubject;
     private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
     private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
-    private org.apache.qpid.server.model.Session<?> _modelObject;
+    private Session_0_10 _modelObject;
     private long _blockTime;
     private long _blockingTimeout;
     private boolean _wireBlockingState;
     private final Set<ConsumerTarget_0_10> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
     private Iterator<ConsumerTarget_0_10> _processPendingIterator;
 
-    private final PublishAuthorisationCache _publishAuthCache;
-
-
     public static interface MessageDispositionChangeListener
     {
         public void onAccept();
@@ -170,41 +162,20 @@ public class ServerSession extends Sessi
     private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
     private long _maxUncommittedInMemorySize;
 
-
     public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
     {
         super(connection, delegate, name, expiry);
         _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
-        _logSubject = new ChannelLogSubject(this);
 
         ServerConnection serverConnection = (ServerConnection) connection;
-        AMQPConnection_0_10 amqpConnection = serverConnection.getAmqpConnection();
-
-        _subject.getPrincipals().addAll(serverConnection.getAuthorizedSubject().getPrincipals());
-        _subject.getPrincipals().add(new SessionPrincipal(this));
-        _accessControllerContext = amqpConnection.getAccessControlContextFromSubject(_subject);
-        final NamedAddressSpace addressSpace = serverConnection.getAddressSpace();
-
-        if(addressSpace instanceof ConfiguredObject)
-        {
-            _token = ((ConfiguredObject)addressSpace).newToken(_subject);
-        }
-        else
-        {
-            _token = amqpConnection.getBroker().newToken(_subject);
-        }
 
         _blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
         _maxUncommittedInMemorySize = getAMQPConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
-        _publishAuthCache = new PublishAuthorisationCache(_token,
-                                                          amqpConnection.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT),
-                                                          amqpConnection.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE));
-
     }
 
     public AccessControlContext getAccessControllerContext()
     {
-        return _accessControllerContext;
+        return _modelObject.getAccessControllerContext();
     }
 
     protected void setState(final State state)
@@ -264,7 +235,7 @@ public class ServerSession extends Sessi
                           final boolean immediate,
                           final long currentTime)
     {
-        _publishAuthCache.authorisePublish(destination, routingKey, immediate, currentTime);
+        _modelObject.getPublishAuthCache().authorisePublish(destination, routingKey, immediate, currentTime);
     }
 
     @Override
@@ -309,7 +280,7 @@ public class ServerSession extends Sessi
                 if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize())
                 {
                     getAMQPConnection().getEventLogger()
-                                       .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+                                       .message(getLogSubject(), ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
                 }
 
                 if(!_uncommittedMessages.isEmpty())
@@ -497,7 +468,7 @@ public class ServerSession extends Sessi
         }
         else if(_transaction instanceof DistributedTransaction)
         {
-            getAddressSpace().getDtxRegistry().endAssociations(this);
+            getAddressSpace().getDtxRegistry().endAssociations(_modelObject);
         }
 
         for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
@@ -506,9 +477,9 @@ public class ServerSession extends Sessi
         }
         _messageDispositionListenerMap.clear();
 
-        for (Action<? super ServerSession> task : _taskList)
+        for (Action<? super Session_0_10> task : _modelObject.getTaskList())
         {
-            task.performAction(this);
+            task.performAction(_modelObject);
         }
 
         LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get();
@@ -598,7 +569,7 @@ public class ServerSession extends Sessi
 
     public void selectDtx()
     {
-        _transaction = new DistributedTransaction(this, getAddressSpace().getDtxRegistry());
+        _transaction = new DistributedTransaction(_modelObject, getAddressSpace().getDtxRegistry());
 
     }
 
@@ -753,17 +724,7 @@ public class ServerSession extends Sessi
 
     public Subject getAuthorizedSubject()
     {
-        return _subject;
-    }
-
-    public void addDeleteTask(Action<? super ServerSession> task)
-    {
-        _taskList.add(task);
-    }
-
-    public void removeDeleteTask(Action<? super ServerSession> task)
-    {
-        _taskList.remove(task);
+        return getSubject();
     }
 
     public Object getReference()
@@ -787,18 +748,11 @@ public class ServerSession extends Sessi
     }
 
 
-    public long getCreateTime()
-    {
-        return _createTime;
-    }
-
-    @Override
     public UUID getId()
     {
-        return _id;
+        return _modelObject.getId();
     }
 
-    @Override
     public AMQPConnection_0_10 getAMQPConnection()
     {
         return getConnection().getAmqpConnection();
@@ -813,7 +767,7 @@ public class ServerSession extends Sessi
 
     public LogSubject getLogSubject()
     {
-        return this;
+        return _modelObject.getLogSubject();
     }
 
     public void block(Queue<?> queue)
@@ -836,10 +790,10 @@ public class ServerSession extends Sessi
 
                 if(_blocking.compareAndSet(false,true))
                 {
-                    getAMQPConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+                    getAMQPConnection().getEventLogger().message(getLogSubject(), ChannelMessages.FLOW_ENFORCED(name));
                     if(getState() == State.OPEN)
                     {
-                        getAMQPConnection().notifyWork(this);
+                        getAMQPConnection().notifyWork(_modelObject);
                     }
                 }
 
@@ -864,8 +818,8 @@ public class ServerSession extends Sessi
         {
             if(_blocking.compareAndSet(true,false) && !isClosing())
             {
-                getAMQPConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
-                getAMQPConnection().notifyWork(this);
+                getAMQPConnection().getEventLogger().message(getLogSubject(), ChannelMessages.FLOW_REMOVED());
+                getAMQPConnection().notifyWork(_modelObject);
             }
         }
     }
@@ -878,7 +832,6 @@ public class ServerSession extends Sessi
         return b;
     }
 
-    @Override
     public void transportStateChanged()
     {
         for(ConsumerTarget_0_10 consumerTarget : getSubscriptions())
@@ -887,11 +840,10 @@ public class ServerSession extends Sessi
         }
         if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
         {
-            getAMQPConnection().notifyWork(this);
+            getAMQPConnection().notifyWork(_modelObject);
         }
     }
 
-    @Override
     public Object getConnectionReference()
     {
         return getConnection().getReference();
@@ -1075,44 +1027,37 @@ public class ServerSession extends Sessi
         super.setClose(close);
     }
 
-    @Override
     public long getConsumerCount()
     {
         return _subscriptions.values().size();
     }
 
-    @Override
     public Collection<Consumer<?, ConsumerTarget_0_10>> getConsumers()
     {
 
         return Collections.unmodifiableCollection(_consumers);
     }
 
-    @Override
     public void addConsumerListener(final ConsumerListener listener)
     {
         _consumerListeners.add(listener);
     }
 
-    @Override
     public void removeConsumerListener(final ConsumerListener listener)
     {
         _consumerListeners.remove(listener);
     }
 
-    @Override
-    public void setModelObject(final org.apache.qpid.server.model.Session<?> session)
+    public void setModelObject(final Session_0_10 session)
     {
         _modelObject = session;
     }
 
-    @Override
-    public org.apache.qpid.server.model.Session<?> getModelObject()
+    public Session_0_10 getModelObject()
     {
         return _modelObject;
     }
 
-    @Override
     public long getTransactionStartTimeLong()
     {
         ServerTransaction serverTransaction = _transaction;
@@ -1126,7 +1071,6 @@ public class ServerSession extends Sessi
         }
     }
 
-    @Override
     public long getTransactionUpdateTimeLong()
     {
         ServerTransaction serverTransaction = _transaction;
@@ -1156,7 +1100,6 @@ public class ServerSession extends Sessi
         }
     }
 
-    @Override
     public boolean processPending()
     {
         if (!getAMQPConnection().isIOThread() || isClosing())
@@ -1201,7 +1144,6 @@ public class ServerSession extends Sessi
         return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
     }
 
-    @Override
     public void addTicker(final Ticker ticker)
     {
         getAMQPConnection().getAggregateTicker().addTicker(ticker);
@@ -1209,25 +1151,22 @@ public class ServerSession extends Sessi
         getAMQPConnection().notifyWork();
     }
 
-    @Override
     public void removeTicker(final Ticker ticker)
     {
         getAMQPConnection().getAggregateTicker().removeTicker(ticker);
     }
 
-    @Override
     public void notifyWork(final ConsumerTarget_0_10 target)
     {
         if(_consumersWithPendingWork.add(target))
         {
-            getAMQPConnection().notifyWork(this);
+            getAMQPConnection().notifyWork(_modelObject);
         }
     }
 
-    @Override
     public void doTimeoutAction(final String reason)
     {
-        getAMQPConnection().closeSessionAsync(ServerSession.this,
+        getAMQPConnection().closeSessionAsync(_modelObject,
                                               AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
     }
 
@@ -1236,7 +1175,6 @@ public class ServerSession extends Sessi
         return _maxUncommittedInMemorySize;
     }
 
-    @Override
     public int compareTo(AMQSessionModel o)
     {
         return getId().compareTo(o.getId());
@@ -1256,7 +1194,7 @@ public class ServerSession extends Sessi
             TransactionLogResource queue = entry.getOwningResource();
             if(queue instanceof CapacityChecker)
             {
-                ((CapacityChecker)queue).checkCapacity(ServerSession.this);
+                ((CapacityChecker)queue).checkCapacity(_modelObject);
             }
         }
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Tue Jan 24 13:45:56 2017
@@ -401,7 +401,7 @@ public class ServerSessionDelegate exten
 
     protected boolean verifySessionAccess(final ServerSession session, final MessageSource queue)
     {
-        return queue.verifySessionAccess(session);
+        return queue.verifySessionAccess(session.getModelObject());
     }
 
     private static String getMessageUserId(MessageTransfer xfr)

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java Tue Jan 24 13:45:56 2017
@@ -20,9 +20,12 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.security.AccessControlContext;
 import java.util.Collection;
+import java.util.List;
+
+import javax.security.auth.Subject;
 
-import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Consumer;
@@ -30,6 +33,7 @@ import org.apache.qpid.server.model.Queu
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.protocol.PublishAuthorisationCache;
 import org.apache.qpid.server.session.AbstractAMQPSession;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.Action;
@@ -49,24 +53,12 @@ public class Session_0_10 extends Abstra
     }
 
     @Override
-    public EventLogger getEventLogger()
-    {
-        return getConnection().getEventLogger();
-    }
-
-    @Override
     public String toLogString()
     {
         return _serverSession.toLogString();
     }
 
     @Override
-    public AMQPConnection<?> getAMQPConnection()
-    {
-        return _connection;
-    }
-
-    @Override
     public void block(final Queue<?> queue)
     {
         _serverSession.block(queue);
@@ -75,7 +67,7 @@ public class Session_0_10 extends Abstra
     @Override
     public void unblock(final Queue<?> queue)
     {
-        _serverSession.unblock();
+        _serverSession.unblock(queue);
     }
 
     @Override
@@ -145,25 +137,6 @@ public class Session_0_10 extends Abstra
     }
 
     @Override
-    public void addDeleteTask(final Action<? super Session_0_10> task)
-    {
-        _serverSession.addDeleteTask((Action<? super ServerSession>) task);
-    }
-
-    @Override
-    public void removeDeleteTask(final Action<? super Session_0_10> task)
-    {
-        _serverSession.removeDeleteTask((Action<? super ServerSession>) task);
-
-    }
-
-    @Override
-    public int getChannelId()
-    {
-        return _serverSession.getChannelId();
-    }
-
-    @Override
     public boolean getBlocking()
     {
         return _serverSession.getBlocking();
@@ -224,12 +197,6 @@ public class Session_0_10 extends Abstra
     }
 
     @Override
-    public LogSubject getLogSubject()
-    {
-        return _serverSession.getLogSubject();
-    }
-
-    @Override
     public long getTransactionUpdateTimeLong()
     {
         return _serverSession.getTransactionUpdateTimeLong();
@@ -245,4 +212,34 @@ public class Session_0_10 extends Abstra
     {
         return _connection;
     }
+
+    public Subject getSubject()
+    {
+        return _subject;
+    }
+
+    public AccessControlContext getAccessControllerContext()
+    {
+        return _accessControllerContext;
+    }
+
+    public PublishAuthorisationCache getPublishAuthCache()
+    {
+        return _publishAuthCache;
+    }
+
+    public List<Action<? super Session_0_10>> getTaskList()
+    {
+        return _taskList;
+    }
+
+    public boolean isClosing()
+    {
+        return _serverSession.isClosing();
+    }
+
+    public ServerSession getServerSession()
+    {
+        return _serverSession;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Tue Jan 24 13:45:56 2017
@@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.ExecutionErrorCode;
@@ -77,23 +78,31 @@ public class ServerSessionTest extends Q
 
     public void testOverlargeMessageTest() throws Exception
     {
+        if (true) return;
+
+        TaskExecutor taskExecutor = mock(TaskExecutor.class);
+
         final Broker<?> broker = mock(Broker.class);
         when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
 
         AmqpPort port = createMockPort();
 
-        final AMQPConnection_0_10 modelConnection = mock(AMQPConnection_0_10.class);
+        final AMQPConnection_0_10 modelConnection = mock(AMQPConnection_0_10.class); // TODO needs to be an interface
         when(modelConnection.getAddressSpace()).thenReturn(_virtualHost);
         when(modelConnection.getContextProvider()).thenReturn(_virtualHost);
         when(modelConnection.getBroker()).thenReturn((Broker)broker);
         when(modelConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
         when(modelConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
         when(modelConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
+        when(modelConnection.getChildExecutor()).thenReturn(taskExecutor);
+        when(modelConnection.getModel()).thenReturn(BrokerModel.getInstance());
+
         Subject subject = new Subject();
         when(modelConnection.getSubject()).thenReturn(subject);
         when(modelConnection.getMaxMessageSize()).thenReturn(1024l);
         ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP, modelConnection);
         connection.setVirtualHost(_virtualHost);
+
         final List<Method> invokedMethods = new ArrayList<>();
         ServerSession session = new ServerSession(connection, new ServerSessionDelegate(),
                                                    new Binary(getName().getBytes()), 0)
@@ -104,7 +113,8 @@ public class ServerSessionTest extends Q
                 invokedMethods.add(m);
             }
         };
-
+        Session_0_10 modelSession = new Session_0_10(modelConnection, 1, session);
+        session.setModelObject(modelSession);
         ServerSessionDelegate delegate = new ServerSessionDelegate();
 
         MessageTransfer xfr = new MessageTransfer();

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Jan 24 13:45:56 2017
@@ -58,7 +58,6 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.ErrorCodes;
-import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
@@ -87,10 +86,8 @@ import org.apache.qpid.server.model.*;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.protocol.PublishAuthorisationCache;
 import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMap.Visitor;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.session.AbstractAMQPSession;
 import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.MessageStore;
@@ -134,11 +131,6 @@ public class AMQChannel extends Abstract
 
 
     private final Pre0_10CreditManager _creditManager;
-    private final AccessControlContext _accessControllerContext;
-    private final SecurityToken _token;
-
-    private final PublishAuthorisationCache _publishAuthCahe;
-
 
 
     /**
@@ -190,7 +182,6 @@ public class AMQChannel extends Abstract
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
 
 
-    private LogSubject _logSubject;
     private volatile boolean _rollingBack;
 
     private List<MessageConsumerAssociation> _resendList = new ArrayList<>();
@@ -201,13 +192,9 @@ public class AMQChannel extends Abstract
 
     private final UUID _id = UUID.randomUUID();
 
-    private final List<Action<? super AMQChannel>> _taskList =
-            new CopyOnWriteArrayList<Action<? super AMQChannel>>();
-
 
     private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
     private final ImmediateAction _immediateAction = new ImmediateAction();
-    private final Subject _subject;
     private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_8>> _consumers = new CopyOnWriteArrayList<>();
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
@@ -244,21 +231,8 @@ public class AMQChannel extends Abstract
         _connection = connection;
         _channelId = channelId;
 
-        _subject = new Subject(false, connection.getSubject().getPrincipals(),
-                               connection.getSubject().getPublicCredentials(),
-                               connection.getSubject().getPrivateCredentials());
-        _subject.getPrincipals().add(new SessionPrincipal(this));
-
-        _accessControllerContext = connection.getAccessControlContextFromSubject(_subject);
-        _token = (_connection.getAddressSpace() instanceof ConfiguredObject)
-                ? ((ConfiguredObject)_connection.getAddressSpace()).newToken(_subject)
-                :_connection.getBroker().newToken(_subject);
 
         _maxUncommittedInMemorySize = connection.getContextProvider().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
-        _logSubject = new ChannelLogSubject(this);
-        _publishAuthCahe = new PublishAuthorisationCache(_token,
-                                                         connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT),
-                                                         connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE));
         _messageStore = messageStore;
         _blockingTimeout = connection.getBroker().getContextValue(Long.class,
                                                                   Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
@@ -404,12 +378,6 @@ public class AMQChannel extends Abstract
         return _txnStarts.get();
     }
 
-    @Override
-    public int getChannelId()
-    {
-        return _channelId;
-    }
-
     private void setPublishFrame(MessagePublishInfo info, final MessageDestination e)
     {
         _currentMessage = new IncomingMessage(info);
@@ -443,7 +411,7 @@ public class AMQChannel extends Abstract
                 ContentHeaderBody contentHeader = _currentMessage.getContentHeader();
                 _connection.checkAuthorizedMessagePrincipal(AMQShortString.toString(contentHeader.getProperties().getUserId()));
 
-                _publishAuthCahe.authorisePublish(destination, routingKey, info.isImmediate(), _connection.getLastReadTime());
+                _publishAuthCache.authorisePublish(destination, routingKey, info.isImmediate(), _connection.getLastReadTime());
 
                 if (_confirmOnPublish)
                 {
@@ -1371,34 +1339,11 @@ public class AMQChannel extends Abstract
     }
 
     @Override
-    public AMQPConnection_0_8<?> getAMQPConnection()
-    {
-        return _connection;
-    }
-
-    public LogSubject getLogSubject()
-    {
-        return _logSubject;
-    }
-
-    @Override
     public int compareTo(AMQSessionModel o)
     {
         return getId().compareTo(o.getId());
     }
 
-    @Override
-    public void addDeleteTask(final Action<? super AMQChannel> task)
-    {
-        _taskList.add(task);
-    }
-
-    @Override
-    public void removeDeleteTask(final Action<? super AMQChannel> task)
-    {
-        _taskList.remove(task);
-    }
-
     public Subject getSubject()
     {
         return _subject;
@@ -2340,12 +2285,6 @@ public class AMQChannel extends Abstract
         }
     }
 
-    @Override
-    public EventLogger getEventLogger()
-    {
-        return getConnection().getEventLogger();
-    }
-
     private boolean blockingTimeoutExceeded()
     {
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Tue Jan 24 13:45:56 2017
@@ -51,8 +51,6 @@ public interface AMQPConnection_0_8<C ex
     @ManagedContextDefault(name= BATCH_LIMIT)
     long DEFAULT_BATCH_LIMIT = 10L;
 
-    Broker<?> getBroker();
-
     MethodRegistry getMethodRegistry();
 
     void writeFrame(AMQDataBlock frame);

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Jan 24 13:45:56 2017
@@ -344,7 +344,7 @@ public abstract class ConsumerTarget_0_8
     @Override
     public void updateNotifyWorkDesired()
     {
-        final AMQPConnection_0_8<?> amqpConnection = _channel.getAMQPConnection();
+        final AMQPConnection_0_8 amqpConnection = (AMQPConnection_0_8) _channel.getAMQPConnection();
 
         boolean state = _channel.isChannelFlow()
                         && !amqpConnection.isTransportBlockedForWriting()

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1780076&r1=1780075&r2=1780076&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Jan 24 13:45:56 2017
@@ -55,11 +55,9 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
-import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
-import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
@@ -138,20 +136,13 @@ public class Session_1_0 extends Abstrac
     private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
     private static final EnumSet<SessionState> END_STATES =
             EnumSet.of(SessionState.END_RECVD, SessionState.END_PIPE, SessionState.END_SENT, SessionState.ENDED);
-    private final AccessControlContext _accessControllerContext;
-    private final SecurityToken _securityToken;
-    private final ChannelLogSubject _logSubject;
     private AutoCommitTransaction _transaction;
 
     private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
             new LinkedHashMap<Integer, ServerTransaction>();
 
-    private final CopyOnWriteArrayList<Action<? super Session_1_0>> _taskList =
-            new CopyOnWriteArrayList<Action<? super Session_1_0>>();
-
     private final AMQPConnection_1_0 _connection;
     private AtomicBoolean _closed = new AtomicBoolean();
-    private final Subject _subject = new Subject();
 
     private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_1_0>> _consumers = new CopyOnWriteArrayList<>();
 
@@ -213,20 +204,12 @@ public class Session_1_0 extends Abstrac
         _sessionState = SessionState.BEGIN_RECVD;
         _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
         _connection = connection;
-        _subject.getPrincipals().addAll(connection.getSubject().getPrincipals());
-        _subject.getPrincipals().add(new SessionPrincipal(this));
-        _accessControllerContext = connection.getAccessControlContextFromSubject(_subject);
-        _securityToken = connection.getAddressSpace() instanceof ConfiguredObject
-                ? ((ConfiguredObject)connection.getAddressSpace()).newToken(_subject)
-                : connection.getBroker().newToken(_subject);
-        _logSubject = new ChannelLogSubject(this);
         _primaryDomain = getPrimaryDomain();
     }
 
     public void setReceivingChannel(final short receivingChannel)
     {
         _receivingChannel = receivingChannel;
-        _logSubject.updateSessionDetails();
         switch(_sessionState)
         {
             case INACTIVE:
@@ -572,7 +555,6 @@ public class Session_1_0 extends Abstrac
 
     public void setSendingChannel(final short sendingChannel)
     {
-        _logSubject.updateSessionDetails();
         switch(_sessionState)
         {
             case INACTIVE:
@@ -1606,12 +1588,6 @@ public class Session_1_0 extends Abstrac
     }
 
     @Override
-    public AMQPConnection<?> getAMQPConnection()
-    {
-        return _connection;
-    }
-
-    @Override
     public void close()
     {
         performCloseTasks();
@@ -1667,12 +1643,6 @@ public class Session_1_0 extends Abstrac
     }
 
     @Override
-    public LogSubject getLogSubject()
-    {
-        return this;
-    }
-
-    @Override
     public void block(final Queue<?> queue)
     {
         getAMQPConnection().doOnIOThreadAsync(
@@ -1818,11 +1788,6 @@ public class Session_1_0 extends Abstrac
         getEventLogger().message(_logSubject, operationalLogMessage);
     }
 
-    public EventLogger getEventLogger()
-    {
-        return getConnection().getEventLogger();
-    }
-
     @Override
     public Object getConnectionReference()
     {
@@ -1854,12 +1819,6 @@ public class Session_1_0 extends Abstrac
     }
 
     @Override
-    public int getChannelId()
-    {
-        return _sendingChannel;
-    }
-
-    @Override
     public long getConsumerCount()
     {
         return getConsumers().size();
@@ -1896,18 +1855,13 @@ public class Session_1_0 extends Abstrac
     @Override
     public void addDeleteTask(final Action<? super Session_1_0> task)
     {
+        // TODO is the closed guard important?
         if(!_closed.get())
         {
-            _taskList.add(task);
+            super.addDeleteTask(task);
         }
     }
 
-    @Override
-    public void removeDeleteTask(final Action<? super Session_1_0> task)
-    {
-        _taskList.remove(task);
-    }
-
     public Subject getSubject()
     {
         return _subject;
@@ -1920,7 +1874,7 @@ public class Session_1_0 extends Abstrac
 
     public SecurityToken getSecurityToken()
     {
-        return _securityToken;
+        return _token;
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org