You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2007/03/13 17:04:01 UTC

svn commit: r517745 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/queue/ broker/src/test/java/org/apache/qpid/server/queue/ systests/src/main/java/org/apache/qpid/server/queue/

Author: bhupendrab
Date: Tue Mar 13 09:04:00 2007
New Revision: 517745

URL: http://svn.apache.org/viewvc?view=rev&rev=517745
Log:
QPID-411 : ClearQueue functionality of AMQQueue doesn't reset the queue depth
AMQQueueMBeanTest.java moved to Broker tests

Added:
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java   (with props)
Removed:
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=517745&r1=517744&r2=517745
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Mar 13 09:04:00 2007
@@ -378,6 +378,7 @@
                 msg = getNextMessage();
                 count++;
             }
+            _totalMessageSize.set(0L);
         }
         _lock.unlock();
         return count;

Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?view=diff&rev=517745&r1=517744&r2=517745
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Tue Mar 13 09:04:00 2007
@@ -220,7 +220,12 @@
 
         assertTrue(_queueMBean.getQueueDepth() == totalSize);
         protocolSession.closeSession();
+
+        // Check the clear queue
+        _queueMBean.clearQueue();
+        assertTrue(_queueMBean.getQueueDepth() == 0);
     }
+
     protected AMQMessage message(final boolean immediate, long size) throws AMQException
     {
         MessagePublishInfo publish = new MessagePublishInfo()

Added: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=auto&rev=517745
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue Mar 13 09:04:00 2007
@@ -0,0 +1,296 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.MemoryMessageStore;
+
+import javax.management.JMException;
+import java.util.LinkedList;
+import java.util.HashSet;
+
+/**
+ * Tests the attributes and operations exposed by {@link AMQQueueMBean}.
+ * <p>
+ * Every test runs against a new queue named "testQueue" and its MBean, both created in {@link #setUp()}.
+ */
+public class AMQQueueMBeanTest extends TestCase
+{
+    /** Body size, in bytes, set on the content header of every test message. */
+    private static final long MESSAGE_SIZE = 1000;
+
+    private AMQQueue _queue;
+    private AMQQueueMBean _queueMBean;
+    private MessageStore _messageStore = new MemoryMessageStore();
+    private StoreContext _storeContext = new StoreContext();
+    private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+                                                                                     null,
+                                                                                     new LinkedList<RequiredDeliveryException>(),
+                                                                                     new HashSet<Long>());
+    private VirtualHost _virtualHost;
+
+    /**
+     * Checks the message count, received message count and queue depth reported by the MBean after
+     * messages are enqueued, after the top message is deleted and after the queue is cleared.
+     */
+    public void testMessageCount() throws Exception
+    {
+        int messageCount = 10;
+        sendMessages(messageCount);
+        assertNumberEquals("message count after enqueue", messageCount, _queueMBean.getMessageCount());
+        assertNumberEquals("received message count after enqueue", messageCount,
+                           _queueMBean.getReceivedMessageCount());
+        // The expected depth is the total body size in bytes shifted right by 10 bits.
+        long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+        assertNumberEquals("queue depth after enqueue", queueDepth, _queueMBean.getQueueDepth());
+
+        _queueMBean.deleteMessageFromTop();
+        assertNumberEquals("message count after deleteMessageFromTop", messageCount - 1,
+                           _queueMBean.getMessageCount());
+        assertNumberEquals("received message count after deleteMessageFromTop", messageCount,
+                           _queueMBean.getReceivedMessageCount());
+
+        _queueMBean.clearQueue();
+        assertNumberEquals("message count after clearQueue", 0, _queueMBean.getMessageCount());
+        assertNumberEquals("received message count after clearQueue", messageCount,
+                           _queueMBean.getReceivedMessageCount());
+        // QPID-411: clearing the queue must also reset the reported queue depth.
+        assertNumberEquals("queue depth after clearQueue", 0, _queueMBean.getQueueDepth());
+    }
+
+    /**
+     * Checks the consumer counts reported by the MBean as a protocol session is registered with the
+     * queue, as two further subscriptions are added to the subscription set, and as one is closed.
+     */
+    public void testConsumerCount() throws AMQException
+    {
+        SubscriptionManager mgr = _queue.getSubscribers();
+        assertFalse("a new queue should have no active subscribers", mgr.hasActiveSubscribers());
+        assertNumberEquals("active consumer count on a new queue", 0, _queueMBean.getActiveConsumerCount());
+
+        TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
+        AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null);
+        protocolSession.addChannel(channel);
+
+        _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);
+        assertNumberEquals("active consumer count after registering a session", 1,
+                           _queueMBean.getActiveConsumerCount());
+
+        SubscriptionSet subscribers = (SubscriptionSet) mgr;
+        SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory();
+        Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
+                                                                 protocolSession,
+                                                                 new AMQShortString("S1"),
+                                                                 false,
+                                                                 null,
+                                                                 true,
+                                                                 _queue);
+
+        Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
+                                                                 protocolSession,
+                                                                 new AMQShortString("S2"),
+                                                                 false,
+                                                                 null,
+                                                                 true,
+                                                                 _queue);
+        subscribers.addSubscriber(s1);
+        subscribers.addSubscriber(s2);
+        assertNumberEquals("active consumer count after adding two subscriptions", 3,
+                           _queueMBean.getActiveConsumerCount());
+        assertNumberEquals("consumer count after adding two subscriptions", 3, _queueMBean.getConsumerCount());
+
+        // The assertions below expect a closed subscription to stop counting as active
+        // while still being included in the total consumer count.
+        s1.close();
+        assertNumberEquals("active consumer count after closing a subscription", 2,
+                           _queueMBean.getActiveConsumerCount());
+        assertNumberEquals("consumer count after closing a subscription", 3, _queueMBean.getConsumerCount());
+    }
+
+    /**
+     * Checks that the limits set through the MBean are reported back by it, and checks the name,
+     * owner, auto-delete and durable attributes of the queue created in {@link #setUp()}.
+     */
+    public void testGeneralProperties()
+    {
+        long maxQueueDepth = 1000; // in bytes
+        _queueMBean.setMaximumMessageCount(50000);
+        _queueMBean.setMaximumMessageSize(2000L);
+        _queueMBean.setMaximumQueueDepth(maxQueueDepth);
+
+        assertNumberEquals("maximum message count", 50000, _queueMBean.getMaximumMessageCount());
+        assertNumberEquals("maximum message size", 2000, _queueMBean.getMaximumMessageSize());
+        assertNumberEquals("maximum queue depth", maxQueueDepth >> 10, _queueMBean.getMaximumQueueDepth());
+
+        assertTrue("unexpected queue name: " + _queueMBean.getName(), _queueMBean.getName().equals("testQueue"));
+        assertTrue("unexpected queue owner: " + _queueMBean.getOwner(),
+                   _queueMBean.getOwner().equals("AMQueueMBeanTest"));
+        assertFalse("queue should not be auto-delete", _queueMBean.isAutoDelete());
+        assertFalse("queue should not be durable", _queueMBean.isDurable());
+    }
+
+    /**
+     * Checks that viewMessages throws a JMException for the index ranges tried here, and that
+     * viewMessageContent succeeds for an enqueued message id but throws a JMException for an id
+     * that this test did not enqueue.
+     */
+    public void testExceptions() throws Exception
+    {
+        assertViewMessagesRejected(0, 3);
+        assertViewMessagesRejected(2, 1);
+        assertViewMessagesRejected(-1, 1);
+
+        AMQMessage msg = message(false);
+        long id = msg.getMessageId();
+        _queue.clearQueue(_storeContext);
+
+        msg.enqueue(_queue);
+        msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+        _queue.process(_storeContext, msg, false);
+        // Viewing the message that was just put on the queue must not throw.
+        _queueMBean.viewMessageContent(id);
+        try
+        {
+            _queueMBean.viewMessageContent(id + 1);
+            fail("viewMessageContent(" + (id + 1) + ") should have thrown a JMException");
+        }
+        catch (JMException expected)
+        {
+            // expected: this test did not enqueue a message with that id
+        }
+    }
+
+    /**
+     * Asserts that viewing the given range of messages is rejected with a JMException.
+     *
+     * @param fromIndex first index passed to viewMessages
+     * @param toIndex second index passed to viewMessages
+     * @throws Exception if viewMessages fails with anything other than a JMException
+     */
+    private void assertViewMessagesRejected(int fromIndex, int toIndex) throws Exception
+    {
+        try
+        {
+            _queueMBean.viewMessages(fromIndex, toIndex);
+            fail("viewMessages(" + fromIndex + ", " + toIndex + ") should have thrown a JMException");
+        }
+        catch (JMException expected)
+        {
+            // expected: the test requires this range to be rejected
+        }
+    }
+
+    /**
+     * Asserts that a numeric MBean attribute has the expected value, reporting both values on failure.
+     *
+     * @param description what is being checked, used as the failure message
+     * @param expected the expected value
+     * @param actual the value returned by the MBean
+     */
+    private static void assertNumberEquals(String description, long expected, Number actual)
+    {
+        assertEquals(description, expected, actual.longValue());
+    }
+
+    /**
+     * Creates a message whose content header declares a body of MESSAGE_SIZE bytes.
+     *
+     * @param immediate value returned by the publish info's isImmediate()
+     * @return a new message with an id obtained from the message store
+     * @throws AMQException if creating the message fails
+     */
+    private AMQMessage message(final boolean immediate) throws AMQException
+    {
+        MessagePublishInfo publish = new MessagePublishInfo()
+        {
+            public AMQShortString getExchange()
+            {
+                return null;
+            }
+
+            public boolean isImmediate()
+            {
+                return immediate;
+            }
+
+            public boolean isMandatory()
+            {
+                return false;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return null;
+            }
+        };
+
+        ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+        contentHeaderBody.bodySize = MESSAGE_SIZE;   // in bytes
+        return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
+    }
+
+    /**
+     * Looks up the "test" virtual host and creates the queue and MBean used by every test.
+     */
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+        _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+        _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
+        _queueMBean = new AMQQueueMBean(_queue);
+    }
+
+    /**
+     * Puts the given number of test messages on the queue.
+     * <p>
+     * All messages are enqueued and routed first; only then is each one passed to the queue's process method.
+     *
+     * @param messageCount number of messages to send
+     * @throws AMQException if creating, routing or processing a message fails
+     */
+    private void sendMessages(int messageCount) throws AMQException
+    {
+        AMQMessage[] messages = new AMQMessage[messageCount];
+        for (int i = 0; i < messages.length; i++)
+        {
+            messages[i] = message(false);
+            messages[i].enqueue(_queue);
+            messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+        }
+        for (int i = 0; i < messageCount; i++)
+        {
+            _queue.process(_storeContext, messages[i], false);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
------------------------------------------------------------------------------
    svn:eol-style = native