You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/07/15 00:55:54 UTC

svn commit: r1503076 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/logging/actors/ main/java/org/apache/qpid/server/logging/subjects/ main/java/org/apache/qpid/server/protocol/ main/java/org/apache/qpid/server/protocol/v0_10/ ...

Author: rgodfrey
Date: Sun Jul 14 22:55:54 2013
New Revision: 1503076

URL: http://svn.apache.org/r1503076
Log:
QPID-4659 : [Java Broker] fix protocol version specific code in logging, subscriptions

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java Sun Jul 14 22:55:54 2013
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.logging.actors;
 
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.logging.RootMessageLogger;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
@@ -45,7 +46,7 @@ public class AMQPChannelActor extends Ab
      * @param channel    The Channel for this LogActor
      * @param rootLogger The root Logger that this LogActor should use
      */
-    public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger)
+    public AMQPChannelActor(AMQSessionModel channel, RootMessageLogger rootLogger)
     {
         super(rootLogger);
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java Sun Jul 14 22:55:54 2013
@@ -14,14 +14,15 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.server.logging.actors;
 
 import org.apache.qpid.server.logging.RootMessageLogger;
 import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 
 
@@ -39,7 +40,7 @@ public class AMQPConnectionActor extends
 {
     private ConnectionLogSubject _logSubject;
 
-    public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger)
+    public AMQPConnectionActor(AMQConnectionModel session, RootMessageLogger rootLogger)
     {
         super(rootLogger);
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Sun Jul 14 22:55:54 2013
@@ -20,20 +20,16 @@
  */
 package org.apache.qpid.server.logging.subjects;
 
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_10.ServerConnection;
-import org.apache.qpid.server.protocol.v0_10.ServerSession;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
 
 public class ChannelLogSubject extends AbstractLogSubject
 {
 
-    public ChannelLogSubject(AMQChannel channel)
+    public ChannelLogSubject(AMQSessionModel session)
     {
-        AMQProtocolSession session = channel.getProtocolSession();
-
         /**
          * LOG FORMAT used by the AMQPConnectorActor follows
          * ChannelLogSubject.CHANNEL_FORMAT : con:{0}({1}@{2}/{3})/ch:{4}.
@@ -47,39 +43,14 @@ public class ChannelLogSubject extends A
          * 3 - Virtualhost
          * 4 - Channel ID
          */
+        AMQConnectionModel connection = session.getConnectionModel();
         setLogStringWithFormat(CHANNEL_FORMAT,
-                               session.getSessionID(),
-                               session.getAuthorizedPrincipal().getName(),
-                               session.getRemoteAddress(),
-                               session.getVirtualHost().getName(),
-                               channel.getChannelId());
-    }
+                               connection == null ? -1L : connection.getConnectionId(),
+                               (connection == null || connection.getPrincipalAsString() == null) ? "?" : connection.getPrincipalAsString(),
+                               (connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(),
+                               (connection == null || connection.getVirtualHostName() == null) ? "?" : connection.getVirtualHostName(),
+                               session.getChannelId());
 
-    public ChannelLogSubject(ServerSession session)
-    {
-        /**
-         * LOG FORMAT used by the AMQPConnectorActor follows
-         * ChannelLogSubject.CHANNEL_FORMAT : con:{0}({1}@{2}/{3})/ch:{4}.
-         *
-         * Uses a MessageFormat call to insert the required values according to
-         * these indices:
-         *
-         * 0 - Connection ID
-         * 1 - User ID
-         * 2 - IP
-         * 3 - Virtualhost
-         * 4 - Channel ID
-         */
-        if(session.getConnection() instanceof ServerConnection)
-        {
-            ServerConnection connection = (ServerConnection) session.getConnection();
-            setLogStringWithFormat(CHANNEL_FORMAT,
-                                   connection == null ? -1L : connection.getConnectionId(),
-                                   session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(),
-                                   (connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(),
-                                   session.getVirtualHost().getName(),
-                                   session.getChannel());
-        }
     }
 
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java Sun Jul 14 22:55:54 2013
@@ -20,34 +20,33 @@
  */
 package org.apache.qpid.server.logging.subjects;
 
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import java.text.MessageFormat;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
 
-import java.text.MessageFormat;
-
 /** The Connection LogSubject */
 public class ConnectionLogSubject extends AbstractLogSubject
 {
 
-    public ConnectionLogSubject(AMQProtocolSession session)
+    // The Session this Actor is representing
+    private AMQConnectionModel _session;
+
+    public ConnectionLogSubject(AMQConnectionModel session)
     {
         _session = session;
     }
 
-    // The Session this Actor is representing
-    private AMQProtocolSession _session;
-
     // Used to stop re-creating the _logString when we reach our final format
     private boolean _upToDate = false;
 
     /**
      * Update the LogString as the Connection process proceeds.
-     * 
+     *
      * When the Session has an authorized ID add that to the string.
-     * 
+     *
      * When the Session then gains a Vhost add that to the string, at this point
      * we can set upToDate = true as the _logString will not need to be updated
      * from this point onwards.
@@ -56,44 +55,44 @@ public class ConnectionLogSubject extend
     {
         if (!_upToDate)
         {
-            if (_session.getAuthorizedPrincipal() != null)
+            if (_session.getPrincipalAsString() != null)
             {
-                if (_session.getVirtualHost() != null)
+                if (_session.getVirtualHostName() != null)
                 {
                     /**
                      * LOG FORMAT used by the AMQPConnectorActor follows
                      * ConnectionLogSubject.CONNECTION_FORMAT :
                      * con:{0}({1}@{2}/{3})
-                     * 
+                     *
                      * Uses a MessageFormat call to insert the required values
                      * according to these indices:
-                     * 
+                     *
                      * 0 - Connection ID 1 - User ID 2 - IP 3 - Virtualhost
                      */
                     setLogString("[" + MessageFormat.format(CONNECTION_FORMAT,
-                                                            _session.getSessionID(),
-                                                            _session.getAuthorizedPrincipal().getName(),
-                                                            _session.getRemoteAddress(),
-                                                            _session.getVirtualHost().getName())
+                                                            _session.getConnectionId(),
+                                                            _session.getPrincipalAsString(),
+                                                            _session.getRemoteAddressString(),
+                                                            _session.getVirtualHostName())
                                  + "] ");
 
                     _upToDate = true;
-                } 
+                }
                 else
                 {
                     setLogString("[" + MessageFormat.format(USER_FORMAT,
-                                                            _session.getSessionID(),
-                                                            _session.getAuthorizedPrincipal().getName(),
-                                                            _session.getRemoteAddress())
+                                                            _session.getConnectionId(),
+                                                            _session.getPrincipalAsString(),
+                                                            _session.getRemoteAddressString())
                                  + "] ");
 
                 }
-            } 
+            }
             else
             {
                     setLogString("[" + MessageFormat.format(SOCKET_FORMAT,
-                                                            _session.getSessionID(),
-                                                            _session.getRemoteAddress())
+                                                            _session.getConnectionId(),
+                                                            _session.getRemoteAddressString())
                                  + "] ");
             }
         }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Sun Jul 14 22:55:54 2013
