You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/04 00:28:13 UTC

svn commit: r1564130 [2/2] - in /qpid/branches/java-broker-amqp-1-0-management/java: broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/server/subscription/ broker-core/src/test/java/org/apache/qpid/server...

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Mon Feb  3 23:28:12 2014
@@ -24,6 +24,7 @@ package org.apache.qpid.server.protocol.
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicGetBody;
 import org.apache.qpid.framing.BasicGetEmptyBody;
 import org.apache.qpid.framing.MethodRegistry;
@@ -33,8 +34,10 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.SubscriptionTarget_0_8;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -42,9 +45,10 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.EnumSet;
+
 public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
 {
     private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
@@ -151,17 +155,24 @@ public class BasicGetMethodHandler imple
             }
         };
 
-        Subscription sub;
+        SubscriptionTarget_0_8 target;
+        EnumSet<Subscription.Option> options = EnumSet.of(Subscription.Option.TRANSIENT, Subscription.Option.ACQUIRES,
+                                                          Subscription.Option.SEES_REQUEUES);
         if(acks)
         {
-            sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+
+            target = SubscriptionTarget_0_8.createAckTarget(channel,
+                                                            AMQShortString.EMPTY_STRING, null,
+                                                            singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
         else
         {
-            sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+            target = SubscriptionTarget_0_8.createNoAckTarget(channel,
+                                                              AMQShortString.EMPTY_STRING, null,
+                                                              singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
 
-        queue.registerSubscription(sub,false);
+        Subscription sub = queue.registerSubscription(target, null, AMQMessage.class, "", options);
         sub.flush();
         queue.unregisterSubscription(sub);
         return(!singleMessageCredit.hasCredit());

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Mon Feb  3 23:28:12 2014
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.test.utils.QpidTestCase;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.Set;
 
 /**
@@ -47,6 +48,7 @@ import java.util.Set;
  */
 public class AckTest extends QpidTestCase
 {
+    private SubscriptionTarget_0_8 _subscriptionTarget;
     private Subscription _subscription;
 
     private AMQProtocolSession _protocolSession;
@@ -86,7 +88,6 @@ public class AckTest extends QpidTestCas
 
     private void publishMessages(int count, boolean persistent) throws AMQException
     {
-        _queue.registerSubscription(_subscription,false);
         for (int i = 1; i <= count; i++)
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -178,7 +179,10 @@ public class AckTest extends QpidTestCas
      */
     public void testAckChannelAssociationTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
+        _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, new LimitlessCreditManager());
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
+                                                               Subscription.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount, true);
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -202,7 +206,16 @@ public class AckTest extends QpidTestCas
     public void testNoAckMode() throws AMQException
     {
         // false arg means no acks expected
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
+        _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
+                                                                       DEFAULT_CONSUMER_TAG,
+                                                                       null,
+                                                                       new LimitlessCreditManager());
+        _subscription = _queue.registerSubscription(_subscriptionTarget,
+                                                    null,
+                                                    AMQMessage.class,
+                                                    DEFAULT_CONSUMER_TAG.toString(),
+                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
+                                                               Subscription.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount);
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -218,7 +231,13 @@ public class AckTest extends QpidTestCas
     public void testPersistentNoAckMode() throws AMQException
     {
         // false arg means no acks expected
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
+                                                                       DEFAULT_CONSUMER_TAG,
+                                                                       null,
+                                                                       new LimitlessCreditManager());
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES, Subscription.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount, true);
 
@@ -235,7 +254,15 @@ public class AckTest extends QpidTestCas
      */
     public void testSingleAckReceivedTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
+                                                                     DEFAULT_CONSUMER_TAG,
+                                                                     null,
+                                                                     new LimitlessCreditManager());
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
+                                                               Subscription.Option.ACQUIRES));
+
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -264,7 +291,15 @@ public class AckTest extends QpidTestCas
      */
     public void testMultiAckReceivedTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
+                                                                     DEFAULT_CONSUMER_TAG,
+                                                                     null,
+                                                                     new LimitlessCreditManager());
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
+                                                               Subscription.Option.ACQUIRES));
+
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -290,7 +325,15 @@ public class AckTest extends QpidTestCas
      */
     public void testMultiAckAllReceivedTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
