You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/10/09 19:43:41 UTC

svn commit: r703212 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/subscription/ broker/src/test/java/org/apache/qpid/server/protocol/ broker/src/test/java/org/apache/qpid/...

Author: ritchiem
Date: Thu Oct  9 10:43:41 2008
New Revision: 703212

URL: http://svn.apache.org/viewvc?rev=703212&view=rev
Log:
QPID-1331 : Modified the BrowserSubscription to be consistent with the NoAck Subscription.
Added Test QueueBrowserUsesNoAckTest to validate the change.
Note that the Credit Manager suspends the subscriber, not the channel, when credit is exhausted. JIRA to follow.
So, to check whether the subscription was suspended, I needed to create a MockChannel and give it access to the subscription map (_tag2SubscriptionMap) in the
Channel.
The test also needed to be able to interrogate the state of the Subscription. The getState() method was not part of the Subscription interface, although it was used by all subscriptions, so it has been promoted to the interface and stubs have been implemented in the various helper/test classes.

Added a browse() helper to InternalBrokerBaseCase and prevented an NPE in getDelivers() when no deliveries have been recorded for the requested consumer tag.


Added:
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/util/
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=703212&r1=703211&r2=703212&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Oct  9 10:43:41 2008
@@ -93,8 +93,7 @@
     private IncomingMessage _currentMessage;
 
     /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
-    private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
-
+    protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
 
     private final MessageStore _messageStore;
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=703212&r1=703211&r2=703212&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Thu Oct  9 10:43:41 2008
@@ -82,6 +82,8 @@
 
     void setStateListener(final StateListener listener);
 
+    public State getState();
+
     QueueEntry getLastSeenEntry();
 
     boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=703212&r1=703211&r2=703212&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Thu Oct  9 10:43:41 2008
@@ -79,6 +79,7 @@
         }
 
 
+        @Override
         public boolean isBrowser()
         {
             return true;
@@ -91,6 +92,7 @@
          * @param msg   The message to send
          * @throws AMQException
          */
+        @Override
         public void send(QueueEntry msg) throws AMQException
         {
             // We don't decrement the reference here as we don't want to consume the message
@@ -103,6 +105,13 @@
             }
 
         }
+
+        @Override
+        public boolean wouldSuspend(QueueEntry msg)
+        {
+            return false;
+        }
+
     }
 
     public static class NoAckSubscription extends SubscriptionImpl
@@ -118,6 +127,7 @@
         }
 
 
+        @Override
         public boolean isBrowser()
         {
             return false;
@@ -130,6 +140,7 @@
          * @param entry   The message to send
          * @throws AMQException
          */
+        @Override
         public void send(QueueEntry entry) throws AMQException
         {
 
@@ -166,6 +177,7 @@
             }
         }
 
+        @Override
         public boolean wouldSuspend(QueueEntry msg)
         {
             return false;
@@ -185,6 +197,7 @@
             super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
         }
 