@@ -92,4 +92,6 @@ public interface AMQConnectionModel exte
     void stop();
 
     boolean isStopped();
+
+    String getVirtualHostName();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Sun Jul 14 22:55:54 2013
@@ -76,7 +76,7 @@ public class ProtocolEngineCreator_0_10 
         final ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker,
                 fqdn, broker.getSubjectCreator(address));
 
-        ServerConnection conn = new ServerConnection(id);
+        ServerConnection conn = new ServerConnection(id,broker);
 
         conn.setConnectionDelegate(connDelegate);
         conn.setRemoteAddress(network.getRemoteAddress());

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Sun Jul 14 22:55:54 2013
@@ -32,9 +32,11 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -60,7 +62,7 @@ public class ServerConnection extends Co
 {
     private Runnable _onOpenTask;
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
-    private LogActor _actor = GenericActor.getInstance(this);
+    private LogActor _actor;
 
     private Subject _authorizedSubject = null;
     private Principal _authorizedPrincipal = null;
@@ -76,9 +78,10 @@ public class ServerConnection extends Co
     private Transport _transport;
     private volatile boolean _stopped;
 
-    public ServerConnection(final long connectionId)
+    public ServerConnection(final long connectionId, Broker broker)
     {
         _connectionId = connectionId;
+        _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger());
     }
 
     public Object getReference()
