You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/08/09 00:55:35 UTC

svn commit: r1155137 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/federation/ main/java/org/apache/qpid/server/handler/ main/java/org/apache/qpid/server/subscription/ main/java/org/apache/qpid/server/transport/ test/java/org/...

Author: robbie
Date: Mon Aug  8 22:55:35 2011
New Revision: 1155137

URL: http://svn.apache.org/viewvc?rev=1155137&view=rev
Log:
QPID-3386: move all server Subscription creation into the SubscriptionFactoryImpl, ensure all Subscription implementations share a common ID sequence generator

Added:
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
Removed:
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java Mon Aug  8 22:55:35 2011
@@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.Base
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.transport.ServerSession;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -696,7 +697,7 @@ public class Bridge implements BridgeCon
 
             //TODO Handle the passing of non-null Filters and Arguments here
             
-            Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+            Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
                                                           _destination,
                                                           MessageAcceptMode.NONE,
                                                           MessageAcquireMode.PRE_ACQUIRED,
@@ -768,7 +769,7 @@ public class Bridge implements BridgeCon
 
           //TODO Handle the passing of non-null Filters and Arguments here
             
-            Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+            Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
                                                           _destination,
                                                           MessageAcceptMode.NONE,
                                                           MessageAcquireMode.PRE_ACQUIRED,

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Mon Aug  8 22:55:35 2011
@@ -162,14 +162,7 @@ public class BasicGetMethodHandler imple
         }
         else
         {
-            sub = new GetNoAckSubscription(channel,
-                                                 session,
-                                                 null,
-                                                 null,
-                                                 false,
-                                                 singleMessageCredit,
-                                                 getDeliveryMethod,
-                                                 getRecordMethod);
+            sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
 
         queue.registerSubscription(sub,false);
@@ -180,27 +173,5 @@ public class BasicGetMethodHandler imple
 
     }
 
-    public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
-    {
-        public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
-                               AMQShortString consumerTag, FieldTable filters,
-                               boolean noLocal, FlowCreditManager creditManager,
-                                   ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod)
-            throws AMQException
-        {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
-        }
-
-        public boolean isTransient()
-        {
-            return true;
-        }
 
-        public boolean wouldSuspend(QueueEntry msg)
-        {
-            return !getCreditManager().useCreditForMessage(msg.getMessage());
-        }
-
-    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java Mon Aug  8 22:55:35 2011
@@ -20,13 +20,21 @@
  */
 package org.apache.qpid.server.subscription;
 
+import java.util.Map;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.transport.ServerSession;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageFlowMode;
 
 /**
  * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
@@ -56,4 +64,23 @@ public interface SubscriptionFactory
                                             RecordDeliveryMethod recordMethod
     )
             throws AMQException;
+
+
+    SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(AMQChannel channel,
+                                                                          AMQProtocolSession session,
+                                                                          AMQShortString consumerTag,
+                                                                          FieldTable filters,
+                                                                          boolean noLocal,
+                                                                          FlowCreditManager creditManager,
+                                                                          ClientDeliveryMethod deliveryMethod,
+                                                                          RecordDeliveryMethod recordMethod) throws AMQException;
+
+    Subscription_0_10 createSubscription(final ServerSession session,
+                                         final String destination,
+                                         final MessageAcceptMode acceptMode,
+                                         final MessageAcquireMode acquireMode,
+                                         final MessageFlowMode flowMode,
+                                         final FlowCreditManager_0_10 creditManager,
+                                         final FilterManager filterManager,
+                                         final Map<String,Object> arguments);
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java Mon Aug  8 22:55:35 2011
@@ -20,17 +20,28 @@
  */
 package org.apache.qpid.server.subscription;
 
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageFlowMode;
 
 public class SubscriptionFactoryImpl implements SubscriptionFactory
 {
+    private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
+
     public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
                                            AMQShortString consumerTag, boolean acks, FieldTable filters,
                                            boolean noLocal, FlowCreditManager creditManager) throws AMQException
@@ -78,18 +89,47 @@ public class SubscriptionFactoryImpl imp
 
         if(isBrowser)
         {
-            return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
+            return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
         }
         else if(acks)
         {
-            return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
+            return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
         }
         else
         {
-            return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
+            return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
         }
     }
 
+    public SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(final AMQChannel channel,
+                                                                                 final AMQProtocolSession session,
+                                                                                 final AMQShortString consumerTag,
+                                                                                 final FieldTable filters,
+                                                                                 final boolean noLocal,
+                                                                                 final FlowCreditManager creditManager,
+                                                                                 final ClientDeliveryMethod deliveryMethod,
+                                                                                 final RecordDeliveryMethod recordMethod) throws AMQException
+    {
+        return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod, getNextSubscriptionId());
+    }
+
+    public Subscription_0_10 createSubscription(final ServerSession session,
+                                                final String destination,
+                                                final MessageAcceptMode acceptMode,
+                                                final MessageAcquireMode acquireMode,
+                                                final MessageFlowMode flowMode,
+                                                final FlowCreditManager_0_10 creditManager,
+                                                final FilterManager filterManager,
+                                                final Map<String,Object> arguments)
+    {
+        return new Subscription_0_10(session, destination, acceptMode, acquireMode,
+                                flowMode, creditManager, filterManager, arguments, getNextSubscriptionId());
+    }
 
     public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