+                                                                     DEFAULT_CONSUMER_TAG,
+                                                                     null,
+                                                                     new LimitlessCreditManager());
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
+                                                               Subscription.Option.ACQUIRES));
+
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -319,8 +362,12 @@ public class AckTest extends QpidTestCas
         // Send 10 messages
         Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
 
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
-                                                                            DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+
+        _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
+                                                               Subscription.Option.ACQUIRES));
+
         final int msgCount = 1;
         publishMessages(msgCount);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Mon Feb  3 23:28:12 2014
@@ -37,6 +37,9 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
  *
@@ -105,9 +108,11 @@ public class ExtractResendAndRequeueTest
      */
     private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList)
     {
-        Subscription subscription = new MockSubscription();
+        Subscription subscription = mock(Subscription.class);
+        when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
+        when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
 
-        // Aquire messages in subscription
+        // Acquire messages in subscription
         for (QueueEntry entry : messageList)
         {
             entry.acquire(subscription);
@@ -157,7 +162,7 @@ public class ExtractResendAndRequeueTest
         Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList);
 
         // Close subscription
-        subscription.close();
+        when(subscription.isClosed()).thenReturn(true);
 
         final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
         final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Mon Feb  3 23:28:12 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -110,6 +111,7 @@ public class SendingLink_1_0 implements 
         QueueDestination qd = null;
         AMQQueue queue = null;
 
+        EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
 
 
         boolean noLocal = false;
@@ -173,14 +175,11 @@ public class SendingLink_1_0 implements 
             source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
 
             _target = new SubscriptionTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
-            _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter),
-                                                                               Message_1_0.class,
-                                                                               source.getDistributionMode() != StdDistMode.COPY,
-                                                                               source.getDistributionMode() != StdDistMode.COPY,
-                                                                               getEndpoint().getName(),
-                                                                               false,
-                                                                               _target);
-            _target.setSubscription(_subscription);
+            if(source.getDistributionMode() != StdDistMode.COPY)
+            {
+                options.add(Subscription.Option.ACQUIRES);
+                options.add(Subscription.Option.SEES_REQUEUES);
+            }
 
         }
         else if(destination instanceof ExchangeDestination)
@@ -373,26 +372,27 @@ public class SendingLink_1_0 implements 
 
 
             _target = new SubscriptionTarget_1_0(this, true);
-            _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter),
-                                                                               Message_1_0.class,
-                                                                               true,
-                                                                               true,
-                                                                               getEndpoint().getName(),
-                                                                               false,
-                                                                               _target);
-            _target.setSubscription(_subscription);
+            options.add(Subscription.Option.ACQUIRES);
+            options.add(Subscription.Option.SEES_REQUEUES);
 
         }
 
-        if(_subscription != null)
+        if(_target != null)
         {
+            if(noLocal)
+            {
+                options.add(Subscription.Option.NO_LOCAL);
+            }
+
+
             _subscription.setNoLocal(noLocal);
 
 
             try
             {
-
-                queue.registerSubscription(_subscription, false);
+                _subscription = queue.registerSubscription(_target,
+                                                           messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+                                                           Message_1_0.class, getEndpoint().getName(), options);
             }
             catch (AMQException e)
             {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java Mon Feb  3 23:28:12 2014
@@ -74,11 +74,6 @@ class SubscriptionTarget_1_0 extends Abs
         _acquires = acquires;
     }
 
-    public void setSubscription(Subscription sub)
-    {
-        _subscription = sub;
-    }
-
     public Subscription getSubscription()
     {
         return _subscription;
@@ -505,6 +500,18 @@ class SubscriptionTarget_1_0 extends Abs
     }
 
     @Override
+    public void subscriptionRegistered(final Subscription sub)
+    {
+        _subscription = sub;
+    }
+
+    @Override
+    public void subscriptionRemoved(final Subscription sub)
+    {
+
+    }
+
+    @Override
     public long getUnacknowledgedBytes()
     {
         // TODO



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org