+        @Override
         public boolean isBrowser()
         {
             return false;
@@ -198,6 +211,7 @@
          * @param entry   The message to send
          * @throws AMQException
          */
+        @Override
         public void send(QueueEntry entry) throws AMQException
         {
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=703212&r1=703211&r2=703212&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Thu Oct  9 10:43:41 2008
@@ -72,7 +72,14 @@
     {
         synchronized (_channelDelivers)
         {
-            List<DeliveryPair> msgs = _channelDelivers.get(channelId).get(consumerTag).subList(0, count);
+            List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
+            
+            if (all == null)
+            {
+                return new ArrayList<DeliveryPair>(0);
+            }
+            
+            List<DeliveryPair> msgs = all.subList(0, count);
 
             List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=703212&r1=703211&r2=703212&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Thu Oct  9 10:43:41 2008
@@ -29,8 +29,6 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
-import org.apache.qpid.server.subscription.Subscription.State;
-import org.apache.qpid.server.subscription.Subscription.StateListener;
 
 public class MockSubscription implements Subscription
 {
@@ -40,15 +38,15 @@
     private AMQQueue queue = null;
     private StateListener listener = null;
     private QueueEntry lastSeen = null;
-    private State state = State.ACTIVE;
+    private State _state = State.ACTIVE;
     private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
 
     @Override
     public void close()
     {
         closed = true;
-        listener.stateChange(this, state , State.CLOSED);
-        state = State.CLOSED;
+        listener.stateChange(this, _state, State.CLOSED);
+        _state = State.CLOSED;
     }
 
     @Override
@@ -179,6 +177,11 @@
         this.listener = listener;
     }
 
+    public State getState()
+    {
+        return _state;
+    }
+
     @Override
     public boolean wouldSuspend(QueueEntry msg)
     {

Added: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java?rev=703212&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java Thu Oct  9 10:43:41 2008
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+
+public class QueueBrowserUsesNoAckTest extends InternalBrokerBaseCase
+{
+
+    public void testQueueBrowserUsesNoAck() throws AMQException
+    {
+        int sendMessageCount = 2;
+        int prefetch = 1;
+
+        //Check store is empty
+        checkStoreContents(0);
+
+        //Send required messages to the queue
+        publishMessages(_session, _channel, sendMessageCount);
+
+        //Ensure they are stored
+        checkStoreContents(sendMessageCount);
+
+        //Check that there are no unacked messages
+        assertEquals("Channel should have no unacked msgs ", 0,
+                     _channel.getUnacknowledgedMessageMap().size());
+
+        //Set the prefetch on the session to be less than the sent messages
+        _channel.setCredit(0, prefetch);
+
+        //browse the queue
+        AMQShortString browser = browse(_channel, _queue);
+
+        _queue.deliverAsync();
+
+        //Wait for messages to fill the prefetch
+        _session.awaitDelivery(prefetch);
+
+        //Get those messages
+        List<InternalTestProtocolSession.DeliveryPair> messages =
+                _session.getDelivers(_channel.getChannelId(), browser,
+                                     prefetch);
+
+        //Ensure we received the prefetched messages
+        assertEquals(prefetch, messages.size());
+
+        //Check the process didn't suspend the subscription as this would
+        // indicate we are using the prefetch credit. i.e. using acks not No-Ack
+        assertTrue("The subscription has been suspended",
+                   !_channel.getSubscription(browser).getState()
+                           .equals(Subscription.State.SUSPENDED));       
+    }
+
+}

Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=703212&r1=703211&r2=703212&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Thu Oct  9 10:43:41 2008
@@ -36,15 +36,18 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.util.MockChannel;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.exchange.ExchangeDefaults;
 
 public class InternalBrokerBaseCase extends TestCase
 {
     protected IApplicationRegistry _registry;
     protected MessageStore _messageStore;
-    protected AMQChannel _channel;
+    protected MockChannel _channel;
     protected InternalTestProtocolSession _session;
     protected VirtualHost _virtualHost;
     protected StoreContext _storeContext = new StoreContext();
@@ -74,7 +77,7 @@
 
         _session.setVirtualHost(_virtualHost);
 
-        _channel = new AMQChannel(_session, 1, _messageStore);
+        _channel = new MockChannel(_session, 1, _messageStore);
 
         _session.addChannel(_channel);
     }
@@ -113,6 +116,29 @@
         return null;
     }
 
+    protected AMQShortString browse(AMQChannel channel, AMQQueue queue)
+    {
+        try
+        {
+            FieldTable filters = new FieldTable();
+            filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
+
+            return channel.subscribeToQueue(null, queue, true, filters, false, true);
+        }
+        catch (AMQException e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+        catch (ConsumerTagNotUniqueException e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+        //Keep the compiler happy
+        return null;
+    }
+
     public void publishMessages(InternalTestProtocolSession session, AMQChannel channel, int messages) throws AMQException
     {
         MessagePublishInfo info = new MessagePublishInfo()

Added: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java?rev=703212&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java Thu Oct  9 10:43:41 2008
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+public class MockChannel extends AMQChannel
+{
+    public MockChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+            throws AMQException
+    {
+        super(session, channelId, messageStore);
+    }
+
+    public Subscription getSubscription(AMQShortString subscription)
+    {
+        return _tag2SubscriptionMap.get(subscription);
+    }
+    
+}

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=703212&r1=703211&r2=703212&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Thu Oct  9 10:43:41 2008
@@ -110,6 +110,12 @@
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
+    
+    @Override
+    public State getState()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
 
     public QueueEntry getLastSeenEntry()
     {