You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/27 23:13:27 UTC
svn commit: r1687962 [1/4] - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/
broker-codegen/src/main/java/org/apache/qpid/server/model/validation/
broker-core/src/main/java/org/apache/qpid/server/connection/ b...
Author: rgodfrey
Date: Sat Jun 27 21:13:25 2015
New Revision: 1687962
URL: http://svn.apache.org/r1687962
Log:
QPID-6612 : Refactor various connection related classes
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (with props)
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (with props)
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (with props)
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (with props)
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (with props)
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
qpid/java/trunk/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionVersionValidator.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractMessageLogger.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SiteSpecificTrustStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java
qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostLoggerRestTest.java
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java Sat Jun 27 21:13:25 2015
@@ -47,13 +47,14 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.VirtualHostAlias;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.*;
@@ -500,7 +501,7 @@ public class BDBHAReplicaVirtualHostImpl
}
@Override
- public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection)
+ public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
{
return false;
}
@@ -547,13 +548,13 @@ public class BDBHAReplicaVirtualHostImpl
}
@Override
- public void registerConnection(final Connection<?> connection)
+ public void registerConnection(final AMQPConnection<?> connection)
{
throwUnsupportedForReplica();
}
@Override
- public void deregisterConnection(final Connection<?> connection)
+ public void deregisterConnection(final AMQPConnection<?> connection)
{
throwUnsupportedForReplica();
}
Modified: qpid/java/trunk/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java (original)
+++ qpid/java/trunk/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java Sat Jun 27 21:13:25 2015
@@ -275,7 +275,7 @@ public class AttributeAnnotationValidato
return true;
}
}
- if(typeElement.getKind()==ElementKind.ENUM)
+ if(typeElement != null && typeElement.getKind()==ElementKind.ENUM)
{
return true;
}
@@ -290,11 +290,14 @@ public class AttributeAnnotationValidato
final TypeElement managedAttributeTypeValueElement =
elementUtils.getTypeElement(ManagedAttributeValueTypeValidator.MANAGED_ATTRIBUTE_VALUE_TYPE_CLASS_NAME);
- for(AnnotationMirror annotation : typeElement.getAnnotationMirrors())
+ if(typeElement != null)
{
- if(annotation.getAnnotationType().asElement().equals(managedAttributeTypeValueElement))
+ for (AnnotationMirror annotation : typeElement.getAnnotationMirrors())
{
- return true;
+ if (annotation.getAnnotationType().asElement().equals(managedAttributeTypeValueElement))
+ {
+ return true;
+ }
}
}
if(typeUtils.isSameType(type,elementUtils.getTypeElement("java.lang.Object").asType()))
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java Sat Jun 27 21:13:25 2015
@@ -23,14 +23,14 @@ package org.apache.qpid.server.connectio
import java.net.SocketAddress;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.auth.SocketConnectionPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
public class ConnectionPrincipal implements SocketConnectionPrincipal
{
- private final AMQConnectionModel _connection;
+ private final AMQPConnection<?> _connection;
- public ConnectionPrincipal(final AMQConnectionModel connection)
+ public ConnectionPrincipal(final AMQPConnection<?> connection)
{
_connection = connection;
}
@@ -44,10 +44,10 @@ public class ConnectionPrincipal impleme
@Override
public SocketAddress getRemoteAddress()
{
- return _connection.getRemoteAddress();
+ return _connection.getRemoteSocketAddress();
}
- public AMQConnectionModel getConnection()
+ public AMQPConnection<?> getConnection()
{
return _connection;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionVersionValidator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionVersionValidator.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionVersionValidator.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionVersionValidator.java Sat Jun 27 21:13:25 2015
@@ -38,7 +38,7 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
@PluggableService
@@ -71,7 +71,7 @@ public class ConnectionVersionValidator
}
@Override
- public boolean validateConnectionCreation(final AMQConnectionModel<?, ?> connection,
+ public boolean validateConnectionCreation(final AMQPConnection<?> connection,
final VirtualHost<?, ?, ?> virtualHost)
{
String connectionVersion = connection.getClientVersion();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Sat Jun 27 21:13:25 2015
@@ -88,7 +88,7 @@ public abstract class AbstractConsumerTa
@Override
public final boolean isSuspended()
{
- return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended();
+ return getSessionModel().getAMQPConnection().isMessageAssignmentSuspended() || doIsSuspended();
}
protected abstract boolean doIsSuspended();
@@ -185,7 +185,7 @@ public abstract class AbstractConsumerTa
public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
_queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
- getSessionModel().getConnectionModel().notifyWork();
+ getSessionModel().getAMQPConnection().notifyWork();
return entry.getMessage().getSize();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractMessageLogger.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractMessageLogger.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractMessageLogger.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractMessageLogger.java Sat Jun 27 21:13:25 2015
@@ -24,11 +24,11 @@ package org.apache.qpid.server.logging;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.logging.subjects.LogSubjectFormat;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal;
import org.apache.qpid.server.security.auth.TaskPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
import javax.security.auth.Subject;
import java.security.AccessController;
@@ -145,7 +145,7 @@ public abstract class AbstractMessageLog
return "["+taskPrincipal.getName()+"] ";
}
- protected String generateConnectionMessage(final AMQConnectionModel connection)
+ protected String generateConnectionMessage(final AMQPConnection<?> connection)
{
if (connection.getAuthorizedPrincipal() != null)
{
@@ -190,7 +190,7 @@ public abstract class AbstractMessageLog
protected String generateSessionMessage(final AMQSessionModel session)
{
- AMQConnectionModel connection = session.getConnectionModel();
+ AMQPConnection<?> connection = session.getAMQPConnection();
return "[" + MessageFormat.format(CHANNEL_FORMAT, connection == null ? -1L : connection.getConnectionId(),
(connection == null || connection.getAuthorizedPrincipal() == null)
? "?"
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Sat Jun 27 21:13:25 2015
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
@@ -43,7 +43,7 @@ public class ChannelLogSubject extends A
* 3 - Virtualhost
* 4 - Channel ID
*/
- AMQConnectionModel connection = session.getConnectionModel();
+ AMQPConnection connection = session.getAMQPConnection();
setLogStringWithFormat(CHANNEL_FORMAT,
connection == null ? -1L : connection.getConnectionId(),
(connection == null || connection.getAuthorizedPrincipal() == null) ? "?" : connection.getAuthorizedPrincipal().getName(),
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java Sat Jun 27 21:13:25 2015
@@ -21,7 +21,8 @@
package org.apache.qpid.server.logging.subjects;
import java.text.MessageFormat;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+
+import org.apache.qpid.server.transport.AMQPConnection;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
@@ -32,11 +33,11 @@ public class ConnectionLogSubject extend
{
// The Session this Actor is representing
- private AMQConnectionModel _session;
+ private AMQPConnection<?> _connection;
- public ConnectionLogSubject(AMQConnectionModel session)
+ public ConnectionLogSubject(AMQPConnection<?> connection)
{
- _session = session;
+ _connection = connection;
}
// Used to stop re-creating the _logString when we reach our final format
@@ -55,9 +56,9 @@ public class ConnectionLogSubject extend
{
if (!_upToDate)
{
- if (_session.getAuthorizedPrincipal() != null)
+ if (_connection.getAuthorizedPrincipal() != null)
{
- if (_session.getVirtualHostName() != null)
+ if (_connection.getVirtualHostName() != null)
{
/**
* LOG FORMAT used by the AMQPConnectorActor follows
@@ -70,10 +71,10 @@ public class ConnectionLogSubject extend
* 0 - Connection ID 1 - User ID 2 - IP 3 - Virtualhost
*/
setLogString("[" + MessageFormat.format(CONNECTION_FORMAT,
- _session.getConnectionId(),
- _session.getAuthorizedPrincipal().getName(),
- _session.getRemoteAddressString(),
- _session.getVirtualHostName())
+ _connection.getConnectionId(),
+ _connection.getAuthorizedPrincipal().getName(),
+ _connection.getRemoteAddressString(),
+ _connection.getVirtualHostName())
+ "] ");
_upToDate = true;
@@ -81,9 +82,9 @@ public class ConnectionLogSubject extend
else
{
setLogString("[" + MessageFormat.format(USER_FORMAT,
- _session.getConnectionId(),
- _session.getAuthorizedPrincipal().getName(),
- _session.getRemoteAddressString())
+ _connection.getConnectionId(),
+ _connection.getAuthorizedPrincipal().getName(),
+ _connection.getRemoteAddressString())
+ "] ");
}
@@ -91,8 +92,8 @@ public class ConnectionLogSubject extend
else
{
setLogString("[" + MessageFormat.format(SOCKET_FORMAT,
- _session.getConnectionId(),
- _session.getRemoteAddressString())
+ _connection.getConnectionId(),
+ _connection.getRemoteAddressString())
+ "] ");
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Sat Jun 27 21:13:25 2015
@@ -43,7 +43,7 @@ public interface MessageSource extends T
void removeConsumerRegistrationListener(ConsumerRegistrationListener<? super MessageSource> listener);
- boolean verifySessionAccess(AMQSessionModel<?,?> session);
+ boolean verifySessionAccess(AMQSessionModel<?> session);
interface ConsumerRegistrationListener<Q extends MessageSource>
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java Sat Jun 27 21:13:25 2015
@@ -22,7 +22,7 @@ package org.apache.qpid.server.model;
import java.util.Collection;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
@ManagedObject( creatable = false )
public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
@@ -85,7 +85,7 @@ public interface Connection<X extends Co
VirtualHost getVirtualHost();
@DerivedAttribute
- Port getPort();
+ Port<?> getPort();
@ManagedStatistic
long getBytesIn();
@@ -109,5 +109,5 @@ public interface Connection<X extends Co
Collection<Session> getSessions();
- AMQConnectionModel<?,?> getUnderlyingConnection();
+ AbstractAMQPConnection<?> getUnderlyingConnection();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Sat Jun 27 21:13:25 2015
@@ -30,6 +30,7 @@ import java.util.UUID;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.virtualhost.VirtualHostConnectionListener;
@ManagedObject( defaultType = "ProvidedStore", description = VirtualHost.CLASS_DESCRIPTION)
@@ -166,7 +167,7 @@ public interface VirtualHost<X extends V
Collection<String> getExchangeTypeNames();
@ManagedOperation(nonModifying = true)
- Collection<Connection<?>> getConnections();
+ Collection<? extends Connection<?>> getConnections();
@ManagedOperation(nonModifying = true)
Connection<?> getConnection(@Param(name="name") String name);
@@ -179,8 +180,8 @@ public interface VirtualHost<X extends V
Principal getPrincipal();
- void registerConnection(Connection<?> connection);
- void deregisterConnection(Connection<?> connection);
+ void registerConnection(AMQPConnection<?> connection);
+ void deregisterConnection(AMQPConnection<?> connection);
public static interface Transaction
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java Sat Jun 27 21:13:25 2015
@@ -25,7 +25,6 @@ import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Sat Jun 27 21:13:25 2015
@@ -39,14 +39,15 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.util.Action;
-final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter>
+public final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter>
{
// Attributes
private final AMQSessionModel _session;
- public SessionAdapter(final ConnectionAdapter connectionAdapter,
+ public SessionAdapter(final AbstractAMQPConnection<?> connectionAdapter,
final AMQSessionModel session)
{
super(parentsMap(connectionAdapter), createAttributes(session));
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java Sat Jun 27 21:13:25 2015
@@ -21,9 +21,9 @@
package org.apache.qpid.server.plugin;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
public interface ConnectionValidator extends Pluggable
{
- boolean validateConnectionCreation(AMQConnectionModel<?, ?> connection, final VirtualHost<?, ?, ?> virtualHost);
+ boolean validateConnectionCreation(AMQPConnection<?> connection, final VirtualHost<?, ?, ?> virtualHost);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Sat Jun 27 21:13:25 2015
@@ -29,6 +29,7 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.transport.network.Ticker;
@@ -37,11 +38,11 @@ import org.apache.qpid.transport.network
* Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
* when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}.
*/
-public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQConnectionModel<C,T>> extends Comparable<AMQSessionModel>, Deletable<T>
+public interface AMQSessionModel<T extends AMQSessionModel<T>> extends Comparable<AMQSessionModel>, Deletable<T>
{
public UUID getId();
- public C getConnectionModel();
+ public AMQPConnection<?> getAMQPConnection();
public String getClientID();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sat Jun 27 21:13:25 2015
@@ -84,7 +84,6 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.TypedContent;
import org.apache.qpid.server.plugin.MessageFilterFactory;
import org.apache.qpid.server.plugin.QpidServiceLoader;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
@@ -93,6 +92,7 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -342,7 +342,7 @@ public abstract class AbstractQueue<X ex
Subject activeSubject = Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
- AMQSessionModel<?,?> sessionModel;
+ AMQSessionModel<?> sessionModel;
if(sessionPrincipals.isEmpty())
{
sessionModel = null;
@@ -360,14 +360,14 @@ public abstract class AbstractQueue<X ex
{
case PRINCIPAL:
- _exclusiveOwner = sessionModel.getConnectionModel().getAuthorizedPrincipal();
+ _exclusiveOwner = sessionModel.getAMQPConnection().getAuthorizedPrincipal();
break;
case CONTAINER:
- _exclusiveOwner = sessionModel.getConnectionModel().getRemoteContainerName();
+ _exclusiveOwner = sessionModel.getAMQPConnection().getRemoteContainerName();
break;
case CONNECTION:
- _exclusiveOwner = sessionModel.getConnectionModel();
- addExclusivityConstraint(sessionModel.getConnectionModel());
+ _exclusiveOwner = sessionModel.getAMQPConnection();
+ addExclusivityConstraint(sessionModel.getAMQPConnection());
break;
case SESSION:
_exclusiveOwner = sessionModel;
@@ -405,7 +405,7 @@ public abstract class AbstractQueue<X ex
{
if(sessionModel != null)
{
- addLifetimeConstraint(sessionModel.getConnectionModel());
+ addLifetimeConstraint(sessionModel.getAMQPConnection());
}
else
{
@@ -717,12 +717,12 @@ public abstract class AbstractQueue<X ex
case CONNECTION:
if(exclusiveOwner == null)
{
- exclusiveOwner = target.getSessionModel().getConnectionModel();
- addExclusivityConstraint(target.getSessionModel().getConnectionModel());
+ exclusiveOwner = target.getSessionModel().getAMQPConnection();
+ addExclusivityConstraint(target.getSessionModel().getAMQPConnection());
}
else
{
- if(exclusiveOwner != target.getSessionModel().getConnectionModel())
+ if(exclusiveOwner != target.getSessionModel().getAMQPConnection())
{
throw new ConsumerAccessRefused();
}
@@ -751,11 +751,11 @@ public abstract class AbstractQueue<X ex
case PRINCIPAL:
if(exclusiveOwner == null)
{
- exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+ exclusiveOwner = target.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
}
else
{
- if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+ if(!exclusiveOwner.equals(target.getSessionModel().getAMQPConnection().getAuthorizedPrincipal()))
{
throw new ConsumerAccessRefused();
}
@@ -764,11 +764,11 @@ public abstract class AbstractQueue<X ex
case CONTAINER:
if(exclusiveOwner == null)
{
- exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName();
+ exclusiveOwner = target.getSessionModel().getAMQPConnection().getRemoteContainerName();
}
else
{
- if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName()))
+ if(!exclusiveOwner.equals(target.getSessionModel().getAMQPConnection().getRemoteContainerName()))
{
throw new ConsumerAccessRefused();
}
@@ -2718,7 +2718,7 @@ public abstract class AbstractQueue<X ex
}
@Override
- public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+ public boolean verifySessionAccess(final AMQSessionModel<?> session)
{
boolean allowed;
switch(_exclusive)
@@ -2730,13 +2730,13 @@ public abstract class AbstractQueue<X ex
allowed = _exclusiveOwner == null || _exclusiveOwner == session;
break;
case CONNECTION:
- allowed = _exclusiveOwner == null || _exclusiveOwner == session.getConnectionModel();
+ allowed = _exclusiveOwner == null || _exclusiveOwner == session.getAMQPConnection();
break;
case PRINCIPAL:
- allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getAuthorizedPrincipal());
+ allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getAMQPConnection().getAuthorizedPrincipal());
break;
case CONTAINER:
- allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getRemoteContainerName());
+ allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getAMQPConnection().getRemoteContainerName());
break;
case LINK:
allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session;
@@ -2822,7 +2822,7 @@ public abstract class AbstractQueue<X ex
_exclusiveOwner = session;
break;
case LINK:
- _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel();
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection();
}
}
@@ -2833,14 +2833,14 @@ public abstract class AbstractQueue<X ex
case NONE:
case CONTAINER:
case PRINCIPAL:
- AMQConnectionModel con = null;
+ AMQPConnection con = null;
for(ConsumerImpl c : getConsumers())
{
if(con == null)
{
- con = c.getSessionModel().getConnectionModel();
+ con = c.getSessionModel().getAMQPConnection();
}
- else if(!con.equals(c.getSessionModel().getConnectionModel()))
+ else if(!con.equals(c.getSessionModel().getAMQPConnection()))
{
throw new ExistingConsumerPreventsExclusive();
}
@@ -2848,10 +2848,10 @@ public abstract class AbstractQueue<X ex
_exclusiveOwner = con;
break;
case SESSION:
- _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel();
+ _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getAMQPConnection();
break;
case LINK:
- _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel();
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection();
}
}
@@ -2866,9 +2866,9 @@ public abstract class AbstractQueue<X ex
{
if(containerID == null)
{
- containerID = c.getSessionModel().getConnectionModel().getRemoteContainerName();
+ containerID = c.getSessionModel().getAMQPConnection().getRemoteContainerName();
}
- else if(!containerID.equals(c.getSessionModel().getConnectionModel().getRemoteContainerName()))
+ else if(!containerID.equals(c.getSessionModel().getAMQPConnection().getRemoteContainerName()))
{
throw new ExistingConsumerPreventsExclusive();
}
@@ -2876,13 +2876,13 @@ public abstract class AbstractQueue<X ex
_exclusiveOwner = containerID;
break;
case CONNECTION:
- _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getRemoteContainerName();
+ _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQPConnection)_exclusiveOwner).getRemoteContainerName();
break;
case SESSION:
- _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getRemoteContainerName();
+ _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getAMQPConnection().getRemoteContainerName();
break;
case LINK:
- _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getRemoteContainerName();
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection().getRemoteContainerName();
}
}
@@ -2897,9 +2897,9 @@ public abstract class AbstractQueue<X ex
{
if(principal == null)
{
- principal = c.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+ principal = c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
}
- else if(!principal.equals(c.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+ else if(!principal.equals(c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal()))
{
throw new ExistingConsumerPreventsExclusive();
}
@@ -2907,13 +2907,13 @@ public abstract class AbstractQueue<X ex
_exclusiveOwner = principal;
break;
case CONNECTION:
- _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getAuthorizedPrincipal();
+ _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQPConnection)_exclusiveOwner).getAuthorizedPrincipal();
break;
case SESSION:
- _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getAuthorizedPrincipal();
+ _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getAMQPConnection().getAuthorizedPrincipal();
break;
case LINK:
- _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java Sat Jun 27 21:13:25 2015
@@ -62,7 +62,6 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.model.VirtualHostLogger;
import org.apache.qpid.server.model.VirtualHostLoggerFilter;
import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.security.access.ObjectProperties;
import org.apache.qpid.server.security.access.ObjectProperties.Property;
@@ -71,6 +70,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.access.OperationLoggingDetails;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.TaskPrincipal;
+import org.apache.qpid.server.transport.AMQPConnection;
public class SecurityManager
{
@@ -251,7 +251,7 @@ public class SecurityManager
}
}
- public void authoriseCreateConnection(final AMQConnectionModel connection)
+ public void authoriseCreateConnection(final AMQPConnection<?> connection)
{
String virtualHostName = connection.getVirtualHostName();
ObjectProperties properties = new ObjectProperties(virtualHostName);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SiteSpecificTrustStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SiteSpecificTrustStore.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SiteSpecificTrustStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SiteSpecificTrustStore.java Sat Jun 27 21:13:25 2015
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.server.security;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
import org.apache.qpid.server.model.DerivedAttribute;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1687962&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Sat Jun 27 21:13:25 2015
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.net.SocketAddress;
+import java.security.Principal;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.Deletable;
+
+public interface AMQPConnection<C extends AMQPConnection<C>> extends Connection<C>, Deletable<C>
+{
+ boolean isMessageAssignmentSuspended();
+
+ long getConnectionId();
+
+ Principal getAuthorizedPrincipal();
+
+ String getRemoteAddressString();
+
+ String getVirtualHostName();
+
+ void notifyWork();
+
+ String getRemoteContainerName();
+
+ boolean isConnectionStopped();
+
+ void registerMessageReceived(long size, long arrivalTime);
+
+ void registerMessageDelivered(long size);
+
+ void closeSessionAsync(AMQSessionModel<?> session, AMQConstant cause, String message);
+
+ SocketAddress getRemoteSocketAddress();
+
+ void block();
+
+ void unblock();
+
+ void setScheduler(NetworkConnectionScheduler networkConnectionScheduler);
+
+ boolean hasSessionWithName(byte[] name);
+
+ void closeAsync(AMQConstant connectionForced, String reason);
+
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1687962&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Sat Jun 27 21:13:25 2015
@@ -0,0 +1,517 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.net.SocketAddress;
+import java.security.AccessController;
+import java.security.Principal;
+import java.security.PrivilegedAction;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.auth.Subject;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.adapter.SessionAdapter;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C>>
+ extends AbstractConfiguredObject<C>
+ implements ProtocolEngine, AMQPConnection<C>
+
+{
+ private final Broker<?> _broker;
+ private final NetworkConnection _network;
+ private final AmqpPort<?> _port;
+ private final Transport _transport;
+ private final long _connectionId;
+ private final AggregateTicker _aggregateTicker;
+ private final Subject _subject = new Subject();
+ private final List<Action<? super C>> _connectionCloseTaskList =
+ new CopyOnWriteArrayList<>();
+
+ private final Action<? super AMQPConnection<C>> _underlyingConnectionDeleteTask;
+
+ private String _clientProduct;
+ private String _clientVersion;
+ private String _remoteProcessPid;
+ private String _clientId;
+
+ private volatile boolean _stopped;
+ private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final AtomicBoolean _underlyingClosed = new AtomicBoolean();
+
+
+ public AbstractAMQPConnection(Broker<?> broker,
+ NetworkConnection network,
+ AmqpPort<?> port,
+ Transport transport,
+ long connectionId,
+ final AggregateTicker aggregateTicker)
+ {
+ super(parentsMap(port),createAttributes(connectionId, network));
+
+ _broker = broker;
+ _network = network;
+ _port = port;
+ _transport = transport;
+ _connectionId = connectionId;
+ _aggregateTicker = aggregateTicker;
+ _subject.getPrincipals().add(new ConnectionPrincipal(this));
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
+ _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
+ _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
+
+
+ // Used to allow the protocol layers to tell the model they have been deleted
+ _underlyingConnectionDeleteTask = new Action<AMQPConnection<?>>()
+ {
+ @Override
+ public void performAction(final AMQPConnection<?> object)
+ {
+ removeDeleteTask(this);
+ _underlyingClosed.set(true);
+ deleteAsync();
+ }
+ };
+ addDeleteTask(_underlyingConnectionDeleteTask);
+
+ setState(State.ACTIVE);
+
+ }
+
+ private static Map<String, Object> createAttributes(long connectionId, NetworkConnection network)
+ {
+ Map<String,Object> attributes = new HashMap<>();
+ attributes.put(ID, UUID.randomUUID());
+ attributes.put(NAME, "[" + connectionId + "] " + String.valueOf(network.getRemoteAddress()).replaceAll("/", ""));
+ attributes.put(DURABLE, false);
+ return attributes;
+ }
+
+ public final Broker<?> getBroker()
+ {
+ return _broker;
+ }
+
+ public final NetworkConnection getNetwork()
+ {
+ return _network;
+ }
+
+ public final AmqpPort<?> getPort()
+ {
+ return _port;
+ }
+
+ public final Transport getTransport()
+ {
+ return _transport;
+ }
+
+ @Override
+ public final AggregateTicker getAggregateTicker()
+ {
+ return _aggregateTicker;
+ }
+
+
+ public long getLastIoTime()
+ {
+ return Math.max(getLastReadTime(), getLastWriteTime());
+ }
+
+ public final long getConnectionId()
+ {
+ return _connectionId;
+ }
+
+ public final StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public String getRemoteAddressString()
+ {
+ return String.valueOf(_network.getRemoteAddress());
+ }
+
+ public final void stopConnection()
+ {
+ _stopped = true;
+ }
+
+ public final ProtocolEngine getProtocolEngine()
+ {
+ return this;
+ }
+
+ public boolean isConnectionStopped()
+ {
+ return _stopped;
+ }
+
+ public final String getVirtualHostName()
+ {
+ return getVirtualHost() == null ? null : getVirtualHost().getName();
+ }
+
+ public String getClientVersion()
+ {
+ return _clientVersion;
+ }
+
+ public String getRemoteProcessPid()
+ {
+ return _remoteProcessPid;
+ }
+
+ public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ {
+ ((NonBlockingConnection)_network).changeScheduler(networkConnectionScheduler);
+ }
+
+ public String getClientProduct()
+ {
+ return _clientProduct;
+ }
+
+ public void addDeleteTask(final Action<? super C> task)
+ {
+ _connectionCloseTaskList.add(task);
+ }
+
+ public void removeDeleteTask(final Action<? super C> task)
+ {
+ _connectionCloseTaskList.remove(task);
+ }
+
+
+ protected void performDeleteTasks()
+ {
+ if(runningAsSubject())
+ {
+ for (Action<? super C> task : _connectionCloseTaskList)
+ {
+ task.performAction((C)this);
+ }
+ }
+ else
+ {
+ runAsSubject(new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ performDeleteTasks();
+ return null;
+ }
+ });
+ }
+ }
+
+ public String getClientId()
+ {
+ return _clientId;
+ }
+
+ public final StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public final StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public final SocketAddress getRemoteSocketAddress()
+ {
+ return _network.getRemoteAddress();
+ }
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ ((VirtualHostImpl<?,?,?>)getVirtualHost()).registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ ((VirtualHostImpl<?,?,?>)getVirtualHost()).registerMessageReceived(messageSize, timestamp);
+ }
+
+ public final void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+ }
+
+ public final StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public void setClientProduct(final String clientProduct)
+ {
+ _clientProduct = clientProduct;
+ }
+
+ public void setClientVersion(final String clientVersion)
+ {
+ _clientVersion = clientVersion;
+ }
+
+ public void setRemoteProcessPid(final String remoteProcessPid)
+ {
+ _remoteProcessPid = remoteProcessPid;
+ }
+
+ public void setClientId(final String clientId)
+ {
+ _clientId = clientId;
+ }
+
+ private <T> T runAsSubject(PrivilegedAction<T> action)
+ {
+ return Subject.doAs(_subject, action);
+ }
+
+ private boolean runningAsSubject()
+ {
+ return _subject.equals(Subject.getSubject(AccessController.getContext()));
+ }
+
+ @Override
+ public Subject getSubject()
+ {
+ return _subject;
+ }
+
+ public void sessionAdded(final AMQSessionModel<?> session)
+ {
+ SessionAdapter adapter = new SessionAdapter(this, session);
+ adapter.create();
+ childAdded(adapter);
+
+ }
+
+ public void sessionRemoved(final AMQSessionModel<?> session)
+ {
+ }
+
+ public void virtualHostAssociated()
+ {
+ getVirtualHost().registerConnection(this);
+ }
+
+
+
+ @Override
+ public boolean isIncoming()
+ {
+ return true;
+ }
+
+ @Override
+ public String getLocalAddress()
+ {
+ return null;
+ }
+
+ @Override
+ public String getPrincipal()
+ {
+ final Principal authorizedPrincipal = getAuthorizedPrincipal();
+ return authorizedPrincipal == null ? null : authorizedPrincipal.getName();
+ }
+
+ @Override
+ public String getRemoteAddress()
+ {
+ return getRemoteAddressString();
+ }
+
+ @Override
+ public String getRemoteProcessName()
+ {
+ return null;
+ }
+
+ public Collection<Session> getSessions()
+ {
+ return getChildren(Session.class);
+ }
+
+ @SuppressWarnings("unused")
+ @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
+ private ListenableFuture<Void> doDelete()
+ {
+ if (_underlyingClosed.get())
+ {
+ deleted();
+ return Futures.immediateFuture(null);
+ }
+ else
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ asyncCloseUnderlying().addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor()
+ );
+ return returnVal;
+ }
+ }
+
+ @Override
+ protected ListenableFuture<Void> beforeClose()
+ {
+ if (_underlyingClosed.get())
+ {
+ return Futures.immediateFuture(null);
+ }
+ else
+ {
+
+ return asyncCloseUnderlying();
+ }
+
+ }
+
+ private ListenableFuture<Void> asyncCloseUnderlying()
+ {
+ final SettableFuture<Void> closeFuture = SettableFuture.create();
+ addDeleteTask(new Action<AMQPConnection<?>>()
+ {
+ @Override
+ public void performAction(final AMQPConnection<?> object)
+ {
+ closeFuture.set(null);
+ }
+ });
+ removeDeleteTask(_underlyingConnectionDeleteTask);
+
+ closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ return closeFuture;
+ }
+
+ @Override
+ protected void onClose()
+ {
+ }
+
+ @Override
+ public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ {
+ if(childClass == Session.class)
+ {
+ throw new IllegalStateException();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot create a child of class " + childClass.getSimpleName());
+ }
+
+ }
+
+ @Override
+ public long getBytesIn()
+ {
+ return getDataReceiptStatistics().getTotal();
+ }
+
+ @Override
+ public long getBytesOut()
+ {
+ return getDataDeliveryStatistics().getTotal();
+ }
+
+ @Override
+ public long getMessagesIn()
+ {
+ return getMessageReceiptStatistics().getTotal();
+ }
+
+ @Override
+ public long getMessagesOut()
+ {
+ return getMessageDeliveryStatistics().getTotal();
+ }
+
+ public abstract List<? extends AMQSessionModel<?>> getSessionModels();
+
+ @Override
+ public int getSessionCount()
+ {
+ return getSessionModels().size();
+ }
+
+ @Override
+ public AbstractAMQPConnection<?> getUnderlyingConnection()
+ {
+ return this;
+ }
+
+
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Sat Jun 27 21:13:25 2015
@@ -104,26 +104,6 @@ public class MultiVersionProtocolEngine
return _delegate.isMessageAssignmentSuspended();
}
- public SocketAddress getRemoteAddress()
- {
- return _delegate.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _delegate.getLocalAddress();
- }
-
- public long getWrittenBytes()
- {
- return _delegate.getWrittenBytes();
- }
-
- public long getReadBytes()
- {
- return _delegate.getReadBytes();
- }
-
public void closed()
{
_logger.debug("Closed");
@@ -165,7 +145,7 @@ public class MultiVersionProtocolEngine
public long getConnectionId()
{
- return _delegate.getConnectionId();
+ return _id;
}
@Override
@@ -188,7 +168,7 @@ public class MultiVersionProtocolEngine
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
- public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
+ public void setNetworkConnection(NetworkConnection network)
{
_network = network;
SocketAddress address = _network.getLocalAddress();
@@ -200,7 +180,7 @@ public class MultiVersionProtocolEngine
{
throw new IllegalArgumentException("Unsupported socket address class: " + address);
}
- _sender = sender;
+ _sender = network.getSender();
}
@Override
@@ -297,26 +277,6 @@ public class MultiVersionProtocolEngine
}
- public SocketAddress getRemoteAddress()
- {
- return _network.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _network.getLocalAddress();
- }
-
- public long getWrittenBytes()
- {
- return 0;
- }
-
- public long getReadBytes()
- {
- return 0;
- }
-
public void received(ByteBuffer msg)
{
_logger.error("Error processing incoming data, could not negotiate a common protocol");
@@ -343,11 +303,6 @@ public class MultiVersionProtocolEngine
}
- public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
- {
-
- }
-
@Override
public long getLastReadTime()
{
@@ -360,11 +315,6 @@ public class MultiVersionProtocolEngine
return 0;
}
- public long getConnectionId()
- {
- return _id;
- }
-
@Override
public Subject getSubject()
{
@@ -396,26 +346,6 @@ public class MultiVersionProtocolEngine
private long _lastReadTime = System.currentTimeMillis();
private final AtomicBoolean _hasWork = new AtomicBoolean();
- public SocketAddress getRemoteAddress()
- {
- return _network.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _network.getLocalAddress();
- }
-
- public long getWrittenBytes()
- {
- return 0;
- }
-
- public long getReadBytes()
- {
- return 0;
- }
-
@Override
public void setMessageAssignmentSuspended(final boolean value)
{
@@ -585,11 +515,6 @@ public class MultiVersionProtocolEngine
}
- public long getConnectionId()
- {
- return _id;
- }
-
@Override
public Subject getSubject()
{
@@ -614,7 +539,7 @@ public class MultiVersionProtocolEngine
_delegate = new ClosedDelegateProtocolEngine();
if(_logger.isDebugEnabled())
{
- _logger.debug("Connection from " + getRemoteAddress() + " was closed before any protocol version was established.");
+ _logger.debug("Connection from " + _network.getRemoteAddress() + " was closed before any protocol version was established.");
}
}
catch(Exception e)
@@ -654,11 +579,6 @@ public class MultiVersionProtocolEngine
}
}
- public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
- {
-
- }
-
@Override
public long getLastReadTime()
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Sat Jun 27 21:13:25 2015
@@ -50,7 +50,7 @@ public class NonBlockingNetworkTransport
private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
private final Set<TransportEncryption> _encryptionSet;
- private final ProtocolEngineFactory _factory;
+ private final MultiVersionProtocolEngineFactory _factory;
private final ServerSocketChannel _serverSocket;
private final int _timeout;
private final NetworkConnectionScheduler _scheduler;
@@ -133,7 +133,7 @@ public class NonBlockingNetworkTransport
{
socketChannel = serverSocketChannel.accept();
- final ProtocolEngine engine =
+ final MultiVersionProtocolEngine engine =
_factory.newProtocolEngine(socketChannel.socket().getRemoteSocketAddress());
if(engine != null)
@@ -171,7 +171,7 @@ public class NonBlockingNetworkTransport
_scheduler,
_port);
- engine.setNetworkConnection(connection, connection.getSender());
+ engine.setNetworkConnection(connection);
connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
idleTimeoutTicker.setConnection(connection);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Sat Jun 27 21:13:25 2015
@@ -20,15 +20,11 @@
*/
package org.apache.qpid.server.transport;
-import java.net.SocketAddress;
-
import javax.security.auth.Subject;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferReceiver;
-import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
/**
@@ -37,17 +33,6 @@ import org.apache.qpid.transport.network
*/
public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity
{
- // Returns the remote address of the NetworkDriver
- SocketAddress getRemoteAddress();
-
- // Returns the local address of the NetworkDriver
- SocketAddress getLocalAddress();
-
- // Returns number of bytes written
- long getWrittenBytes();
-
- // Returns number of bytes read
- long getReadBytes();
// Called by the NetworkDriver when the socket has been closed for reading
void closed();
@@ -59,13 +44,6 @@ public interface ProtocolEngine extends
// Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
void readerIdle();
- void setNetworkConnection(NetworkConnection network, ByteBufferSender sender);
-
- /**
- * Gets the connection ID associated with this ProtocolEngine
- */
- long getConnectionId();
-
Subject getSubject();
boolean isTransportBlockedForWriting();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Sat Jun 27 21:13:25 2015
@@ -124,7 +124,7 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+ public boolean verifySessionAccess(final AMQSessionModel<?> session)
{
return true;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Sat Jun 27 21:13:25 2015
@@ -71,7 +71,6 @@ import org.apache.qpid.server.model.port
import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
@@ -89,6 +88,7 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
@@ -103,7 +103,7 @@ public abstract class AbstractVirtualHos
private final Collection<ConnectionValidator> _connectionValidators = new ArrayList<>();
- private final Set<Connection<?>> _connections = newSetFromMap(new ConcurrentHashMap<Connection<?>, Boolean>());
+ private final Set<AMQPConnection<?>> _connections = newSetFromMap(new ConcurrentHashMap<AMQPConnection<?>, Boolean>());
private final Set<VirtualHostConnectionListener> _connectionAssociationListeners = new CopyOnWriteArraySet<>();
private static enum BlockingType { STORE, FILESYSTEM };
@@ -461,7 +461,7 @@ public abstract class AbstractVirtualHos
}
@Override
- public Collection<Connection<?>> getConnections()
+ public Collection<AMQPConnection<?>> getConnections()
{
return _connections;
}
@@ -530,7 +530,7 @@ public abstract class AbstractVirtualHos
}
@Override
- public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection)
+ public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
{
getSecurityManager().authoriseCreateConnection(connection);
for(ConnectionValidator validator : _connectionValidators)
@@ -894,13 +894,13 @@ public abstract class AbstractVirtualHos
}
for(Connection conn : _connections)
{
- conn.getUnderlyingConnection().stop();
+ conn.getUnderlyingConnection().stopConnection();
}
List<ListenableFuture<Void>> connectionCloseFutures = new ArrayList<>();
while (!_connections.isEmpty())
{
- Iterator<Connection<?>> itr = _connections.iterator();
+ Iterator<AMQPConnection<?>> itr = _connections.iterator();
while(itr.hasNext())
{
Connection<?> connection = itr.next();
@@ -1148,7 +1148,7 @@ public abstract class AbstractVirtualHos
{
_logger.debug("Checking for long running open transactions on connection " + connection);
}
- for (AMQSessionModel<?,?> session : connection.getUnderlyingConnection().getSessionModels())
+ for (AMQSessionModel<?> session : connection.getUnderlyingConnection().getSessionModels())
{
if (_logger.isDebugEnabled())
{
@@ -1687,34 +1687,60 @@ public abstract class AbstractVirtualHos
}
@Override
- public void registerConnection(final Connection<?> connection)
+ public void registerConnection(final AMQPConnection<?> connection)
{
- _connections.add(connection);
+ doSync(registerConnectionAsync(connection));
+ }
- AMQConnectionModel<?,?> underlyingConnection = connection.getUnderlyingConnection();
- if(_blocked.get())
+ public ListenableFuture<Void> registerConnectionAsync(final AMQPConnection<?> connection)
+ {
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
{
- underlyingConnection.block();
- }
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ _connections.add(connection);
- underlyingConnection.setScheduler(_networkConnectionScheduler);
+ if(_blocked.get())
+ {
+ connection.block();
+ }
+
+ connection.setScheduler(_networkConnectionScheduler);
+
+ for (VirtualHostConnectionListener listener : _connectionAssociationListeners)
+ {
+ listener.connectionAssociated(connection);
+ }
+ return Futures.immediateFuture(null);
+ }
+ });
- for (VirtualHostConnectionListener listener : _connectionAssociationListeners)
- {
- listener.connectionAssociated(connection);
- }
}
@Override
- public void deregisterConnection(final Connection<?> connection)
+ public void deregisterConnection(final AMQPConnection<?> connection)
+ {
+ doSync(deregisterConnectionAsync(connection));
+ }
+
+ public ListenableFuture<Void> deregisterConnectionAsync(final AMQPConnection<?> connection)
{
- if (_connections.remove(connection))
+ return doOnConfigThread(new Callable<ListenableFuture<Void>>()
{
- for (VirtualHostConnectionListener listener : _connectionAssociationListeners)
+ @Override
+ public ListenableFuture<Void> call() throws Exception
{
- listener.connectionRemoved(connection);
+ if (_connections.remove(connection))
+ {
+ for (VirtualHostConnectionListener listener : _connectionAssociationListeners)
+ {
+ listener.connectionRemoved(connection);
+ }
+ }
+ return Futures.immediateFuture(null);
}
- }
+ });
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Sat Jun 27 21:13:25 2015
@@ -32,13 +32,14 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.NoFactoryForTypeException;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.txn.DtxRegistry;
public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AMQQueue<?>, E extends ExchangeImpl<?> >
@@ -106,7 +107,7 @@ public interface VirtualHostImpl< X exte
EventLogger getEventLogger();
- boolean authoriseCreateConnection(AMQConnectionModel<?, ?> connection);
+ boolean authoriseCreateConnection(AMQPConnection<?> connection);
String getLocalAddress(String routingAddress);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org