@@ -154,6 +157,12 @@ public class ServerConnection extends Co
     }
 
     @Override
+    public String getVirtualHostName()
+    {
+        return _virtualHost == null ? null : _virtualHost.getName();
+    }
+
+    @Override
     public Port getPort()
     {
         return _port;
@@ -503,7 +512,7 @@ public class ServerConnection extends Co
 
     public String getPrincipalAsString()
     {
-        return getAuthorizedPrincipal().getName();
+        return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
     }
 
     public long getSessionCountLimit()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sun Jul 14 22:55:54 2013
@@ -45,7 +45,6 @@ import org.apache.qpid.server.store.Dura
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl;
 import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.DtxNotSelectedException;
 import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -250,7 +249,7 @@ public class ServerSessionDelegate exten
                         return;
                     }
 
-                    Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
+                    Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
                                                                   destination,
                                                                   method.getAcceptMode(),
                                                                   method.getAcquireMode(),

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java Sun Jul 14 22:55:54 2013
@@ -132,9 +132,9 @@ public class Subscription_0_10 implement
                              MessageAcquireMode acquireMode,
                              MessageFlowMode flowMode,
                              FlowCreditManager_0_10 creditManager,
-                             FilterManager filters,Map<String, Object> arguments, long subscriptionId)
+                             FilterManager filters,Map<String, Object> arguments)
     {
-        _subscriptionID = subscriptionId;
+        _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
         _session = session;
         _postIdSettingAction = new AddMessageDispositionListenerAction(session);
         _destination = destination;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Sun Jul 14 22:55:54 2013
@@ -1322,6 +1322,12 @@ public class AMQProtocolEngine implement
         return _stopped;
     }
 
+    @Override
+    public String getVirtualHostName()
+    {
+        return _virtualHost == null ? null : _virtualHost.getName();
+    }
+
     public long getLastReceivedTime()
     {
         return _lastReceivedTime;
@@ -1359,7 +1365,7 @@ public class AMQProtocolEngine implement
 
     public String getAuthId()
     {
-        return getAuthorizedPrincipal().getName();
+        return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
     }
 
     public Integer getRemotePID()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java Sun Jul 14 22:55:54 2013
@@ -56,33 +56,23 @@ public interface SubscriptionFactory
 
 
     Subscription createSubscription(AMQChannel channel,
-                                            AMQProtocolSession protocolSession,
-                                            AMQShortString consumerTag,
-                                            boolean acks,
-                                            FieldTable filters,
-                                            boolean noLocal,
-                                            FlowCreditManager creditManager,
-                                            ClientDeliveryMethod clientMethod,
-                                            RecordDeliveryMethod recordMethod
-    )
-            throws AMQException;
+                                    AMQProtocolSession protocolSession,
+                                    AMQShortString consumerTag,
+                                    boolean acks,
+                                    FieldTable filters,
+                                    boolean noLocal,
+                                    FlowCreditManager creditManager,
+                                    ClientDeliveryMethod clientMethod,
+                                    RecordDeliveryMethod recordMethod) throws AMQException;
 
 
-    SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(AMQChannel channel,
-                                                                          AMQProtocolSession session,
-                                                                          AMQShortString consumerTag,
-                                                                          FieldTable filters,
-                                                                          boolean noLocal,
-                                                                          FlowCreditManager creditManager,
-                                                                          ClientDeliveryMethod deliveryMethod,
-                                                                          RecordDeliveryMethod recordMethod) throws AMQException;
+    Subscription createBasicGetNoAckSubscription(AMQChannel channel,
+                                                 AMQProtocolSession session,
+                                                 AMQShortString consumerTag,
+                                                 FieldTable filters,
+                                                 boolean noLocal,
+                                                 FlowCreditManager creditManager,
+                                                 ClientDeliveryMethod deliveryMethod,
+                                                 RecordDeliveryMethod recordMethod) throws AMQException;
 