+
+    private static long getNextSubscriptionId()
+    {
+        return SUB_ID_GENERATOR.getAndIncrement();
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Mon Aug  8 22:55:35 2011
@@ -88,9 +88,7 @@ public abstract class SubscriptionImpl i
 
     private final Lock _stateChangeLock;
 
-    private static final AtomicLong idGenerator = new AtomicLong(0);
-    // Create a simple ID that increments for ever new Subscription
-    private final long _subscriptionID = idGenerator.getAndIncrement();
+    private final long _subscriptionID;
     private LogSubject _logSubject;
     private LogActor _logActor;
     private UUID _id;
@@ -104,10 +102,11 @@ public abstract class SubscriptionImpl i
                                    AMQShortString consumerTag, FieldTable filters,
                                    boolean noLocal, FlowCreditManager creditManager,
                                    ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod)
+                                   RecordDeliveryMethod recordMethod,
+                                   long subscriptionID)
             throws AMQException
         {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
         }
 
 
@@ -151,10 +150,11 @@ public abstract class SubscriptionImpl i
                                  AMQShortString consumerTag, FieldTable filters,
                                  boolean noLocal, FlowCreditManager creditManager,
                                    ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod)
+                                   RecordDeliveryMethod recordMethod,
+                                   long subscriptionID)
             throws AMQException
         {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
         }
 
 
@@ -211,16 +211,45 @@ public abstract class SubscriptionImpl i
 
     }
 
+    /**
+     * NoAck Subscription for use with BasicGet method.
+     */
+    public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
+    {
+        public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+                               AMQShortString consumerTag, FieldTable filters,
+                               boolean noLocal, FlowCreditManager creditManager,
+                                   ClientDeliveryMethod deliveryMethod,
+                                   RecordDeliveryMethod recordMethod,
+                                   long subscriptionID)
+            throws AMQException
+        {
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+        }
+
+        public boolean isTransient()
+        {
+            return true;
+        }
+
+        public boolean wouldSuspend(QueueEntry msg)
+        {
+            return !getCreditManager().useCreditForMessage(msg.getMessage());
+        }
+
+    }
+
     static final class AckSubscription extends SubscriptionImpl
     {
         public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
                                AMQShortString consumerTag, FieldTable filters,
                                boolean noLocal, FlowCreditManager creditManager,
                                    ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod)
+                                   RecordDeliveryMethod recordMethod,
+                                   long subscriptionID)
             throws AMQException
         {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
         }
 
 
@@ -296,10 +325,11 @@ public abstract class SubscriptionImpl i
                             AMQShortString consumerTag, FieldTable arguments,
                             boolean noLocal, FlowCreditManager creditManager,
                             ClientDeliveryMethod deliveryMethod,
