You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/03/25 16:22:35 UTC
svn commit: r1736588 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpi...
Author: lquack
Date: Fri Mar 25 15:22:34 2016
New Revision: 1736588
URL: http://svn.apache.org/viewvc?rev=1736588&view=rev
Log:
QPID-7162: [Java Broker] Refactor AbstractAMQPConnection and subclasses
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Fri Mar 25 15:22:34 2016
@@ -63,6 +63,7 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.model.adapter.SessionAdapter;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.network.AggregateTicker;
@@ -109,6 +110,7 @@ public abstract class AbstractAMQPConnec
private volatile boolean _messageAuthorizationRequired;
private final AtomicLong _maxMessageSize = new AtomicLong(Long.MAX_VALUE);
+ private volatile int _messageCompressionThreshold;
public AbstractAMQPConnection(Broker<?> broker,
ServerNetworkConnection network,
@@ -177,6 +179,7 @@ public abstract class AbstractAMQPConnec
_aggregateTicker.addTicker(slowConnectionOpenTicker);
_lastReadTime = _lastWriteTime = getCreatedTime();
+ logConnectionOpen();
}
public Broker<?> getBroker()
@@ -364,7 +367,7 @@ public abstract class AbstractAMQPConnec
}
- protected void performDeleteTasks()
+ public void performDeleteTasks()
{
if(runningAsSubject())
{
@@ -673,7 +676,7 @@ public abstract class AbstractAMQPConnec
getSubject());
}
- protected void logConnectionOpen()
+ private void logConnectionOpen()
{
runAsSubject(new PrivilegedAction<Object>()
{
@@ -734,7 +737,18 @@ public abstract class AbstractAMQPConnec
return _logSubject;
}
- protected abstract EventLogger getEventLogger();
+ public EventLogger getEventLogger()
+ {
+ final VirtualHost<?> virtualHost = getVirtualHost();
+ if (virtualHost != null)
+ {
+ return virtualHost.getEventLogger();
+ }
+ else
+ {
+ return getBroker().getEventLogger();
+ }
+ }
@Override
public final boolean isAuthorizedMessagePrincipal(final String userId)
@@ -748,6 +762,55 @@ public abstract class AbstractAMQPConnec
return _virtualHost;
}
+ public void setVirtualHost(VirtualHost<?> virtualHost)
+ {
+ associateVirtualHost(virtualHost);
+
+ _messageCompressionThreshold = virtualHost.getContextValue(Integer.class,
+ Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
+ if(_messageCompressionThreshold <= 0)
+ {
+ _messageCompressionThreshold = Integer.MAX_VALUE;
+ }
+
+ getSubject().getPrincipals().add(virtualHost.getPrincipal());
+
+ updateAccessControllerContext();
+ logConnectionOpen();
+ }
+
+ public int getMessageCompressionThreshold()
+ {
+ return _messageCompressionThreshold;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getNetwork().getRemoteAddress() + "(" + ((getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName()) + ")");
+ }
+
+ public Principal getAuthorizedPrincipal()
+ {
+ return AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(getSubject());
+ }
+
+ public void setSubject(final Subject subject)
+ {
+ if (subject == null)
+ {
+ throw new IllegalArgumentException("subject cannot be null");
+ }
+
+ getSubject().getPrincipals().addAll(subject.getPrincipals());
+ getSubject().getPrivateCredentials().addAll(subject.getPrivateCredentials());
+ getSubject().getPublicCredentials().addAll(subject.getPublicCredentials());
+
+ updateAccessControllerContext();
+
+ }
+
+
private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
{
private final long _allowedTime;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Fri Mar 25 15:22:34 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.AccessController;
-import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.Iterator;
@@ -36,7 +35,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
@@ -63,8 +61,6 @@ public class AMQPConnection_0_10 extends
private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
private final ServerInputHandler _inputHandler;
-
- private final ServerNetworkConnection _network;
private final ServerConnection _connection;
private volatile boolean _transportBlockedForWriting;
@@ -100,21 +96,19 @@ public class AMQPConnection_0_10 extends
_inputHandler = new ServerInputHandler(new ServerAssembler(_connection));
_connection.addFrameSizeObserver(_inputHandler);
- _network = network;
AccessController.doPrivileged(new PrivilegedAction<Object>()
{
@Override
public Object run()
{
- _connection.setNetworkConnection(_network);
- _disassembler = new ServerDisassembler(wrapSender(_network.getSender()), Constant.MIN_MAX_FRAME_SIZE);
+ _connection.setNetworkConnection(getNetwork());
+ _disassembler = new ServerDisassembler(wrapSender(getNetwork().getSender()), Constant.MIN_MAX_FRAME_SIZE);
_connection.setSender(_disassembler);
_connection.addFrameSizeObserver(_disassembler);
return null;
}
}, getAccessControllerContext());
- logConnectionOpen();
}
private ByteBufferSender wrapSender(final ByteBufferSender sender)
@@ -200,7 +194,7 @@ public class AMQPConnection_0_10 extends
public Object run()
{
_connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current connection state: " + _connection.getConnectionDelegate().getState(), true));
- _network.close();
+ getNetwork().close();
return null;
}
}, getAccessControllerContext());
@@ -209,7 +203,7 @@ public class AMQPConnection_0_10 extends
public String getAddress()
{
- return _network.getRemoteAddress().toString();
+ return getNetwork().getRemoteAddress().toString();
}
@Override
@@ -238,12 +232,6 @@ public class AMQPConnection_0_10 extends
}
@Override
- protected void performDeleteTasks()
- {
- super.performDeleteTasks();
- }
-
- @Override
public boolean isTransportBlockedForWriting()
{
return _transportBlockedForWriting;
@@ -310,11 +298,6 @@ public class AMQPConnection_0_10 extends
_connection.sendConnectionCloseAsync(cause, message);
}
- public Principal getAuthorizedPrincipal()
- {
- return _connection.getAuthorizedPrincipal();
- }
-
public void closeSessionAsync(final AMQSessionModel<?> session,
final AMQConstant cause, final String message)
{
@@ -345,16 +328,4 @@ public class AMQPConnection_0_10 extends
{
return _connection.getSessionCountLimit();
}
-
- @Override
- protected EventLogger getEventLogger()
- {
- return _connection.getEventLogger();
- }
-
- @Override
- public void logConnectionOpen()
- {
- super.logConnectionOpen();
- }
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Fri Mar 25 15:22:34 2016
@@ -49,7 +49,6 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -70,7 +69,6 @@ public class ServerConnection extends Co
public static final long CLOSE_OK_TIMEOUT = 10000l;
private final Broker<?> _broker;
- private Principal _authorizedPrincipal = null;
private final long _connectionId;
private final Object _reference = new Object();
private final AmqpPort<?> _port;
@@ -81,8 +79,6 @@ public class ServerConnection extends Co
private final Queue<Action<? super ServerConnection>> _asyncTaskList =
new ConcurrentLinkedQueue<>();
- private int _messageCompressionThreshold;
-
private final AMQPConnection_0_10 _amqpConnection;
private boolean _ignoreFutureInput;
private boolean _ignoreAllButConnectionCloseOk;
@@ -123,8 +119,7 @@ public class ServerConnection extends Co
EventLogger getEventLogger()
{
- VirtualHost<?> virtualHost = getVirtualHost();
- return virtualHost == null ? _broker.getEventLogger() : virtualHost.getEventLogger();
+ return _amqpConnection.getEventLogger();
}
@Override
@@ -132,11 +127,6 @@ public class ServerConnection extends Co
{
super.setState(state);
- if (state == State.OPEN)
- {
- _amqpConnection.logConnectionOpen();
- }
-
if(state == State.CLOSING)
{
getAmqpConnection().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, (ServerNetworkConnection) getNetworkConnection()));
@@ -161,17 +151,7 @@ public class ServerConnection extends Co
public void setVirtualHost(VirtualHost<?> virtualHost)
{
- _amqpConnection.associateVirtualHost(virtualHost);
- _messageCompressionThreshold =
- virtualHost.getContextValue(Integer.class,
- Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
-
- if(_messageCompressionThreshold <= 0)
- {
- _messageCompressionThreshold = Integer.MAX_VALUE;
- }
- _amqpConnection.getSubject().getPrincipals().add(virtualHost.getPrincipal());
- _amqpConnection.updateAccessControllerContext();
+ _amqpConnection.setVirtualHost(virtualHost);
}
public AmqpPort<?> getPort()
@@ -372,29 +352,14 @@ public class ServerConnection extends Co
return _amqpConnection.getSubject();
}
- /**
- * Sets the authorized subject. It also extracts the UsernamePrincipal from the subject
- * and caches it for optimisation purposes.
- *
- * @param authorizedSubject
- */
public void setAuthorizedSubject(final Subject authorizedSubject)
{
- if (authorizedSubject == null)
- {
- _authorizedPrincipal = null;
- }
- else
- {
- getAuthorizedSubject().getPrincipals().addAll(authorizedSubject.getPrincipals());
- _amqpConnection.updateAccessControllerContext();
- _authorizedPrincipal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(authorizedSubject);
- }
+ _amqpConnection.setSubject(authorizedSubject);
}
public Principal getAuthorizedPrincipal()
{
- return _authorizedPrincipal;
+ return _amqpConnection.getAuthorizedPrincipal();
}
public long getConnectionId()
@@ -508,7 +473,7 @@ public class ServerConnection extends Co
public int getMessageCompressionThreshold()
{
- return _messageCompressionThreshold;
+ return _amqpConnection.getMessageCompressionThreshold();
}
public int getMaxMessageSize()
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Mar 25 15:22:34 2016
@@ -232,9 +232,9 @@ public class AMQChannel
_connection = connection;
_channelId = channelId;
- _subject = new Subject(false, connection.getAuthorizedSubject().getPrincipals(),
- connection.getAuthorizedSubject().getPublicCredentials(),
- connection.getAuthorizedSubject().getPrivateCredentials());
+ _subject = new Subject(false, connection.getSubject().getPrincipals(),
+ connection.getSubject().getPublicCredentials(),
+ connection.getSubject().getPrivateCredentials());
_subject.getPrincipals().add(new SessionPrincipal(this));
_accessControllerContext = org.apache.qpid.server.security.SecurityManager.getAccessControlContextFromSubject(_subject);
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Fri Mar 25 15:22:34 2016
@@ -45,7 +45,6 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
-import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
@@ -69,7 +68,6 @@ import org.apache.qpid.server.transport.
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
@@ -78,7 +76,6 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.ServerNetworkConnection;
@@ -147,7 +144,6 @@ public class AMQPConnection_0_8
private volatile int _maxFrameSize;
private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
- private final ServerNetworkConnection _network;
private final ByteBufferSender _sender;
private volatile boolean _deferFlush;
@@ -156,7 +152,6 @@ public class AMQPConnection_0_8
private volatile boolean _closeWhenNoRoute;
private volatile boolean _compressionSupported;
- private volatile int _messageCompressionThreshold;
/**
* QPID-6744 - Older queue clients (<=0.32) set the nowait flag false on the queue.delete method and then
@@ -191,11 +186,8 @@ public class AMQPConnection_0_8
? getBroker().getContextValue(String.class, Broker.SEND_QUEUE_DELETE_OK_REGARDLESS_CLIENT_VER_REGEXP): "";
_sendQueueDeleteOkRegardlessClientVerRegexp = Pattern.compile(sendQueueDeleteOkRegardlessRegexp);
- _network = network;
_sender = network.getSender();
_closeWhenNoRoute = getBroker().getConnection_closeWhenNoRoute();
-
- logConnectionOpen();
}
@Override
@@ -505,15 +497,16 @@ public class AMQPConnection_0_8
private void initHeartbeats(int delay)
{
+ ServerNetworkConnection network = getNetwork();
if (delay > 0)
{
- _network.setMaxWriteIdleMillis(1000L * delay);
- _network.setMaxReadIdleMillis(1000L * BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay);
+ network.setMaxWriteIdleMillis(1000L * delay);
+ network.setMaxReadIdleMillis(1000L * BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay);
}
else
{
- _network.setMaxWriteIdleMillis(0);
- _network.setMaxReadIdleMillis(0);
+ network.setMaxWriteIdleMillis(0);
+ network.setMaxReadIdleMillis(0);
}
}
@@ -588,7 +581,7 @@ public class AMQPConnection_0_8
finally
{
final long timeoutTime = System.currentTimeMillis() + CLOSE_OK_TIMEOUT;
- getAggregateTicker().addTicker(new ConnectionClosingTicker(timeoutTime, _network));
+ getAggregateTicker().addTicker(new ConnectionClosingTicker(timeoutTime, getNetwork()));
}
}
}
@@ -596,18 +589,12 @@ public class AMQPConnection_0_8
public void closeNetworkConnection()
{
- _network.close();
- }
-
- @Override
- public String toString()
- {
- return _network.getRemoteAddress() + "(" + ((getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName()) + ")");
+ getNetwork().close();
}
private String getLocalFQDN()
{
- SocketAddress address = _network.getLocalAddress();
+ SocketAddress address = getNetwork().getLocalAddress();
if (address instanceof InetSocketAddress)
{
return ((InetSocketAddress) address).getHostName();
@@ -708,57 +695,14 @@ public class AMQPConnection_0_8
return getMethodRegistry();
}
- public void setVirtualHost(VirtualHost<?> virtualHost)
- {
- associateVirtualHost(virtualHost);
-
- _messageCompressionThreshold = virtualHost.getContextValue(Integer.class,
- Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
- if(_messageCompressionThreshold <= 0)
- {
- _messageCompressionThreshold = Integer.MAX_VALUE;
- }
-
- getSubject().getPrincipals().add(virtualHost.getPrincipal());
-
- updateAccessControllerContext();
- logConnectionOpen();
- }
-
public ProtocolOutputConverter getProtocolOutputConverter()
{
return _protocolOutputConverter;
}
- public void setAuthorizedSubject(final Subject authorizedSubject)
- {
- if (authorizedSubject == null)
- {
- throw new IllegalArgumentException("authorizedSubject cannot be null");
- }
-
- getSubject().getPrincipals().addAll(authorizedSubject.getPrincipals());
- getSubject().getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials());
- getSubject().getPublicCredentials().addAll(authorizedSubject.getPublicCredentials());
-
- updateAccessControllerContext();
-
- }
-
- public Subject getAuthorizedSubject()
- {
- return getSubject();
- }
-
- public Principal getAuthorizedPrincipal()
- {
-
- return getSubject().getPrincipals(AuthenticatedPrincipal.class).size() == 0 ? null : AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(getSubject());
- }
-
public Principal getPeerPrincipal()
{
- return _network.getPeerPrincipal();
+ return getNetwork().getPeerPrincipal();
}
public MethodRegistry getMethodRegistry()
@@ -825,7 +769,7 @@ public class AMQPConnection_0_8
public Object run()
{
getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current connection state: " + _state, true));
- _network.close();
+ getNetwork().close();
return null;
}
}, getAccessControllerContext());
@@ -843,7 +787,7 @@ public class AMQPConnection_0_8
public String getAddress()
{
- return String.valueOf(_network.getRemoteAddress());
+ return String.valueOf(getNetwork().getRemoteAddress());
}
public void closeSessionAsync(final AMQSessionModel<?> session, final AMQConstant cause, final String message)
@@ -1160,7 +1104,7 @@ public class AMQPConnection_0_8
broker.getConnection_heartBeatDelay());
writeFrame(tuneBody.generateFrame(0));
_state = ConnectionState.AWAIT_TUNE_OK;
- setAuthorizedSubject(authResult.getSubject());
+ setSubject(authResult.getSubject());
disposeSaslServer();
break;
case CONTINUE:
@@ -1252,7 +1196,7 @@ public class AMQPConnection_0_8
case SUCCESS:
_logger.debug("Connected as: {}", authResult.getSubject());
- setAuthorizedSubject(authResult.getSubject());
+ setSubject(authResult.getSubject());
int frameMax = getDefaultMaxFrameSize();
@@ -1377,31 +1321,12 @@ public class AMQPConnection_0_8
return _compressionSupported && getBroker().isMessageCompressionEnabled();
}
- public int getMessageCompressionThreshold()
- {
- return _messageCompressionThreshold;
- }
-
private SubjectCreator getSubjectCreator()
{
return getPort().getAuthenticationProvider().getSubjectCreator(getTransport().isSecure());
}
@Override
- public EventLogger getEventLogger()
- {
- final VirtualHost<?> virtualHost = getVirtualHost();
- if (virtualHost != null)
- {
- return virtualHost.getEventLogger();
- }
- else
- {
- return getBroker().getEventLogger();
- }
- }
-
- @Override
public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId)
{
assertState(ConnectionState.OPEN);
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Fri Mar 25 15:22:34 2016
@@ -106,7 +106,7 @@ public class AMQChannelTest extends Qpid
_protocolOutputConverter = mock(ProtocolOutputConverter.class);
_amqConnection = mock(AMQPConnection_0_8.class);
- when(_amqConnection.getAuthorizedSubject()).thenReturn(authenticatedSubject);
+ when(_amqConnection.getSubject()).thenReturn(authenticatedSubject);
when(_amqConnection.getAuthorizedPrincipal()).thenReturn(authenticatedPrincipal);
when(_amqConnection.getVirtualHost()).thenReturn((VirtualHost)_virtualHost);
when(_amqConnection.getProtocolOutputConverter()).thenReturn(_protocolOutputConverter);
@@ -177,10 +177,10 @@ public class AMQChannelTest extends Qpid
});
Set<Principal> authenticatedUser = Collections.<Principal>singleton(new AuthenticatedPrincipal("user"));
- _amqConnection.setAuthorizedSubject(new Subject(true,
- authenticatedUser,
- Collections.<Principal>emptySet(),
- Collections.<Principal>emptySet()));
+ _amqConnection.setSubject(new Subject(true,
+ authenticatedUser,
+ Collections.<Principal>emptySet(),
+ Collections.<Principal>emptySet()));
_amqConnection.associateVirtualHost(_virtualHost);
int channelId = 1;
@@ -210,7 +210,7 @@ public class AMQChannelTest extends Qpid
});
Set<Principal> authenticatedUser = Collections.<Principal>singleton(new AuthenticatedPrincipal("user"));
- _amqConnection.setAuthorizedSubject(new Subject(true, authenticatedUser, Collections.<Principal>emptySet(), Collections.<Principal>emptySet()));
+ _amqConnection.setSubject(new Subject(true, authenticatedUser, Collections.<Principal>emptySet(), Collections.<Principal>emptySet()));
_amqConnection.associateVirtualHost(_virtualHost);
AMQChannel channel = new AMQChannel(_amqConnection, 1, _virtualHost.getMessageStore());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Mar 25 15:22:34 2016
@@ -57,12 +57,10 @@ import org.apache.qpid.bytebuffer.QpidBy
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
@@ -557,12 +555,6 @@ public class AMQPConnection_1_0 extends
}
- @Override
- protected void performDeleteTasks()
- {
- super.performDeleteTasks();
- }
-
public void close()
{
getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_RESPONSE_TIMEOUT,
@@ -639,11 +631,6 @@ public class AMQPConnection_1_0 extends
_connection.sendConnectionCloseAsync(cause, message);
}
- public Principal getAuthorizedPrincipal()
- {
- return _connection.getAuthorizedPrincipal();
- }
-
public void closeSessionAsync(final AMQSessionModel<?> session,
final AMQConstant cause, final String message)
{
@@ -674,18 +661,4 @@ public class AMQPConnection_1_0 extends
{
return _connection.getSessionCountLimit();
}
-
- @Override
- protected EventLogger getEventLogger()
- {
- final VirtualHost<?> virtualHost = _connection.getVirtualHost();
- if (virtualHost != null)
- {
- return virtualHost.getEventLogger();
- }
- else
- {
- return _broker.getEventLogger();
- }
- }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1736588&r1=1736587&r2=1736588&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Fri Mar 25 15:22:34 2016
@@ -35,7 +35,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.security.auth.Subject;
@@ -238,8 +237,6 @@ public class Connection_1_0 implements C
{
setUserPrincipal(user);
}
- _amqpConnection.getSubject().getPrincipals().add(vhost.getPrincipal());
- _amqpConnection.updateAccessControllerContext();
if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject()) == null)
{
closeWithError(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
@@ -248,7 +245,7 @@ public class Connection_1_0 implements C
{
try
{
- _amqpConnection.associateVirtualHost(vhost);
+ _amqpConnection.setVirtualHost(vhost);
}
catch (VirtualHostUnavailableException e)
{
@@ -272,11 +269,7 @@ public class Connection_1_0 implements C
void setUserPrincipal(final Principal user)
{
- Subject authSubject = _subjectCreator.createSubjectWithGroups(user);
- _amqpConnection.getSubject().getPrincipals().addAll(authSubject.getPrincipals());
- _amqpConnection.getSubject().getPublicCredentials().addAll(authSubject.getPublicCredentials());
- _amqpConnection.getSubject().getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
- _amqpConnection.updateAccessControllerContext();
+ _amqpConnection.setSubject(_subjectCreator.createSubjectWithGroups(user));
}
public void remoteSessionCreation(SessionEndpoint endpoint)
@@ -435,12 +428,6 @@ public class Connection_1_0 implements C
return _connectionEndpoint.getRemoteContainerId();
}
- public Principal getAuthorizedPrincipal()
- {
- Set<AuthenticatedPrincipal> authPrincipals = _amqpConnection.getSubject().getPrincipals(AuthenticatedPrincipal.class);
- return authPrincipals.isEmpty() ? null : authPrincipals.iterator().next();
- }
-
public long getSessionCountLimit()
{
return 0; // TODO
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org