You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Matt Pavlovich (Jira)" <ji...@apache.org> on 2021/03/03 20:51:00 UTC

[jira] [Closed] (AMQ-4109) Negative queue counters

     [ https://issues.apache.org/jira/browse/AMQ-4109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matt Pavlovich closed AMQ-4109.
-------------------------------
    Resolution: Auto Closed

Closed due to inactivity

> Negative queue counters
> -----------------------
>
>                 Key: AMQ-4109
>                 URL: https://issues.apache.org/jira/browse/AMQ-4109
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.7.0
>            Reporter: Markus Hahn
>            Assignee: Matt Pavlovich
>            Priority: Major
>              Labels: close-pending
>
> http://{server}:8161/admin/queues.jsp
> I get negative numbers in the "Number Of Pending Messages" column. I am running a simple PTP scenario, but purging the queue right in the middle of the action.
> Below is the experiment. ActiveMQ 5.7.0 was run out of the box on CentOS 6.3 with Java 6.
> _________________________________________________________________
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.DeliveryMode;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnection;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class PTPTest { // Reproduction program: one producer thread and D_COUNT consumer threads sharing queue "Q2".
> 	//static String _url = "failover://tcp://activemqtest:61616";
> 	static String _url = ActiveMQConnection.DEFAULT_BROKER_URL; // broker URL used by both enqueue() and dequeue()
> 	// Producer: sends 100 non-persistent text messages to "Q2", sleeping 10 ms after each send.
> 	public void enqueue() throws Exception {
> 		
> 		Connection connection = null;
> 		try 
> 		{
> 			System.out.println("enqueuing...");
> 			
> 			ConnectionFactory connectionFactory =
> 	             new ActiveMQConnectionFactory(_url);
> 			connection = connectionFactory.createConnection();
> 	        connection.start();
> 	        Session s = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // non-transacted session
> 	        //Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> 	        Destination dest = s.createQueue("Q2");
> 	        MessageProducer mp = s.createProducer(dest);
> 	        mp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
> 	        for(int num = 1; num <= 100;num++) {
> 	        	String txt = "this is message #" + num;
> 		        TextMessage tmsg = s.createTextMessage(txt);
> 		        mp.send(tmsg);
> 		        Thread.sleep(10); // throttle the producer; an interrupt from exec() surfaces here as InterruptedException
> 		        //System.out.printf(">>> %s\n", txt);	
> 	        }
> 	    }
> 		finally {
> 	         if (null != connection) { // connection stays null if createConnection() threw
> 	        	 connection.close();
> 	         }
> 		}
> 	}
> 	// Consumer: receives from "Q2" in an endless loop, printing each message and sleeping 1 s per message; id only labels the output.
> 	public void dequeue(int id) throws Exception {
> 		
> 		Connection connection = null;
> 		
> 		try {
> 			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_url);
> 			connection = connectionFactory.createConnection();
> 			connection.start();
> 			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> 			Destination destination = session.createQueue("Q2");
> 			MessageConsumer consumer = session.createConsumer(destination);
> 			for(;;) { // no exit condition; the loop ends only when an exception propagates
> 				Message msg = consumer.receive(); // NOTE(review): result is used without a null check; confirm whether receive() can return null here
> 				TextMessage tmsg = (TextMessage)msg;
> 				String txt = tmsg.getText();
> 				System.out.printf("<<< [%d] '%s', %s, %s\n", 
> 						id, txt, 
> 						tmsg.getJMSMessageID(), 
> 						tmsg.getJMSDestination());
> 				Thread.sleep(1000); // consumers are much slower than the producer (1 s vs 10 ms per message)
> 			}
> 		}
> 		finally {
> 			connection.close(); // NOTE(review): unlike enqueue(), no null check; throws NullPointerException if createConnection() failed
> 		}
> 	}
> 	// Driver: starts the producer, waits 1 s, starts D_COUNT consumers, waits 60 s, then interrupts and joins every thread.
> 	public void exec() throws Exception {
> 		Thread ethrd, dthrds[];
> 		ethrd = new Thread() {
> 			public void run() {
> 				try {
> 					enqueue();
> 				}
> 				catch (Exception e) {
> 					e.printStackTrace();
> 				}
> 			}
> 		};
> 		ethrd.start();
> 		Thread.sleep(1000); // let the producer run for 1 s before any consumer starts
> 		final int D_COUNT = 4; // number of concurrent consumer threads
> 		dthrds = new Thread[D_COUNT];
> 		for (int i = 0; i < dthrds.length; i++) {
> 			final int ii = i; // final copy so the anonymous class below can capture the index
> 			dthrds[i] = new Thread() {
> 				public void run() {
> 					try {
> 						dequeue(ii);
> 					}
> 					catch (Exception e) {
> 						e.printStackTrace();
> 					}
> 				}
> 			};
> 		}
> 		for (Thread dthrd : dthrds) {
> 			dthrd.start();
> 		}
> 		
> 		Thread.sleep(60000); // 60 s run window; presumably the queue is purged by hand during this time, per the report text
> 			
> 		ethrd.interrupt();
> 		ethrd.join();
> 		for (Thread dthrd : dthrds) {
> 			dthrd.interrupt();
> 			dthrd.join();
> 		} 
> 	}
> 	
> 	public static void main(String[] args) throws Exception {
> 		new PTPTest().exec();
> 		System.out.println("\nDONE."); // printed only after every worker thread has been joined
> 	}
> }



--
This message was sent by Atlassian Jira
(v8.3.4#803005)