You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/03/25 16:22:35 UTC

svn commit: r1736588 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpi...

Author: lquack
Date: Fri Mar 25 15:22:34 2016
New Revision: 1736588

URL: http://svn.apache.org/viewvc?rev=1736588&view=rev
Log:
QPID-7162: [Java Broker] Refactor AbstractAMQPConnection and subclasses

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Fri Mar 25 15:22:34 2016
@@ -63,6 +63,7 @@ import org.apache.qpid.server.model.Virt
 import org.apache.qpid.server.model.adapter.SessionAdapter;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.network.AggregateTicker;
@@ -109,6 +110,7 @@ public abstract class AbstractAMQPConnec
     private volatile boolean _messageAuthorizationRequired;
 
     private final AtomicLong _maxMessageSize = new AtomicLong(Long.MAX_VALUE);
+    private volatile int _messageCompressionThreshold;
 
     public AbstractAMQPConnection(Broker<?> broker,
                                   ServerNetworkConnection network,
@@ -177,6 +179,7 @@ public abstract class AbstractAMQPConnec
         _aggregateTicker.addTicker(slowConnectionOpenTicker);
         _lastReadTime = _lastWriteTime = getCreatedTime();
 
+        logConnectionOpen();
     }
 
     public Broker<?> getBroker()
@@ -364,7 +367,7 @@ public abstract class AbstractAMQPConnec
     }
 
 
-    protected void performDeleteTasks()
+    public void performDeleteTasks()
     {
         if(runningAsSubject())
         {
@@ -673,7 +676,7 @@ public abstract class AbstractAMQPConnec
                 getSubject());
     }
 
-    protected void logConnectionOpen()
+    private void logConnectionOpen()
     {
         runAsSubject(new PrivilegedAction<Object>()
         {
@@ -734,7 +737,18 @@ public abstract class AbstractAMQPConnec
         return _logSubject;
     }
 
-    protected abstract EventLogger getEventLogger();
+    public EventLogger getEventLogger()
+    {
+        final VirtualHost<?> virtualHost = getVirtualHost();
+        if (virtualHost != null)
+        {
+            return virtualHost.getEventLogger();
+        }
+        else
+        {
+            return getBroker().getEventLogger();
+        }
+    }
 
     @Override
     public final boolean isAuthorizedMessagePrincipal(final String userId)
@@ -748,6 +762,55 @@ public abstract class AbstractAMQPConnec
         return _virtualHost;
     }
 
+    public void setVirtualHost(VirtualHost<?> virtualHost)
+    {
+        associateVirtualHost(virtualHost);
+
+        _messageCompressionThreshold = virtualHost.getContextValue(Integer.class,
+                                                                   Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
+        if(_messageCompressionThreshold <= 0)
+        {
+            _messageCompressionThreshold = Integer.MAX_VALUE;
+        }
+
+        getSubject().getPrincipals().add(virtualHost.getPrincipal());
+
+        updateAccessControllerContext();
+        logConnectionOpen();
+    }
+
+    public int getMessageCompressionThreshold()
+    {
+        return _messageCompressionThreshold;
+    }
+
+    @Override
+    public String toString()
+    {
+        return getNetwork().getRemoteAddress() + "(" + ((getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName()) + ")");
+    }
+
+    public Principal getAuthorizedPrincipal()
+    {
+        return AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(getSubject());
+    }
+
+    public void setSubject(final Subject subject)
+    {
+        if (subject == null)
+        {
+            throw new IllegalArgumentException("subject cannot be null");
+        }
+
+        getSubject().getPrincipals().addAll(subject.getPrincipals());
+        getSubject().getPrivateCredentials().addAll(subject.getPrivateCredentials());
+        getSubject().getPublicCredentials().addAll(subject.getPublicCredentials());
+
+        updateAccessControllerContext();
+
+    }
+
+
     private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
     {
         private final long _allowedTime;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Fri Mar 25 15:22:34 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.security.AccessController;
-import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.util.Collections;
 import java.util.Iterator;
@@ -36,7 +35,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.State;
@@ -63,8 +61,6 @@ public class AMQPConnection_0_10 extends
     private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
     private final ServerInputHandler _inputHandler;
 
-
-    private final ServerNetworkConnection _network;
     private final ServerConnection _connection;
 
     private volatile boolean _transportBlockedForWriting;
@@ -100,21 +96,19 @@ public class AMQPConnection_0_10 extends
 
         _inputHandler = new ServerInputHandler(new ServerAssembler(_connection));
         _connection.addFrameSizeObserver(_inputHandler);
-        _network = network;
 
         AccessController.doPrivileged(new PrivilegedAction<Object>()
         {
             @Override
             public Object run()
             {
-                _connection.setNetworkConnection(_network);
-                _disassembler = new ServerDisassembler(wrapSender(_network.getSender()), Constant.MIN_MAX_FRAME_SIZE);
+                _connection.setNetworkConnection(getNetwork());
+                _disassembler = new ServerDisassembler(wrapSender(getNetwork().getSender()), Constant.MIN_MAX_FRAME_SIZE);
                 _connection.setSender(_disassembler);
                 _connection.addFrameSizeObserver(_disassembler);
                 return null;
             }
         }, getAccessControllerContext());
-        logConnectionOpen();
     }
 
     private ByteBufferSender wrapSender(final ByteBufferSender sender)
@@ -200,7 +194,7 @@ public class AMQPConnection_0_10 extends
             public Object run()
             {
                 _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current connection state: " + _connection.getConnectionDelegate().getState(), true));
-                _network.close();
+                getNetwork().close();
                 return null;
             }
         }, getAccessControllerContext());
