You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/08/08 14:19:42 UTC

svn commit: r683949 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/txn/ broker/src/test/java/org/apache/qpid/server/ack/ broker/src/test/java/org/apache/qpid/server/exc...

Author: ritchiem
Date: Fri Aug  8 05:19:41 2008
New Revision: 683949

URL: http://svn.apache.org/viewvc?rev=683949&view=rev
Log:
 QPID-1136 : Provided a fix for the leak in UnacknowledgedMessage when acking. Added a new InternalBrokerBaseCase for performing testing on the broker without using the client libraries. This allows for testing closer to AMQP. Merged from M2.1.x

Added:
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
Removed:
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Fri Aug  8 05:19:41 2008
@@ -28,6 +28,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.StoreContext;
 
 public interface UnacknowledgedMessageMap
 {
@@ -55,8 +56,8 @@
 
     QueueEntry remove(long deliveryTag);
 
-    void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException;
-
+    public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException;
+    
     Collection<QueueEntry> cancelAllMessages();
 
     void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Fri Aug  8 05:19:41 2008
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.ack;
 
+import org.apache.qpid.server.store.StoreContext;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -160,7 +161,8 @@
         }
     }
 
-    public void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException
+    public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
+   
     {
         synchronized (_lock)
         {
@@ -175,6 +177,10 @@
                     throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
                                            " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
                 }
+
+                //Message has been ack so discard it. This will dequeue and decrement the reference.
+                unacked.getValue().discard(storeContext);
+
                 it.remove();
 
                 _unackedSize -= unacked.getValue().getMessage().getSize();
@@ -182,7 +188,6 @@
                 unacked.getValue().restoreCredit();
 
 
-                destination.add(unacked.getValue());
                 if (unacked.getKey() == deliveryTag)
                 {
                     break;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Fri Aug  8 05:19:41 2008
@@ -154,28 +154,13 @@
                     throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
                 }
 
-                LinkedList<QueueEntry> acked = new LinkedList<QueueEntry>();
-                unacknowledgedMessageMap.drainTo(acked, deliveryTag);
-                for (QueueEntry msg : acked)
-                {
-                        if (debug)
-                        {
-                            _log.debug("Discarding message: " + msg.getMessage().getMessageId());
-                        }
-                        if(msg.getMessage().isPersistent())
-                        {
-                            beginTranIfNecessary();
-                        }
-
-                        //Message has been ack so discard it. This will dequeue and decrement the reference.
-                        msg.discard(_storeContext);
-                }
+                unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
             }
         }
         else
         {
             QueueEntry msg;
-            msg = unacknowledgedMessageMap.remove(deliveryTag);
+            msg = unacknowledgedMessageMap.get(deliveryTag);
 
             if (msg == null)
             {
@@ -197,6 +182,9 @@
             //Message has been ack so discard it. This will dequeue and decrement the reference.
             msg.discard(_storeContext);
 
+            unacknowledgedMessageMap.remove(deliveryTag);
+
+
             if (debug)
             {
                 _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +

Added: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java?rev=683949&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java Fri Aug  8 05:19:41 2008
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+
+import java.util.List;
+
+public class AcknowledgeTest extends InternalBrokerBaseCase
+{
+
+    public void testTransactionalSingleAck() throws AMQException
+    {
+        _channel.setLocalTransactional();
+        runMessageAck(1, 1, 1, false, 0);
+    }
+
+    public void testTransactionalMultiAck() throws AMQException
+    {
+        _channel.setLocalTransactional();
+        runMessageAck(10, 1, 5, true, 5);
+    }
+
+    public void testTransactionalAckAll() throws AMQException
+    {
+        _channel.setLocalTransactional();
+        runMessageAck(10, 1, 0, true, 0);
+    }
+
+    public void testNonTransactionalSingleAck() throws AMQException
+    {
+        runMessageAck(1, 1, 1, false, 0);
+    }
+
+    public void testNonTransactionalMultiAck() throws AMQException
+    {
+        runMessageAck(10, 1, 5, true, 5);
+    }
+
+    public void testNonTransactionalAckAll() throws AMQException
+    {
+        runMessageAck(10, 1, 0, true, 0);
+    }
+
+    protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int remainingUnackedMessages) throws AMQException
+    {
+        //Check store is empty
+        checkStoreContents(0);
+
+        //Send required messages to the queue
+        publishMessages(_session, _channel, sendMessageCount);
+
+        if (_channel.isTransactional())
+        {
+            _channel.commit();
+        }
+
+        //Ensure they are stored
+        checkStoreContents(sendMessageCount);
+
+        //Check that there are no unacked messages
+        assertEquals("Channel should have no unacked msgs ", 0, _channel.getUnacknowledgedMessageMap().size());
+
+        //Subscribe to the queue
+        AMQShortString subscriber = subscribe(_session, _channel, _queue);
+
+        _queue.deliverAsync();
+
+        //Wait for the messages to be delivered
+        _session.awaitDelivery(sendMessageCount);
+
+        //Check that they are all waiting to be acknowledged
+        assertEquals("Channel should have unacked msgs", sendMessageCount, _channel.getUnacknowledgedMessageMap().size());
+
+        List<InternalTestProtocolSession.DeliveryPair> messages = _session.getDelivers(_channel.getChannelId(), subscriber, sendMessageCount);
+
+        //Double check we received the right number of messages
+        assertEquals(sendMessageCount, messages.size());
+
+        //Check that the first message has the expected deliveryTag
+        assertEquals("First message does not have expected deliveryTag", firstDeliveryTag, messages.get(0).getDeliveryTag());
+
+        //Send required Acknowledgement
+        _channel.acknowledgeMessage(acknowledgeDeliveryTag, acknowldegeMultiple);
+
+        if (_channel.isTransactional())
+        {
+            _channel.commit();
+        }
+
+        // Check Remaining Acknowledgements
+        assertEquals("Channel unacked msgs count incorrect", remainingUnackedMessages, _channel.getUnacknowledgedMessageMap().size());
+
+        //Check store contents are also correct.
+        checkStoreContents(remainingUnackedMessages);
+    }
+
+}

Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Fri Aug  8 05:19:41 2008
@@ -31,7 +31,7 @@
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -48,7 +48,7 @@
     MessageStore _store;
     StoreContext _context;
 
-    TestMinaProtocolSession _protocolSession;
+    InternalTestProtocolSession _protocolSession;
 
 
     public void setUp() throws AMQException
@@ -57,7 +57,7 @@
         _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
         _store = new MemoryMessageStore();
         _context = new StoreContext();
-        _protocolSession = new TestMinaProtocolSession();
+        _protocolSession = new InternalTestProtocolSession();
     }
 
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Fri Aug  8 05:19:41 2008
@@ -34,8 +34,8 @@
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
 import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -185,7 +185,7 @@
     */
     public void testQueueDepthAlertWithSubscribers() throws Exception
     {
-        _protocolSession = new TestMinaProtocolSession();
+        _protocolSession = new InternalTestProtocolSession();
         AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
         _protocolSession.addChannel(channel);
 
@@ -296,7 +296,7 @@
         super.setUp();
         IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
         _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
-        _protocolSession = new TestMinaProtocolSession();
+        _protocolSession = new InternalTestProtocolSession();
 
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Aug  8 05:19:41 2008
@@ -33,8 +33,8 @@
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactory;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -129,7 +129,7 @@
         assertTrue(_queueMBean.getActiveConsumerCount() == 0);
 
 
-        TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
+        InternalTestProtocolSession protocolSession = new InternalTestProtocolSession();
         AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
         protocolSession.addChannel(channel);
 
@@ -314,7 +314,7 @@
                                                     null);
         _queueMBean = new AMQQueueMBean(_queue);
 
-        _protocolSession = new TestMinaProtocolSession();
+        _protocolSession = new InternalTestProtocolSession();
     }
 
     private void sendMessages(int messageCount, boolean persistent) throws AMQException

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java Fri Aug  8 05:19:41 2008
@@ -95,7 +95,7 @@
 
         env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
         env.put("queue.queue", QUEUE);
-
+                                           
         Context context = factory.getInitialContext(env);
 
         Queue queue = (Queue) context.lookup("queue");