You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2007/03/12 16:24:48 UTC

svn commit: r517250 - in /incubator/qpid/branches/perftesting/qpid/java: broker/src/main/java/org/apache/qpid/server/queue/ broker/src/test/java/org/apache/qpid/server/protocol/ broker/src/test/java/org/apache/qpid/server/queue/ systests/src/test/java/...

Author: bhupendrab
Date: Mon Mar 12 08:24:47 2007
New Revision: 517250

URL: http://svn.apache.org/viewvc?view=rev&rev=517250
Log:
QPID-408 Queue Depth should be reduced when message is polled from the queue.
Failure of AMQQueueMBeanTest is also fixed

Added:
    incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java   (with props)
    incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java   (with props)
    incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java   (with props)
Removed:
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
Modified:
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=517250&r1=517249&r2=517250
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Mon Mar 12 08:24:47 2007
@@ -271,6 +271,13 @@
 
             //remove sent message from our queue.
             messageQueue.poll();
+            // (QPID-408) If the queue is not the resend queue, then the total message size on the
+            // queue must also be decreased, because the message is being polled from the queue.
+            // The messageQueue being polled can be the resend queue, the predelivery queue or the main queue.
+            if (messageQueue != sub.getResendQueue())
+            {
+                _totalMessageSize.addAndGet(-message.getSize());
+            }
 
             //If we don't remove the message from _messages
             // Otherwise the Async send will never end            
@@ -381,7 +388,6 @@
                     }
                     for (Subscription sub : _subscriptions.getSubscriptions())
                     {
-
                         // stop if the message gets delivered whilst PreDelivering if we have a shared queue.
                         if (_queue.isShared() && msg.getDeliveredToConsumer())
                         {

Added: incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java?view=auto&rev=517250
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java Mon Mar 12 08:24:47 2007
@@ -0,0 +1,295 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.mina.common.*;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+
+import java.net.SocketAddress;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Test implementation of IoSession, which is required for some tests. Only the methods those tests need are
+ * implemented; the rest are stubs, so implement any further method before relying on it in a new test.
+ */
+public class TestIoSession implements IoSession
+{
+    private final ConcurrentMap attributes = new ConcurrentHashMap();
+
+    public TestIoSession()
+    {
+    }
+
+    public IoService getService()
+    {
+        return null;
+    }
+
+    public IoServiceConfig getServiceConfig()
+    {
+        return new TestIoConfig();
+    }
+
+    public IoHandler getHandler()
+    {
+        return null;
+    }
+
+    public IoSessionConfig getConfig()
+    {
+        return null;
+    }
+
+    public IoFilterChain getFilterChain()
+    {
+        return null;
+    }
+
+    public WriteFuture write(Object message)
+    {
+        return null;
+    }
+
+    public CloseFuture close()
+    {
+        return null;
+    }
+
+    public Object getAttachment()
+    {
+        return getAttribute("");
+    }
+
+    public Object setAttachment(Object attachment)
+    {
+        return setAttribute("",attachment);
+    }
+
+    public Object getAttribute(String key)
+    {
+        return attributes.get(key);
+    }
+
+    public Object setAttribute(String key, Object value)
+    {
+        return attributes.put(key,value);
+    }
+
+    public Object setAttribute(String key)
+    {
+        return attributes.put(key, Boolean.TRUE);
+    }
+
+    public Object removeAttribute(String key)
+    {
+        return attributes.remove(key);
+    }
+
+    public boolean containsAttribute(String key)
+    {
+        return attributes.containsKey(key);
+    }
+
+    public Set getAttributeKeys()
+    {
+        return attributes.keySet();
+    }
+
+    public TransportType getTransportType()
+    {
+        return null;
+    }
+
+    public boolean isConnected()
+    {
+        return false;
+    }
+
+    public boolean isClosing()
+    {
+        return false;
+    }
+
+    public CloseFuture getCloseFuture()
+    {
+        return null;
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return new InetSocketAddress("127.0.0.1", 1234);
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return null;
+    }
+
+    public SocketAddress getServiceAddress()
+    {
+        return null;
+    }
+
+    public int getIdleTime(IdleStatus status)
+    {
+        return 0;
+    }
+
+    public long getIdleTimeInMillis(IdleStatus status)
+    {
+        return 0;
+    }
+
+    public void setIdleTime(IdleStatus status, int idleTime)
+    {
+
+    }
+
+    public int getWriteTimeout()
+    {
+        return 0;
+    }
+
+    public long getWriteTimeoutInMillis()
+    {
+        return 0;
+    }
+
+    public void setWriteTimeout(int writeTimeout)
+    {
+
+    }
+
+    public TrafficMask getTrafficMask()
+    {
+        return null;
+    }
+
+    public void setTrafficMask(TrafficMask trafficMask)
+    {
+
+    }
+
+    public void suspendRead()
+    {
+
+    }
+
+    public void suspendWrite()
+    {
+
+    }
+
+    public void resumeRead()
+    {
+
+    }
+
+    public void resumeWrite()
+    {
+
+    }
+
+    public long getReadBytes()
+    {
+        return 0;
+    }
+
+    public long getWrittenBytes()
+    {
+        return 0;
+    }
+
+    public long getReadMessages()
+    {
+        return 0;
+    }
+
+    public long getWrittenMessages()
+    {
+        return 0;
+    }
+
+    public long getWrittenWriteRequests()
+    {
+        return 0;
+    }
+
+    public int getScheduledWriteRequests()
+    {
+        return 0;
+    }
+
+    public int getScheduledWriteBytes()
+    {
+        return 0;
+    }
+
+    public long getCreationTime()
+    {
+        return 0;
+    }
+
+    public long getLastIoTime()
+    {
+        return 0;
+    }
+
+    public long getLastReadTime()
+    {
+        return 0;
+    }
+
+    public long getLastWriteTime()
+    {
+        return 0;
+    }
+
+    public boolean isIdle(IdleStatus status)
+    {
+        return false;
+    }
+
+    public int getIdleCount(IdleStatus status)
+    {
+        return 0;
+    }
+
+    public long getLastIdleTime(IdleStatus status)
+    {
+        return 0; 
+    }
+
+    /**
+     * Test IoServiceConfig that always reports Qpid's ReadWriteThreadModel; static, as it uses no session state.
+     */
+    private static class TestIoConfig extends SocketAcceptorConfig
+    {
+        public ThreadModel getThreadModel()
+        {
+            return ReadWriteThreadModel.getInstance();
+        }
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java?view=auto&rev=517250
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java Mon Mar 12 08:24:47 2007
@@ -0,0 +1,33 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+public class TestMinaProtocolSession extends AMQMinaProtocolSession
+{
+    public TestMinaProtocolSession() throws AMQException
+    {
+        super(new TestIoSession(),
+              ApplicationRegistry.getInstance().getQueueRegistry(),
+              ApplicationRegistry.getInstance().getExchangeRegistry(),
+              new AMQCodecFactory(true));
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?view=auto&rev=517250
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Mon Mar 12 08:24:47 2007
@@ -0,0 +1,236 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.management.Notification;
+
+/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
+public class AMQQueueAlertTest extends TestCase
+{
+    private final static int MAX_MESSAGE_COUNT = 50;
+    private final static long MAX_MESSAGE_AGE = 250;   // 0.25 sec
+    private final static long MAX_MESSAGE_SIZE = 2000;  // 2 KB
+    private final static long MAX_QUEUE_DEPTH = 10000;  // 10 KB
+    private AMQQueue _queue;
+    private AMQQueueMBean _queueMBean;
+    private QueueRegistry _queueRegistry;
+    private MessageStore _messageStore = new MemoryMessageStore();
+
+    /**
+     * Tests that the MESSAGE_COUNT_ALERT notification is raised once the message count reaches the threshold limit
+     *
+     * @throws Exception if any step of the test fails unexpectedly
+     */
+    public void testMessageCountAlert() throws Exception
+    {
+        _queue = new AMQQueue("testQueue1", false, "AMQueueAlertTest", false, _queueRegistry);
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+
+        _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+
+        sendMessages(MAX_MESSAGE_COUNT, 256L);   // upper-case suffix: "256l" is easily misread as 2561
+        assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
+
+        Notification lastNotification = _queueMBean.getLastNotification();
+        assertNotNull(lastNotification);
+
+        String notificationMsg = lastNotification.getMessage();
+        assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_COUNT_ALERT.name()));
+    }
+
+    /**
+     * Tests if the Message Size alert gets thrown when a message larger than the threshold limit is sent
+     *
+     * @throws Exception
+     */
+    public void testMessageSizeAlert() throws Exception
+    {
+        _queue = new AMQQueue("testQueue2", false, "AMQueueAlertTest", false, _queueRegistry);
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+        _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+        _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
+
+        sendMessages(1, MAX_MESSAGE_SIZE * 2);
+        assertTrue(_queueMBean.getMessageCount() == 1);
+
+        Notification lastNotification = _queueMBean.getLastNotification();
+        assertNotNull(lastNotification);
+
+        String notificationMsg = lastNotification.getMessage();
+        assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_SIZE_ALERT.name()));
+    }
+
+    /**
+     * Tests if Queue Depth alert is thrown when queue depth reaches the threshold value
+     *
+     * @throws Exception
+     */
+    public void testQueueDepthAlertNoSubscribers() throws Exception
+    {
+        _queue = new AMQQueue("testQueue3", false, "AMQueueAlertTest", false, _queueRegistry);
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+        _queueMBean.setMaximumMessageCount(9999);   // Set a high value, because this is not being tested
+        _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
+
+        while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
+        {
+            sendMessages(1, MAX_MESSAGE_SIZE);
+        }
+
+        Notification lastNotification = _queueMBean.getLastNotification();
+        assertNotNull(lastNotification);
+
+        String notificationMsg = lastNotification.getMessage();
+        assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
+    }
+
+    /*
+     This test sends some messages to the queue with a subscriber that requires messages to be acknowledged.
+     The messages will not be acknowledged and will be requeued twice. We check this because
+     the reported bug said that the queueDepth keeps increasing when messages are requeued.
+     The QueueDepth should decrease when messages are delivered from the queue (QPID-408)
+    */
+    public void testQueueDepthAlertWithSubscribers() throws Exception
+    {
+        AMQChannel channel = new AMQChannel(2, _messageStore, null);
+        AMQMinaProtocolSession protocolSession = new TestMinaProtocolSession();
+        protocolSession.addChannel(channel);
+
+        // Create queue
+        _queue = new AMQQueue("testQueue" + Math.random(), false, "AMQueueAlertTest", false, _queueRegistry);
+        _queue.registerProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag", true, null);
+        _queue.deliverAsync();
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+        _queueMBean.setMaximumMessageCount(9999);   // Set a high value, because this is not being tested
+        _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
+
+        // Send messages (a few more than are needed to cause a Queue_Depth alert); the division is integral
+        int messageCount = (int) (MAX_QUEUE_DEPTH / MAX_MESSAGE_SIZE) + 10;
+        long totalSize = (messageCount * MAX_MESSAGE_SIZE) >> 10;
+        sendMessages(messageCount, MAX_MESSAGE_SIZE);
+
+        // Check queueDepth. There should be no messages on the queue, and as the subscriber is listening
+        // there should be no Queue_Depth alert raised
+        assertTrue(_queueMBean.getQueueDepth() == 0);
+        Notification lastNotification = _queueMBean.getLastNotification();
+        assertNull(lastNotification);
+
+        // Kill the subscriber and check for the queue depth values.
+        // Messages are unacknowledged, so those should get requeued. All messages should be on the Queue
+        _queue.unregisterProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag");
+        channel.requeue();
+
+        assertTrue(_queueMBean.getQueueDepth() == totalSize);
+
+        lastNotification = _queueMBean.getLastNotification();
+        assertNotNull(lastNotification);
+        String notificationMsg = lastNotification.getMessage();
+        assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
+
+        // Connect a consumer again and check QueueDepth values. The queue should get emptied.
+        // Messages will get delivered but still are unacknowledged.
+        _queue.registerProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag", true, null);
+        _queue.deliverAsync();
+        // Poll for at most ~10 seconds rather than forever, so a stalled delivery cannot hang the test run
+        for (int attempt = 0; attempt < 100 && _queue.getMessageCount() != 0; attempt++)
+        {
+            Thread.sleep(100);
+        }
+        assertTrue(_queueMBean.getQueueDepth() == 0);
+
+        // Kill the subscriber again. Now those messages should get requeued again. Check if the queue depth
+        // value is correct.
+        _queue.unregisterProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag");
+        channel.requeue();
+
+        assertTrue(_queueMBean.getQueueDepth() == totalSize);
+        channel.commit();
+    }
+
+    /**
+     * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than threshold value of
+     * message age
+     *
+     * @throws Exception
+     */
+    public void testMessageAgeAlert() throws Exception
+    {
+        _queue = new AMQQueue("testQueue4", false, "AMQueueAlertTest", false, _queueRegistry);
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+        _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+        _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
+
+        sendMessages(1, MAX_MESSAGE_SIZE);
+
+        // Ensure message sits on queue long enough to age.
+        Thread.sleep(MAX_MESSAGE_AGE * 2);
+
+        sendMessages(1, MAX_MESSAGE_SIZE);
+        assertTrue(_queueMBean.getMessageCount() == 2);
+
+        Notification lastNotification = _queueMBean.getLastNotification();
+        assertNotNull(lastNotification);
+
+        String notificationMsg = lastNotification.getMessage();
+        assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_AGE_ALERT.name()));
+    }
+
+    private AMQMessage message(boolean immediate, long size) throws AMQException
+    {
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Establish some way to determine the version for the test.
+        BasicPublishBody publish = new BasicPublishBody((byte) 8, (byte) 0);
+        publish.immediate = immediate;
+        ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+        contentHeaderBody.bodySize = size;   // in bytes
+        AMQMessage message = new AMQMessage(_messageStore, publish);//, contentHeaderBody, null);
+        message.setContentHeaderBody(contentHeaderBody);
+        return message;
+    }
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
+    }
+
+    private void sendMessages(int messageCount, long size) throws AMQException
+    {
+        AMQMessage[] batch = new AMQMessage[messageCount];   // build every message before delivering any
+        for (int idx = 0; idx < batch.length; idx++)
+        {
+            batch[idx] = message(false, size);               // non-immediate message with the given body size
+        }
+        for (AMQMessage pending : batch)                     // deliver in creation order
+        {
+            _queue.deliver(pending);
+        }
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=517250&r1=517249&r2=517250
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Mon Mar 12 08:24:47 2007
@@ -32,6 +32,7 @@
  */
 public class AMQQueueMBeanTest extends TestCase
 {
+    private static final long MESSAGE_SIZE = 1000; // bytes
     private AMQQueue _queue;
     private AMQQueueMBean _queueMBean;
     private QueueRegistry _queueRegistry;
@@ -45,7 +46,8 @@
         sendMessages(messageCount);
         assertTrue(_queueMBean.getMessageCount() == messageCount);
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
-        assertTrue(_queueMBean.getQueueDepth() == 10);
+        long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+        assertTrue(_queueMBean.getQueueDepth() == queueDepth);
 
         _queueMBean.deleteMessageFromTop();
         assertTrue(_queueMBean.getMessageCount() == messageCount - 1);
@@ -84,13 +86,14 @@
 
     public void testGeneralProperties()
     {
+        long maxQueueDepth = 1000; // in bytes
         _queueMBean.setMaximumMessageCount(50000);
         _queueMBean.setMaximumMessageSize(2000l);
-        _queueMBean.setMaximumQueueDepth(1000l);
+        _queueMBean.setMaximumQueueDepth(maxQueueDepth);
 
         assertTrue(_queueMBean.getMaximumMessageCount() == 50000);
         assertTrue(_queueMBean.getMaximumMessageSize() == 2000);
-        assertTrue(_queueMBean.getMaximumQueueDepth() == 1000);
+        assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10));
 
         assertTrue(_queueMBean.getName().equals("testQueue"));
         assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
@@ -153,7 +156,7 @@
         BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
         publish.immediate = immediate;
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
-        contentHeaderBody.bodySize = 1000;   // in bytes       
+        contentHeaderBody.bodySize = MESSAGE_SIZE;   // in bytes       
         return new AMQMessage(_messageStore, publish, contentHeaderBody, null);
     }
 
@@ -172,7 +175,6 @@
         for (int i = 0; i < messages.length; i++)
         {
             messages[i] = message(false);
-            ;
         }
         for (int i = 0; i < messageCount; i++)
         {