You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/08/08 14:19:42 UTC
svn commit: r683949 - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/txn/
broker/src/test/java/org/apache/qpid/server/ack/
broker/src/test/java/org/apache/qpid/server/exc...
Author: ritchiem
Date: Fri Aug 8 05:19:41 2008
New Revision: 683949
URL: http://svn.apache.org/viewvc?rev=683949&view=rev
Log:
QPID-1136 : Provided a fix for the leak in UnacknowledgedMessage when acking. Added a new InternalBrokerBaseCase for performing testing on the broker without using the client libraries. This allows for testing closer to AMQP. Merged from M2.1.x
Added:
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
Removed:
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Fri Aug 8 05:19:41 2008
@@ -28,6 +28,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.StoreContext;
public interface UnacknowledgedMessageMap
{
@@ -55,8 +56,8 @@
QueueEntry remove(long deliveryTag);
- void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException;
-
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException;
+
Collection<QueueEntry> cancelAllMessages();
void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Fri Aug 8 05:19:41 2008
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.ack;
+import org.apache.qpid.server.store.StoreContext;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -160,7 +161,8 @@
}
}
- public void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
+
{
synchronized (_lock)
{
@@ -175,6 +177,10 @@
throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
" When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
}
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ unacked.getValue().discard(storeContext);
+
it.remove();
_unackedSize -= unacked.getValue().getMessage().getSize();
@@ -182,7 +188,6 @@
unacked.getValue().restoreCredit();
- destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
{
break;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Fri Aug 8 05:19:41 2008
@@ -154,28 +154,13 @@
throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
}
- LinkedList<QueueEntry> acked = new LinkedList<QueueEntry>();
- unacknowledgedMessageMap.drainTo(acked, deliveryTag);
- for (QueueEntry msg : acked)
- {
- if (debug)
- {
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
- }
- if(msg.getMessage().isPersistent())
- {
- beginTranIfNecessary();
- }
-
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(_storeContext);
- }
+ unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
}
}
else
{
QueueEntry msg;
- msg = unacknowledgedMessageMap.remove(deliveryTag);
+ msg = unacknowledgedMessageMap.get(deliveryTag);
if (msg == null)
{
@@ -197,6 +182,9 @@
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
+ unacknowledgedMessageMap.remove(deliveryTag);
+
+
if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
Added: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java?rev=683949&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java Fri Aug 8 05:19:41 2008
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+
+import java.util.List;
+
+public class AcknowledgeTest extends InternalBrokerBaseCase
+{
+
+ public void testTransactionalSingleAck() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(1, 1, 1, false, 0);
+ }
+
+ public void testTransactionalMultiAck() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(10, 1, 5, true, 5);
+ }
+
+ public void testTransactionalAckAll() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(10, 1, 0, true, 0);
+ }
+
+ public void testNonTransactionalSingleAck() throws AMQException
+ {
+ runMessageAck(1, 1, 1, false, 0);
+ }
+
+ public void testNonTransactionalMultiAck() throws AMQException
+ {
+ runMessageAck(10, 1, 5, true, 5);
+ }
+
+ public void testNonTransactionalAckAll() throws AMQException
+ {
+ runMessageAck(10, 1, 0, true, 0);
+ }
+
+ protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int remainingUnackedMessages) throws AMQException
+ {
+ //Check store is empty
+ checkStoreContents(0);
+
+ //Send required messsages to the queue
+ publishMessages(_session, _channel, sendMessageCount);
+
+ if (_channel.isTransactional())
+ {
+ _channel.commit();
+ }
+
+ //Ensure they are stored
+ checkStoreContents(sendMessageCount);
+
+ //Check that there are no unacked messages
+ assertEquals("Channel should have no unacked msgs ", 0, _channel.getUnacknowledgedMessageMap().size());
+
+ //Subscribe to the queue
+ AMQShortString subscriber = subscribe(_session, _channel, _queue);
+
+ _queue.deliverAsync();
+
+ //Wait for the messages to be delivered
+ _session.awaitDelivery(sendMessageCount);
+
+ //Check that they are all waiting to be acknoledged
+ assertEquals("Channel should have unacked msgs", sendMessageCount, _channel.getUnacknowledgedMessageMap().size());
+
+ List<InternalTestProtocolSession.DeliveryPair> messages = _session.getDelivers(_channel.getChannelId(), subscriber, sendMessageCount);
+
+ //Double check we received the right number of messages
+ assertEquals(sendMessageCount, messages.size());
+
+ //Check that the first message has the expected deliveryTag
+ assertEquals("First message does not have expected deliveryTag", firstDeliveryTag, messages.get(0).getDeliveryTag());
+
+ //Send required Acknowledgement
+ _channel.acknowledgeMessage(acknowledgeDeliveryTag, acknowldegeMultiple);
+
+ if (_channel.isTransactional())
+ {
+ _channel.commit();
+ }
+
+ // Check Remaining Acknowledgements
+ assertEquals("Channel unacked msgs count incorrect", remainingUnackedMessages, _channel.getUnacknowledgedMessageMap().size());
+
+ //Check store contents are also correct.
+ checkStoreContents(remainingUnackedMessages);
+ }
+
+}
Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Fri Aug 8 05:19:41 2008
@@ -31,7 +31,7 @@
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -48,7 +48,7 @@
MessageStore _store;
StoreContext _context;
- TestMinaProtocolSession _protocolSession;
+ InternalTestProtocolSession _protocolSession;
public void setUp() throws AMQException
@@ -57,7 +57,7 @@
_vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
_store = new MemoryMessageStore();
_context = new StoreContext();
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Fri Aug 8 05:19:41 2008
@@ -34,8 +34,8 @@
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -185,7 +185,7 @@
*/
public void testQueueDepthAlertWithSubscribers() throws Exception
{
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
_protocolSession.addChannel(channel);
@@ -296,7 +296,7 @@
super.setUp();
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Aug 8 05:19:41 2008
@@ -33,8 +33,8 @@
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactory;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -129,7 +129,7 @@
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
+ InternalTestProtocolSession protocolSession = new InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
protocolSession.addChannel(channel);
@@ -314,7 +314,7 @@
null);
_queueMBean = new AMQQueueMBean(_queue);
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
}
private void sendMessages(int messageCount, boolean persistent) throws AMQException
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?rev=683949&r1=683948&r2=683949&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java Fri Aug 8 05:19:41 2008
@@ -95,7 +95,7 @@
env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
env.put("queue.queue", QUEUE);
-
+
Context context = factory.getInitialContext(env);
Queue queue = (Queue) context.lookup("queue");