-    Subscription_0_10 createSubscription(final ServerSession session,
-                                         final String destination,
-                                         final MessageAcceptMode acceptMode,
-                                         final MessageAcquireMode acquireMode,
-                                         final MessageFlowMode flowMode,
-                                         final FlowCreditManager_0_10 creditManager,
-                                         final FilterManager filterManager,
-                                         final Map<String,Object> arguments);
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java Sun Jul 14 22:55:54 2013
@@ -25,25 +25,14 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.protocol.v0_10.FlowCreditManager_0_10;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_10.Subscription_0_10;
-import org.apache.qpid.server.protocol.v0_10.ServerSession;
 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageFlowMode;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class SubscriptionFactoryImpl implements SubscriptionFactory
 {
-    private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
 
     public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
                                            AMQShortString consumerTag, boolean acks, FieldTable filters,
@@ -92,15 +81,15 @@ public class SubscriptionFactoryImpl imp
 
         if(isBrowser)
         {
-            return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+            return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
         }
         else if(acks)
         {
-            return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+            return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
         }
         else
         {
-            return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+            return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
         }
     }
 
@@ -113,26 +102,9 @@ public class SubscriptionFactoryImpl imp
                                                                                  final ClientDeliveryMethod deliveryMethod,
                                                                                  final RecordDeliveryMethod recordMethod) throws AMQException
     {
-        return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod, getNextSubscriptionId());
-    }
-
-    public Subscription_0_10 createSubscription(final ServerSession session,
-                                                final String destination,
-                                                final MessageAcceptMode acceptMode,
-                                                final MessageAcquireMode acquireMode,
-                                                final MessageFlowMode flowMode,
-                                                final FlowCreditManager_0_10 creditManager,
-                                                final FilterManager filterManager,
-                                                final Map<String,Object> arguments)
-    {
-        return new Subscription_0_10(session, destination, acceptMode, acquireMode,
-                                flowMode, creditManager, filterManager, arguments, getNextSubscriptionId());
+        return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod);
     }
 
     public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
 
-    private static long getNextSubscriptionId()
-    {
-        return SUB_ID_GENERATOR.getAndIncrement();
-    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java Sun Jul 14 22:55:54 2013
@@ -101,11 +101,10 @@ public abstract class SubscriptionImpl i
                                    AMQShortString consumerTag, FieldTable filters,
                                    boolean noLocal, FlowCreditManager creditManager,
                                    ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod,
-                                   long subscriptionID)
+                                   RecordDeliveryMethod recordMethod)
             throws AMQException
         {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
         }
 
 
@@ -152,12 +151,11 @@ public abstract class SubscriptionImpl i
         public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
                                  AMQShortString consumerTag, FieldTable filters,
                                  boolean noLocal, FlowCreditManager creditManager,
-                                   ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod,
-                                   long subscriptionID)
+                                 ClientDeliveryMethod deliveryMethod,
+                                 RecordDeliveryMethod recordMethod)
             throws AMQException
         {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
         }
 
 
@@ -241,14 +239,13 @@ public abstract class SubscriptionImpl i
     public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
     {
         public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
-                               AMQShortString consumerTag, FieldTable filters,
-                               boolean noLocal, FlowCreditManager creditManager,
-                                   ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod,
-                                   long subscriptionID)
+                                    AMQShortString consumerTag, FieldTable filters,
+                                    boolean noLocal, FlowCreditManager creditManager,
+                                    ClientDeliveryMethod deliveryMethod,
+                                    RecordDeliveryMethod recordMethod)
             throws AMQException
         {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
         }
 
         public boolean isTransient()
@@ -268,12 +265,11 @@ public abstract class SubscriptionImpl i
         public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
                                AMQShortString consumerTag, FieldTable filters,
                                boolean noLocal, FlowCreditManager creditManager,
-                                   ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod,
-                                   long subscriptionID)
+                               ClientDeliveryMethod deliveryMethod,
+                               RecordDeliveryMethod recordMethod)
             throws AMQException
         {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
         }
 
 
@@ -336,15 +332,14 @@ public abstract class SubscriptionImpl i
 
 
 
-    public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession,
+    public SubscriptionImpl(AMQChannel channel, AMQProtocolSession protocolSession,
                             AMQShortString consumerTag, FieldTable arguments,
                             boolean noLocal, FlowCreditManager creditManager,
                             ClientDeliveryMethod deliveryMethod,
-                            RecordDeliveryMethod recordMethod,
-                            long subscriptionID)
+                            RecordDeliveryMethod recordMethod)
             throws AMQException
     {
-        _subscriptionID = subscriptionID;
+        _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
         _channel = channel;
         _consumerTag = consumerTag;
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Sun Jul 14 22:55:54 2013
@@ -242,6 +242,12 @@ public class Connection_1_0 implements C
         }
 
         @Override
+        public String getVirtualHostName()
+        {
+            return _vhost == null ? null : _vhost.getName();
+        }
+
+        @Override
         public Port getPort()
         {
             return _port;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Sun Jul 14 22:55:54 2013
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.subscription;
 
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.logging.LogActor;
@@ -29,6 +30,8 @@ import org.apache.qpid.server.queue.Queu
 
 public interface Subscription
 {
+    AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
+
     LogActor getLogActor();
 
     boolean isTransient();

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Sun Jul 14 22:55:54 2013
@@ -18,16 +18,18 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import org.apache.qpid.server.logging.RootMessageLogger;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.protocol.v0_10.ServerConnection;
-import org.apache.qpid.server.protocol.v0_10.ServerSession;
-import org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.Binary;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class ServerSessionTest extends QpidTestCase
 {
 
@@ -61,13 +63,15 @@ public class ServerSessionTest extends Q
 
     public void testCompareTo() throws Exception
     {
-        ServerConnection connection = new ServerConnection(1);
+        final Broker broker = mock(Broker.class);
+        when(broker.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+        ServerConnection connection = new ServerConnection(1, broker);
         connection.setVirtualHost(_virtualHost);
         ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
                 new Binary(getName().getBytes()), 0);
 
         // create a session with the same name but on a different connection
-        ServerConnection connection2 = new ServerConnection(2);
+        ServerConnection connection2 = new ServerConnection(2, broker);
         connection2.setVirtualHost(_virtualHost);
         ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
                 new Binary(getName().getBytes()), 0);

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Sun Jul 14 22:55:54 2013
@@ -569,5 +569,11 @@ public class MockSubscription implements
         {
             return false;
         }
+
+        @Override
+        public String getVirtualHostName()
+        {
+            return null;
+        }
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java Sun Jul 14 22:55:54 2013
@@ -23,6 +23,9 @@ package org.apache.qpid.server.subscript
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.protocol.v0_10.Subscription_0_10;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_10.WindowCreditManager;
 import org.apache.qpid.server.logging.UnitTestMessageLogger;
@@ -41,6 +44,9 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageFlowMode;
 import org.apache.qpid.transport.TestNetworkConnection;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class SubscriptionFactoryImplTest extends QpidTestCase
 {
     private AMQChannel _channel;
@@ -104,14 +110,17 @@ public class SubscriptionFactoryImplTest
         previousId = getNoAckSub.getSubscriptionID();
 
         //create a 0-10 subscription
-        ServerConnection conn = new ServerConnection(1);
+        final Broker broker = mock(Broker.class);
+        when(broker.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+
+        ServerConnection conn = new ServerConnection(1, broker);
         ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), null, null);
         conn.setVirtualHost(_session.getVirtualHost());
         ServerSessionDelegate sesDel = new ServerSessionDelegate();
         Binary name = new Binary(new byte[]{new Byte("1")});
         ServerSession session = new ServerSession(conn, sesDel, name, 0);
 
-        Subscription sub_0_10 = SubscriptionFactoryImpl.INSTANCE.createSubscription(session, "1", MessageAcceptMode.EXPLICIT,
+        Subscription sub_0_10 = new Subscription_0_10(session, "1", MessageAcceptMode.EXPLICIT,
                 MessageAcquireMode.PRE_ACQUIRED, MessageFlowMode.WINDOW, new WindowCreditManager(), null, null);
         assertEquals("Unexpected Subscription ID allocated", previousId + 1, sub_0_10.getSubscriptionID());
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org