-                            RecordDeliveryMethod recordMethod)
+                            RecordDeliveryMethod recordMethod,
+                            long subscriptionID)
             throws AMQException
     {
-
+        _subscriptionID = subscriptionID;
         _channel = channel;
         _consumerTag = consumerTag;
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Mon Aug  8 22:55:35 2011
@@ -40,7 +40,6 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.SubscriptionActor;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.AMQMessage;
@@ -80,10 +79,7 @@ import java.nio.ByteBuffer;
 
 public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
 {
-
-    private static final AtomicLong idGenerator = new AtomicLong(0);
-    // Create a simple ID that increments for ever new Subscription
-    private final long _subscriptionID = idGenerator.getAndIncrement();
+    private final long _subscriptionID;
 
     private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
     private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
@@ -114,7 +110,6 @@ public class Subscription_0_10 implement
     private MessageFlowMode _flowMode;
     private final ServerSession _session;
     private AtomicBoolean _stopped = new AtomicBoolean(true);
-    private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>();
     private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
 
     private LogActor _logActor;
@@ -131,8 +126,9 @@ public class Subscription_0_10 implement
                              MessageAcquireMode acquireMode,
                              MessageFlowMode flowMode,
                              FlowCreditManager_0_10 creditManager,
-                             FilterManager filters,Map<String, Object> arguments)
+                             FilterManager filters,Map<String, Object> arguments, long subscriptionId)
     {
+        _subscriptionID = subscriptionId;
         _session = session;
         _destination = destination;
         _acceptMode = acceptMode;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Mon Aug  8 22:55:35 2011
@@ -74,7 +74,7 @@ public class ServerConnectionDelegate ex
 
     public ServerSession getSession(Connection conn, SessionAttach atc)
     {
-        SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
+        SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
 
         ServerSession ssn = new ServerSession(conn, serverSessionDelegate,  new Binary(atc.getName()), 0);
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Aug  8 22:55:35 2011
@@ -129,16 +129,6 @@ public class ServerSession extends Sessi
         this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
     }
 
-    protected void setState(State state)
-    {
-        super.setState(state);
-
-        if (state == State.OPEN)
-        {
-	        _actor.message(ChannelMessages.CREATE());
-        }
-    }
-
     public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
     {
         super(connection, delegate, name, expiry);
@@ -150,6 +140,16 @@ public class ServerSession extends Sessi
         getConfigStore().addConfiguredObject(this);
     }
 
+    protected void setState(State state)
+    {
+        super.setState(state);
+
+        if (state == State.OPEN)
+        {
+            _actor.message(ChannelMessages.CREATE());
+        }
+    }
+
     private ConfigStore getConfigStore()
     {
         return getConnectionConfig().getConfigStore();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Mon Aug  8 22:55:35 2011
@@ -47,11 +47,11 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Acquired;
@@ -98,11 +98,10 @@ import org.apache.qpid.transport.TxSelec
 public class ServerSessionDelegate extends SessionDelegate
 {
     private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class);
-    private final IApplicationRegistry _appRegistry;
 
-    public ServerSessionDelegate(IApplicationRegistry appRegistry)
+    public ServerSessionDelegate()
     {
-        _appRegistry = appRegistry;
+
     }
 
     @Override
@@ -254,7 +253,7 @@ public class ServerSessionDelegate exten
                         return;
                     }
 
-                    Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+                    Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
                                                                   destination,
                                                                   method.getAcceptMode(),
                                                                   method.getAcquireMode(),

Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java?rev=1155137&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java Mon Aug  8 22:55:35 2011
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.protocol.ProtocolEngine_0_10;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.transport.ServerSessionDelegate;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.TestNetworkConnection;
+
+public class SubscriptionFactoryImplTest extends InternalBrokerBaseCase
+{
+    /**
+     * Tests that while creating Subscriptions of various types, the 
+     * ID numbers assigned are allocated from a common sequence
+     * (in increasing order).
+     */
+    public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception
+    {
+        //create a No-Ack subscription, get the first Subscription ID
+        long previousId = 0;
+        Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), false, null, false, getChannel().getCreditManager());
+        previousId = noAckSub.getSubscriptionID();
+
+        //create an ack subscription, verify the next Subscription ID is used
+        Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), true, null, false, getChannel().getCreditManager());
+        assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID());
+        previousId = ackSub.getSubscriptionID();
+
+        //create a browser subscription
+        FieldTable filters = new FieldTable();
+        filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
+        Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), true, null, false, getChannel().getCreditManager());
+        assertEquals("Unexpected Subscription ID allocated", previousId + 1, browerSub.getSubscriptionID());
+        previousId = browerSub.getSubscriptionID();
+
+        //create a BasicGet NoAck subscription
+        Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(getChannel(), getSession(), new AMQShortString("1"), null, false, 
+                getChannel().getCreditManager(),getChannel().getClientDeliveryMethod(), getChannel().getRecordDeliveryMethod());
+        assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID());
+        previousId = getNoAckSub.getSubscriptionID();
+
+        //create a 0-10 subscription
+        ServerConnection conn = new ServerConnection(1);
+        ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), getRegistry());
+        conn.setVirtualHost(getVirtualHost());
+        conn.setConnectionConfig(engine);
+        ServerSessionDelegate sesDel = new ServerSessionDelegate();
+        Binary name = new Binary(new byte[]{new Byte("1")});
+        ServerSession session = new ServerSession(conn, sesDel, name, 0, engine);
+
+        Subscription sub_0_10 = SubscriptionFactoryImpl.INSTANCE.createSubscription(session, "1", MessageAcceptMode.EXPLICIT,
+                MessageAcquireMode.PRE_ACQUIRED, MessageFlowMode.WINDOW, new WindowCreditManager(), null, null);
+        assertEquals("Unexpected Subscription ID allocated", previousId + 1, sub_0_10.getSubscriptionID());
+    }
+
+}

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Mon Aug  8 22:55:35 2011
@@ -44,14 +44,13 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.util.MockChannel;
 
 
 public class InternalBrokerBaseCase extends QpidTestCase
 {
     private IApplicationRegistry _registry;
     private MessageStore _messageStore;
-    private MockChannel _channel;
+    private AMQChannel _channel;
     private InternalTestProtocolSession _session;
     private VirtualHost _virtualHost;
     private AMQQueue _queue;
@@ -111,7 +110,7 @@ public class InternalBrokerBaseCase exte
         _session = new InternalTestProtocolSession(_virtualHost);
         CurrentActor.set(_session.getLogActor());
 
-        _channel = new MockChannel(_session, 1, _messageStore);
+        _channel = new AMQChannel(_session, 1, _messageStore);
 
         _session.addChannel(_channel);
     }
@@ -283,12 +282,12 @@ public class InternalBrokerBaseCase exte
         _messageStore = messageStore;
     }
 
-    public MockChannel getChannel()
+    public AMQChannel getChannel()
     {
         return _channel;
     }
 
-    public void setChannel(MockChannel channel)
+    public void setChannel(AMQChannel channel)
     {
         _channel = channel;
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org