@@ -209,7 +203,7 @@ public class AMQPConnection_0_10 extends
 
     public String getAddress()
     {
-        return _network.getRemoteAddress().toString();
+        return getNetwork().getRemoteAddress().toString();
     }
 
     @Override
@@ -238,12 +232,6 @@ public class AMQPConnection_0_10 extends
     }
 
     @Override
-    protected void performDeleteTasks()
-    {
-        super.performDeleteTasks();
-    }
-
-    @Override
     public boolean isTransportBlockedForWriting()
     {
         return _transportBlockedForWriting;
@@ -310,11 +298,6 @@ public class AMQPConnection_0_10 extends
         _connection.sendConnectionCloseAsync(cause, message);
     }
 
-    public Principal getAuthorizedPrincipal()
-    {
-        return _connection.getAuthorizedPrincipal();
-    }
-
     public void closeSessionAsync(final AMQSessionModel<?> session,
                                   final AMQConstant cause, final String message)
     {
@@ -345,16 +328,4 @@ public class AMQPConnection_0_10 extends
     {
         return _connection.getSessionCountLimit();
     }
-
-    @Override
-    protected EventLogger getEventLogger()
-    {
-        return _connection.getEventLogger();
-    }
-
-    @Override
-    public void logConnectionOpen()
-    {
-        super.logConnectionOpen();
-    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Fri Mar 25 15:22:34 2016
@@ -49,7 +49,6 @@ import org.apache.qpid.server.model.Brok
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -70,7 +69,6 @@ public class ServerConnection extends Co
     public static final long CLOSE_OK_TIMEOUT = 10000l;
     private final Broker<?> _broker;
 
-    private Principal _authorizedPrincipal = null;
     private final long _connectionId;
     private final Object _reference = new Object();
     private final AmqpPort<?> _port;
@@ -81,8 +79,6 @@ public class ServerConnection extends Co
     private final Queue<Action<? super ServerConnection>> _asyncTaskList =
             new ConcurrentLinkedQueue<>();
 
-    private int _messageCompressionThreshold;
-
     private final AMQPConnection_0_10 _amqpConnection;
     private boolean _ignoreFutureInput;
     private boolean _ignoreAllButConnectionCloseOk;
@@ -123,8 +119,7 @@ public class ServerConnection extends Co
 
     EventLogger getEventLogger()
     {
-        VirtualHost<?> virtualHost = getVirtualHost();
-        return virtualHost == null ? _broker.getEventLogger() : virtualHost.getEventLogger();
+        return _amqpConnection.getEventLogger();
     }
 
     @Override
@@ -132,11 +127,6 @@ public class ServerConnection extends Co
     {
         super.setState(state);
 
-        if (state == State.OPEN)
-        {
-            _amqpConnection.logConnectionOpen();
-        }
-
         if(state == State.CLOSING)
         {
             getAmqpConnection().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, (ServerNetworkConnection) getNetworkConnection()));
@@ -161,17 +151,7 @@ public class ServerConnection extends Co
 
     public void setVirtualHost(VirtualHost<?> virtualHost)
     {
-        _amqpConnection.associateVirtualHost(virtualHost);
-        _messageCompressionThreshold =
-                virtualHost.getContextValue(Integer.class,
-                                            Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
-
-        if(_messageCompressionThreshold <= 0)
-        {
-            _messageCompressionThreshold = Integer.MAX_VALUE;
-        }
-        _amqpConnection.getSubject().getPrincipals().add(virtualHost.getPrincipal());
-        _amqpConnection.updateAccessControllerContext();
+        _amqpConnection.setVirtualHost(virtualHost);
     }
 
     public AmqpPort<?> getPort()
@@ -372,29 +352,14 @@ public class ServerConnection extends Co
         return _amqpConnection.getSubject();
     }
 
-    /**
-     * Sets the authorized subject.  It also extracts the UsernamePrincipal from the subject
-     * and caches it for optimisation purposes.
-     *
-     * @param authorizedSubject
-     */
     public void setAuthorizedSubject(final Subject authorizedSubject)
     {
-        if (authorizedSubject == null)
-        {
-            _authorizedPrincipal = null;
-        }
-        else
-        {
-            getAuthorizedSubject().getPrincipals().addAll(authorizedSubject.getPrincipals());
-            _amqpConnection.updateAccessControllerContext();
-            _authorizedPrincipal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(authorizedSubject);
-        }
+        _amqpConnection.setSubject(authorizedSubject);
     }
 
     public Principal getAuthorizedPrincipal()
     {
-        return _authorizedPrincipal;
+        return _amqpConnection.getAuthorizedPrincipal();
     }
 
     public long getConnectionId()
@@ -508,7 +473,7 @@ public class ServerConnection extends Co
 
     public int getMessageCompressionThreshold()
     {
-        return _messageCompressionThreshold;
+        return _amqpConnection.getMessageCompressionThreshold();
     }
 
     public int getMaxMessageSize()

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Mar 25 15:22:34 2016
@@ -232,9 +232,9 @@ public class AMQChannel
         _connection = connection;
         _channelId = channelId;
 
-        _subject = new Subject(false, connection.getAuthorizedSubject().getPrincipals(),
-                               connection.getAuthorizedSubject().getPublicCredentials(),
-                               connection.getAuthorizedSubject().getPrivateCredentials());
+        _subject = new Subject(false, connection.getSubject().getPrincipals(),
+                               connection.getSubject().getPublicCredentials(),
+                               connection.getSubject().getPrivateCredentials());
         _subject.getPrincipals().add(new SessionPrincipal(this));
 
         _accessControllerContext = org.apache.qpid.server.security.SecurityManager.getAccessControlContextFromSubject(_subject);

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Fri Mar 25 15:22:34 2016
@@ -45,7 +45,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
-import javax.security.auth.Subject;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
@@ -69,7 +68,6 @@ import org.apache.qpid.server.transport.
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
@@ -78,7 +76,6 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
@@ -147,7 +144,6 @@ public class AMQPConnection_0_8
     private volatile int _maxFrameSize;
     private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
 
-    private final ServerNetworkConnection _network;
     private final ByteBufferSender _sender;
 
     private volatile boolean _deferFlush;
@@ -156,7 +152,6 @@ public class AMQPConnection_0_8
 
     private volatile boolean _closeWhenNoRoute;
     private volatile boolean _compressionSupported;
-    private volatile int _messageCompressionThreshold;
 
     /**
      * QPID-6744 - Older queue clients (<=0.32) set the nowait flag false on the queue.delete method and then
@@ -191,11 +186,8 @@ public class AMQPConnection_0_8
                 ? getBroker().getContextValue(String.class, Broker.SEND_QUEUE_DELETE_OK_REGARDLESS_CLIENT_VER_REGEXP): "";
         _sendQueueDeleteOkRegardlessClientVerRegexp = Pattern.compile(sendQueueDeleteOkRegardlessRegexp);
 
-        _network = network;
         _sender = network.getSender();
         _closeWhenNoRoute = getBroker().getConnection_closeWhenNoRoute();
-
-        logConnectionOpen();
     }
 
     @Override
@@ -505,15 +497,16 @@ public class AMQPConnection_0_8
 
     private void initHeartbeats(int delay)
     {
+        ServerNetworkConnection network = getNetwork();
         if (delay > 0)
         {
-            _network.setMaxWriteIdleMillis(1000L * delay);
-            _network.setMaxReadIdleMillis(1000L * BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay);
+            network.setMaxWriteIdleMillis(1000L * delay);
+            network.setMaxReadIdleMillis(1000L * BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay);
         }
         else
         {
-            _network.setMaxWriteIdleMillis(0);
-            _network.setMaxReadIdleMillis(0);
+            network.setMaxWriteIdleMillis(0);
+            network.setMaxReadIdleMillis(0);
         }
     }
 
@@ -588,7 +581,7 @@ public class AMQPConnection_0_8
                 finally
                 {
                     final long timeoutTime = System.currentTimeMillis() + CLOSE_OK_TIMEOUT;
-                    getAggregateTicker().addTicker(new ConnectionClosingTicker(timeoutTime, _network));
+                    getAggregateTicker().addTicker(new ConnectionClosingTicker(timeoutTime, getNetwork()));
                 }
             }
         }
@@ -596,18 +589,12 @@ public class AMQPConnection_0_8
 
     public void closeNetworkConnection()
     {
-        _network.close();
-    }
-
-    @Override
-    public String toString()
-    {
-        return _network.getRemoteAddress() + "(" + ((getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName()) + ")");
+        getNetwork().close();
     }
 
     private String getLocalFQDN()
     {
-        SocketAddress address = _network.getLocalAddress();
+        SocketAddress address = getNetwork().getLocalAddress();
         if (address instanceof InetSocketAddress)
         {
             return ((InetSocketAddress) address).getHostName();
@@ -708,57 +695,14 @@ public class AMQPConnection_0_8
         return getMethodRegistry();
     }
 
-    public void setVirtualHost(VirtualHost<?> virtualHost)
-    {
-        associateVirtualHost(virtualHost);
-
-        _messageCompressionThreshold = virtualHost.getContextValue(Integer.class,
-                                                                   Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
-        if(_messageCompressionThreshold <= 0)
-        {
-            _messageCompressionThreshold = Integer.MAX_VALUE;
-        }
-
-        getSubject().getPrincipals().add(virtualHost.getPrincipal());
-
-        updateAccessControllerContext();
-        logConnectionOpen();
-    }
-
     public ProtocolOutputConverter getProtocolOutputConverter()
     {
         return _protocolOutputConverter;
     }
 
-    public void setAuthorizedSubject(final Subject authorizedSubject)
-    {
-        if (authorizedSubject == null)
-        {
-            throw new IllegalArgumentException("authorizedSubject cannot be null");
-        }
-
-        getSubject().getPrincipals().addAll(authorizedSubject.getPrincipals());
-        getSubject().getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials());
-        getSubject().getPublicCredentials().addAll(authorizedSubject.getPublicCredentials());
-
-        updateAccessControllerContext();
-
-    }
-
-    public Subject getAuthorizedSubject()
-    {
-        return getSubject();
-    }
-
-    public Principal getAuthorizedPrincipal()
-    {
-
-        return getSubject().getPrincipals(AuthenticatedPrincipal.class).size() == 0 ? null : AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(getSubject());
-    }
-
     public Principal getPeerPrincipal()
     {
-        return _network.getPeerPrincipal();
+        return getNetwork().getPeerPrincipal();
     }
 
     public MethodRegistry getMethodRegistry()
@@ -825,7 +769,7 @@ public class AMQPConnection_0_8
             public Object run()
             {
                 getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current connection state: " + _state, true));
-                _network.close();
+                getNetwork().close();
                 return null;
             }
         }, getAccessControllerContext());
@@ -843,7 +787,7 @@ public class AMQPConnection_0_8
 
     public String getAddress()
     {
-        return String.valueOf(_network.getRemoteAddress());
+        return String.valueOf(getNetwork().getRemoteAddress());
     }
 
     public void closeSessionAsync(final AMQSessionModel<?> session, final AMQConstant cause, final String message)
@@ -1160,7 +1104,7 @@ public class AMQPConnection_0_8
                                                                 broker.getConnection_heartBeatDelay());
                 writeFrame(tuneBody.generateFrame(0));
                 _state = ConnectionState.AWAIT_TUNE_OK;
-                setAuthorizedSubject(authResult.getSubject());
+                setSubject(authResult.getSubject());
                 disposeSaslServer();
                 break;
             case CONTINUE:
@@ -1252,7 +1196,7 @@ public class AMQPConnection_0_8
 
                     case SUCCESS:
                         _logger.debug("Connected as: {}", authResult.getSubject());
-                        setAuthorizedSubject(authResult.getSubject());
+                        setSubject(authResult.getSubject());
 
                         int frameMax = getDefaultMaxFrameSize();
 
@@ -1377,31 +1321,12 @@ public class AMQPConnection_0_8
         return _compressionSupported && getBroker().isMessageCompressionEnabled();
     }
 
-    public int getMessageCompressionThreshold()
-    {
-        return _messageCompressionThreshold;
-    }
-
     private SubjectCreator getSubjectCreator()
     {
         return getPort().getAuthenticationProvider().getSubjectCreator(getTransport().isSecure());
     }
 
     @Override
-    public EventLogger getEventLogger()
-    {
-        final VirtualHost<?> virtualHost = getVirtualHost();
-        if (virtualHost != null)
-        {
-            return virtualHost.getEventLogger();
-        }
-        else
-        {
-            return getBroker().getEventLogger();
-        }
-    }
-
-    @Override
     public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId)
     {
         assertState(ConnectionState.OPEN);

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Fri Mar 25 15:22:34 2016
@@ -106,7 +106,7 @@ public class AMQChannelTest extends Qpid
         _protocolOutputConverter = mock(ProtocolOutputConverter.class);
 
         _amqConnection = mock(AMQPConnection_0_8.class);
-        when(_amqConnection.getAuthorizedSubject()).thenReturn(authenticatedSubject);
+        when(_amqConnection.getSubject()).thenReturn(authenticatedSubject);
         when(_amqConnection.getAuthorizedPrincipal()).thenReturn(authenticatedPrincipal);
         when(_amqConnection.getVirtualHost()).thenReturn((VirtualHost)_virtualHost);
         when(_amqConnection.getProtocolOutputConverter()).thenReturn(_protocolOutputConverter);
@@ -177,10 +177,10 @@ public class AMQChannelTest extends Qpid
         });
 
         Set<Principal> authenticatedUser = Collections.<Principal>singleton(new AuthenticatedPrincipal("user"));
-        _amqConnection.setAuthorizedSubject(new Subject(true,
-                                                        authenticatedUser,
-                                                        Collections.<Principal>emptySet(),
-                                                        Collections.<Principal>emptySet()));
+        _amqConnection.setSubject(new Subject(true,
+                                              authenticatedUser,
+                                              Collections.<Principal>emptySet(),
+                                              Collections.<Principal>emptySet()));
         _amqConnection.associateVirtualHost(_virtualHost);
 
         int channelId = 1;
@@ -210,7 +210,7 @@ public class AMQChannelTest extends Qpid
         });
 
         Set<Principal> authenticatedUser = Collections.<Principal>singleton(new AuthenticatedPrincipal("user"));
-        _amqConnection.setAuthorizedSubject(new Subject(true, authenticatedUser, Collections.<Principal>emptySet(),  Collections.<Principal>emptySet()));
+        _amqConnection.setSubject(new Subject(true, authenticatedUser, Collections.<Principal>emptySet(), Collections.<Principal>emptySet()));
         _amqConnection.associateVirtualHost(_virtualHost);
 
         AMQChannel channel = new AMQChannel(_amqConnection, 1, _virtualHost.getMessageStore());

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Mar 25 15:22:34 2016
@@ -57,12 +57,10 @@ import org.apache.qpid.bytebuffer.QpidBy
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.configuration.CommonProperties;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.ConnectionClosingTicker;
@@ -557,12 +555,6 @@ public class AMQPConnection_1_0 extends
 
     }
 
-    @Override
-    protected void performDeleteTasks()
-    {
-        super.performDeleteTasks();
-    }
-
     public void close()
     {
         getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_RESPONSE_TIMEOUT,
@@ -639,11 +631,6 @@ public class AMQPConnection_1_0 extends
         _connection.sendConnectionCloseAsync(cause, message);
     }
 
-    public Principal getAuthorizedPrincipal()
-    {
-        return _connection.getAuthorizedPrincipal();
-    }
-
     public void closeSessionAsync(final AMQSessionModel<?> session,
                                   final AMQConstant cause, final String message)
     {
@@ -674,18 +661,4 @@ public class AMQPConnection_1_0 extends
     {
         return _connection.getSessionCountLimit();
     }
-
-    @Override
-    protected EventLogger getEventLogger()
-    {
-        final VirtualHost<?> virtualHost = _connection.getVirtualHost();
-        if (virtualHost !=  null)
-        {
-            return virtualHost.getEventLogger();
-        }
-        else
-        {
-            return _broker.getEventLogger();
-        }
-    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Fri Mar 25 15:22:34 2016
@@ -35,7 +35,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.security.auth.Subject;
@@ -238,8 +237,6 @@ public class Connection_1_0 implements C
                     {
                         setUserPrincipal(user);
                     }
-                    _amqpConnection.getSubject().getPrincipals().add(vhost.getPrincipal());
-                    _amqpConnection.updateAccessControllerContext();
                     if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject()) == null)
                     {
                         closeWithError(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
@@ -248,7 +245,7 @@ public class Connection_1_0 implements C
                     {
                         try
                         {
-                            _amqpConnection.associateVirtualHost(vhost);
+                            _amqpConnection.setVirtualHost(vhost);
                         }
                         catch (VirtualHostUnavailableException e)
                         {
@@ -272,11 +269,7 @@ public class Connection_1_0 implements C
 
     void setUserPrincipal(final Principal user)
     {
-        Subject authSubject = _subjectCreator.createSubjectWithGroups(user);
-        _amqpConnection.getSubject().getPrincipals().addAll(authSubject.getPrincipals());
-        _amqpConnection.getSubject().getPublicCredentials().addAll(authSubject.getPublicCredentials());
-        _amqpConnection.getSubject().getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
-        _amqpConnection.updateAccessControllerContext();
+        _amqpConnection.setSubject(_subjectCreator.createSubjectWithGroups(user));
     }
 
     public void remoteSessionCreation(SessionEndpoint endpoint)
@@ -435,12 +428,6 @@ public class Connection_1_0 implements C
         return _connectionEndpoint.getRemoteContainerId();
     }
 
-    public Principal getAuthorizedPrincipal()
-    {
-        Set<AuthenticatedPrincipal> authPrincipals = _amqpConnection.getSubject().getPrincipals(AuthenticatedPrincipal.class);
-        return authPrincipals.isEmpty() ? null : authPrincipals.iterator().next();
-    }
-
     public long getSessionCountLimit()
     {
         return 0;  // TODO



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org