You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/05/26 10:49:11 UTC
svn commit: r778622 - in
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors:
CursorSupport.java NegativeQueueTest.java
Author: rajdavies
Date: Tue May 26 08:49:11 2009
New Revision: 778622
URL: http://svn.apache.org/viewvc?rev=778622&view=rev
Log:
Added test case from Richard Yager for https://issues.apache.org/activemq/browse/AMQ-1918
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java?rev=778622&r1=778621&r2=778622&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java Tue May 26 08:49:11 2009
@@ -16,6 +16,11 @@
*/
package org.apache.activemq.broker.region.cursors;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -23,7 +28,6 @@
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -33,16 +37,6 @@
import javax.jms.Session;
import javax.jms.TextMessage;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.CombinationTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerTest;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
/**
* @version $Revision: 1.3 $
*/
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=778622&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java Tue May 26 08:49:11 2009
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.StoreUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.TempUsage;
+
+/**
+ * Modified CursorSupport Unit test to reproduce the negative queue issue.
+ *
+ * Keys to reproducing:
+ * 1) Consecutive queues with listener on first sending to second queue
+ * 2) Push each queue to the memory limit
+ * This seems to help reproduce the issue more consistently, but
+ * we have seen times in our production environment where the
+ * negative queue can occur without. Our memory limits are
+ * very high in production and it still happens in varying
+ * frequency.
+ * 3) Prefetch
+ * Lowering the prefetch down to 10 and below seems to help
+ * reduce occurrences.
+ * 4) # of consumers per queue
+ * The issue occurs less with fewer consumers
+ *
+ * Things that do not affect reproduction:
+ * 1) Spring - we use spring in our production applications, but this test case works
+ * with or without it.
+ * 2) transacted
+ *
+ */
+public class NegativeQueueTest extends TestCase {
+
+ public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS");
+
+ private static final String QUEUE_1_NAME = "conn.test.queue.1";
+ private static final String QUEUE_2_NAME = "conn.test.queue.2";
+
+ private static final long QUEUE_MEMORY_LIMIT = 2097152;
+ private static final long MEMORY_USAGE = 400000000;
+ private static final long TEMP_USAGE = 200000000;
+ private static final long STORE_USAGE = 1000000000;
+ private static final int MESSAGE_COUNT = 2000;
+
+ protected static final boolean TRANSACTED = true;
+ protected static final boolean DEBUG = false;
+ protected static int NUM_CONSUMERS = 20;
+ protected static int PREFETCH_SIZE = 1000;
+
+ protected BrokerService broker;
+ protected String bindAddress = "tcp://localhost:60706";
+
+ public void testWithDefaultPrefetch() throws Exception{
+ PREFETCH_SIZE = 1000;
+ NUM_CONSUMERS = 20;
+ blastAndConsume();
+ }
+
+ public void testWithDefaultPrefetchFiveConsumers() throws Exception{
+ PREFETCH_SIZE = 1000;
+ NUM_CONSUMERS = 5;
+ blastAndConsume();
+ }
+
+ public void testWithDefaultPrefetchTwoConsumers() throws Exception{
+ PREFETCH_SIZE = 1000;
+ NUM_CONSUMERS = 2;
+ blastAndConsume();
+ }
+
+ public void testWithDefaultPrefetchOneConsumer() throws Exception{
+ PREFETCH_SIZE = 1000;
+ NUM_CONSUMERS = 1;
+ blastAndConsume();
+ }
+
+ public void testWithMediumPrefetch() throws Exception{
+ PREFETCH_SIZE = 50;
+ NUM_CONSUMERS = 20;
+ blastAndConsume();
+ }
+
+ public void testWithSmallPrefetch() throws Exception{
+ PREFETCH_SIZE = 10;
+ NUM_CONSUMERS = 20;
+ blastAndConsume();
+ }
+
+ public void testWithNoPrefetch() throws Exception{
+ PREFETCH_SIZE = 1;
+ blastAndConsume();
+ }
+
+ public void blastAndConsume() throws Exception {
+ ConnectionFactory factory = createConnectionFactory();
+
+ //get proxy queues for statistics lookups
+ Connection proxyConnection = factory.createConnection();
+ proxyConnection.start();
+ Session proxySession = proxyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean((Queue)proxySession.createQueue(QUEUE_1_NAME));
+ final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean((Queue)proxySession.createQueue(QUEUE_2_NAME));
+
+ // LOAD THE QUEUE
+ Connection producerConnection = factory.createConnection();
+ producerConnection.start();
+ Session session = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
+ Destination queue = session.createQueue(QUEUE_1_NAME);
+ MessageProducer producer = session.createProducer(queue);
+ List<TextMessage> senderList = new ArrayList<TextMessage>();
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ TextMessage msg = session.createTextMessage(formatter.format(new Date()));
+ senderList.add(msg);
+ producer.send(msg);
+ if(TRANSACTED) session.commit();
+ if(DEBUG && i%100 == 0){
+ int index = (i/100)+1;
+ System.out.print(index-((index/10)*10));
+ }
+ }
+
+ //get access to the Queue info
+ if(DEBUG){
+ System.out.println("");
+ System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize());
+ System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage());
+ System.out.println("Queue1 Memory Available = "+proxyQueue1.getMemoryLimit());
+ }
+
+ // FLUSH THE QUEUE
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+ Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS];
+ List<Message> consumerList1 = new ArrayList<Message>();
+ Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS];
+ Connection[] producerConnections2 = new Connection[NUM_CONSUMERS];
+ List<Message> consumerList2 = new ArrayList<Message>();
+
+ for(int ix=0; ix<NUM_CONSUMERS; ix++){
+ producerConnections2[ix] = factory.createConnection();
+ producerConnections2[ix].start();
+ consumerConnections1[ix] = getConsumerConnection(factory);
+ Session consumerSession = consumerConnections1[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_1_NAME));
+ consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1));
+ }
+
+ latch1.await(300000, TimeUnit.MILLISECONDS);
+ if(DEBUG){
+ System.out.println("");
+ System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize());
+ System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage());
+ System.out.println("Queue2 Memory Available = "+proxyQueue2.getMemoryLimit());
+ }
+
+ for(int ix=0; ix<NUM_CONSUMERS; ix++){
+ consumerConnections2[ix] = getConsumerConnection(factory);
+ Session consumerSession = consumerConnections2[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_2_NAME));
+ consumer.setMessageListener(new SessionAwareMessageListener(consumerSession, latch2, consumerList2));
+ }
+
+ latch2.await(300000, TimeUnit.MILLISECONDS);
+ producerConnection.close();
+ for(int ix=0; ix<NUM_CONSUMERS; ix++){
+ consumerConnections1[ix].close();
+ consumerConnections2[ix].close();
+ producerConnections2[ix].close();
+ }
+
+ //let the consumer statistics on queue2 have time to update
+ Thread.sleep(500);
+
+ if(DEBUG){
+ System.out.println("");
+ System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize());
+ System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage());
+ System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize());
+ System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage());
+ }
+
+ assertEquals("Queue1 has gone negative,",0, proxyQueue1.getQueueSize());
+ assertEquals("Queue2 has gone negative,",0, proxyQueue2.getQueueSize());
+ proxyConnection.close();
+
+ }
+
+ private QueueViewMBean getProxyToQueueViewMBean(Queue queue)
+ throws MalformedObjectNameException, JMSException {
+
+ ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":Type=Queue,Destination=" +
+ queue.getQueueName() + ",BrokerName=localhost");
+ QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(
+ broker.getManagementContext().getMBeanServer(),
+ queueViewMBeanName, QueueViewMBean.class, true);
+
+ return proxy;
+ }
+
+ protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
+ Connection connection = fac.createConnection();
+ connection.start();
+ return connection;
+ }
+
+ protected void setUp() throws Exception {
+ if (broker == null) {
+ broker = createBroker();
+ }
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress);
+ Properties props = new Properties();
+ props.setProperty("prefetchPolicy.durableTopicPrefetch", "" + PREFETCH_SIZE);
+ props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", "" + PREFETCH_SIZE);
+ props.setProperty("prefetchPolicy.queuePrefetch", "" + PREFETCH_SIZE);
+ cf.setProperties(props);
+ return cf;
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService answer = new BrokerService();
+ configureBroker(answer);
+ answer.start();
+ return answer;
+ }
+
+ protected void configureBroker(BrokerService answer) throws Exception {
+ PolicyEntry policy = new PolicyEntry();
+ policy.setMemoryLimit(QUEUE_MEMORY_LIMIT);
+ policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ answer.setDestinationPolicy(pMap);
+ answer.setDeleteAllMessagesOnStartup(true);
+ answer.addConnector(bindAddress);
+
+ MemoryUsage memoryUsage = new MemoryUsage();
+ memoryUsage.setLimit(MEMORY_USAGE);
+ memoryUsage.setPercentUsageMinDelta(20);
+
+ TempUsage tempUsage = new TempUsage();
+ tempUsage.setLimit(TEMP_USAGE);
+
+ StoreUsage storeUsage = new StoreUsage();
+ storeUsage.setLimit(STORE_USAGE);
+
+ SystemUsage systemUsage = new SystemUsage();
+ systemUsage.setMemoryUsage(memoryUsage);
+ systemUsage.setTempUsage(tempUsage);
+ systemUsage.setStoreUsage(storeUsage);
+ answer.setSystemUsage(systemUsage);
+ }
+
+ /**
+ * Message listener that is given the Session for transacted consumers
+ */
+ class SessionAwareMessageListener implements MessageListener{
+ private List<Message> consumerList;
+ private CountDownLatch latch;
+ private Session consumerSession;
+ private Session producerSession;
+ private MessageProducer producer;
+
+ public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List<Message> consumerList){
+ this(null, consumerSession, null, latch, consumerList);
+ }
+
+ public SessionAwareMessageListener(Connection producerConnection, Session consumerSession, String outQueueName,
+ CountDownLatch latch, List<Message> consumerList){
+ this.consumerList = consumerList;
+ this.latch = latch;
+ this.consumerSession = consumerSession;
+
+ if(producerConnection != null){
+ try {
+ producerSession = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
+ Destination queue = producerSession.createQueue(outQueueName);
+ producer = producerSession.createProducer(queue);
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void onMessage(Message msg) {
+ try {
+ if(producer == null){
+ // sleep to act as a slow consumer
+ // which will force a mix of direct and polled dispatching
+ // using the cursor on the broker
+ Thread.sleep(50);
+ }else{
+ producer.send(msg);
+ if(TRANSACTED) producerSession.commit();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ synchronized(consumerList){
+ consumerList.add(msg);
+ if(DEBUG && consumerList.size()%100 == 0) {
+ int index = consumerList.size()/100;
+ System.out.print(index-((index/10)*10));
+ }
+ if (consumerList.size() == MESSAGE_COUNT) {
+ latch.countDown();
+ }
+ }
+ if(TRANSACTED){
+ try {
+ consumerSession.commit();
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain