You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Bruce Snyder (JIRA)" <ji...@apache.org> on 2008/09/17 10:37:52 UTC
[jira] Commented: (AMQ-1940) Negative queue size (reproducible)
[ https://issues.apache.org/activemq/browse/AMQ-1940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=45756#action_45756 ]
Bruce Snyder commented on AMQ-1940:
-----------------------------------
I've created the following test case out of the attached {{Main.java}} class:
{code}
package org.apache.activemq.broker.region;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
public class QueuePurgeTest extends TestCase {
BrokerService broker;
ConnectionFactory factory;
Connection connection;
Session session;
Queue queue;
MessageConsumer consumer;
protected void setUp() throws Exception {
broker = new BrokerService();
broker.setUseJmx(true);
broker.setPersistent(false);
broker.addConnector("tcp://localhost:0");
broker.start();
factory = new ActiveMQConnectionFactory("vm://localhost");
connection = factory.createConnection();
connection.start();
}
protected void tearDown() throws Exception {
consumer.close();
session.close();
connection.stop();
connection.close();
broker.stop();
}
public void testPurgeQueueWithActiveConsumer() throws Exception {
createProducerAndSendMessages();
QueueViewMBean proxy = getProxyToQueueViewMBean();
createConsumer();
proxy.purge();
assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0, proxy.getQueueSize());
}
private QueueViewMBean getProxyToQueueViewMBean()
throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":Type=Queue,Destination=" +
queue.getQueueName() + ",BrokerName=localhost");
QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(
broker.getManagementContext().getMBeanServer(),
queueViewMBeanName, QueueViewMBean.class, true);
return proxy;
}
private void createProducerAndSendMessages() throws Exception {
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
queue = session.createQueue("test1");
MessageProducer producer = session.createProducer(queue);
for(int i=0; i<10000; i++) {
TextMessage message = session.createTextMessage("message "+i);
producer.send(message);
}
producer.close();
}
private void createConsumer() throws Exception {
consumer = session.createConsumer(queue);
// wait for buffer fill out
Thread.sleep(5*1000);
for(int i = 0; i < 100; ++i) {
Message message = consumer.receive();
message.acknowledge();
}
}
}
{code}
What I'm finding is that failure is intermittent, but I am able to see it once in a while:
{panel}
junit.framework.AssertionFailedError: Queue size is not zero expected:<0> but was:<-60>
at junit.framework.Assert.fail(Assert.java:47)
at junit.framework.Assert.failNotEquals(Assert.java:282)
at junit.framework.Assert.assertEquals(Assert.java:64)
at junit.framework.Assert.assertEquals(Assert.java:136)
at org.apache.activemq.broker.region.QueuePurgeTest.testPurgeQueueWithActiveConsumer(QueuePurgeTest.java:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:585)
at junit.framework.TestCase.runTest(TestCase.java:154)
at junit.framework.TestCase.runBare(TestCase.java:127)
at junit.framework.TestResult$1.protect(TestResult.java:106)
at junit.framework.TestResult.runProtected(TestResult.java:124)
at junit.framework.TestResult.run(TestResult.java:109)
at junit.framework.TestCase.run(TestCase.java:118)
at junit.framework.TestSuite.runTest(TestSuite.java:208)
at junit.framework.TestSuite.run(TestSuite.java:203)
at org.eclipse.jdt.internal.junit.runner.junit3.JUnit3TestReference.run(JUnit3TestReference.java:130)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)
{panel}
> Negative queue size (reproducible)
> ----------------------------------
>
> Key: AMQ-1940
> URL: https://issues.apache.org/activemq/browse/AMQ-1940
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.2.0
> Environment: Found on Windows but reproduced under Linux
> Reporter: Vadim Chekan
> Priority: Critical
> Attachments: Main.java
>
>
> When you "purge" queue from web admin console, it zeroes queue message
> counter. But if you have an active consumer at that time which
> pre-fetched messages than your consumer will keep sending ack as it
> process messages from its buffer. ActiveMQ will keep decrement counter
> upon receiving each ack. So when consumer is done queue will show
> MINUS<consumer buffer size>.
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.