You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Hiram Chirino (JIRA)" <ji...@apache.org> on 2010/09/23 17:27:40 UTC

[jira] Updated: (AMQ-2937) kahadb log files not deleted when exceptions are thrown by consumers

     [ https://issues.apache.org/activemq/browse/AMQ-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hiram Chirino updated AMQ-2937:
-------------------------------

    Description: 
kahadb log files are not being cleaned up, and are building up over a number of days. I've created a simple test that can reproduce the problem, and it only reproduces the problem if my message consumer throws errors (in the test case they are thrown 1 in 100 times). 

The messages are re-consumed, and none of them are added to a DLQ. I can provide the test code if this helps. I'm using AMQ v5.4.0, persistent queues, and using AMQ embedded with Spring. 

Test code:
(The test needs to be kept running until several log files have been created)


activemq.xml
{code}
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

	<broker xmlns="http://activemq.apache.org/schema/core"
		brokerName="amq-broker" 
		useJmx="true" 
		destroyApplicationContextOnStop="true"
		useShutdownHook="true">
		
		<persistenceAdapter>
			<kahaDB
				directory="/data"
				indexWriteBatchSize="1000" 
				enableIndexWriteAsync="true"
				enableJournalDiskSyncs="false" />
		</persistenceAdapter>

		<transportConnectors>
			<transportConnector uri="tcp://localhost:61615?jms.prefetchPolicy.all=1" />
		</transportConnectors>

		<!-- JMX active -->
		<managementContext>
			<managementContext createConnector="true" />
		</managementContext>

		<plugins>
			<statisticsBrokerPlugin />
		</plugins>
	</broker>
</beans>
{code}



'Unit' Test (long running test...)

{code}
public class AmqKahaDbTest extends IntegrationTestBase {

	public void testKahaDbLogs() throws Exception {
		for(int i = 0; i < 1000000; i++) {
			sendMessage(i);
			
			Thread.sleep(20);
		}
	}
	
	private void sendMessage(final int i) {
		jmsTemplate.send(testDestination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				MapMessage message = session.createMapMessage();
				
				message.setString("testField1", "Test Message " + i);
				
				return message;
			}
		});
	}
	
	private JmsTemplate jmsTemplate;
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	
	private Destination testDestination;
	public void setTestDestination(Destination testDestination) {
		this.testDestination = testDestination;
	}
}
{code}




Message listener:
{code}
public class TestMessageListener implements MessageListener {
	
	public void onMessage(Message message) {
        if (message instanceof MapMessage) {
        	
	        	//read the type of subtask
	        	MapMessage mapMessage = (MapMessage)message;
	        	String messageText;
				try {
					messageText = mapMessage.getString("testField1");
					long sleepTime = (long)(Math.random() * 100d);
					
					log.info("Test message consume start, sleep time: " + sleepTime);
					
					Thread.sleep(sleepTime);
					
					if(sleepTime == 50) {
						//if commented out then the old log files are deleted
						throw new RuntimeException("Random error!!!");
					}
					
					log.info("Test message consume start: " + messageText);
					
				} catch (Exception e) {
					log.error("Error consuming message", e);
					throw new RuntimeException(e);
				}
	        	
    	} else {
            throw new IllegalArgumentException("Message must be of type Test");
        }
    }
	
	private Logger log = Logger.getLogger(TestMessageListener.class);
}
{code}



Spring config:
{code}
	<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
	    <property name="config" value="classpath:activemq.xml" />
	    <property name="start" value="true" />
	</bean>

	<amq:connectionFactory id="amqConnectionFactory">
		<property name="brokerURL" value="vm://amq-broker:61615?jms.prefetchPolicy.all=1" />
	</amq:connectionFactory>
	
	<!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions to cache -->
	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
	    <constructor-arg ref="amqConnectionFactory" />
	    <property name="exceptionListener" ref="jmsExceptionListener" />
	    <property name="sessionCacheSize" value="100" />
	</bean>

	<bean id="testDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="test.queue" />
	</bean>
	
	<bean id="testListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	    <property name="connectionFactory" ref="connectionFactory"/>
	    <property name="destination" ref="testDestination"/>
	    <property name="messageListener" ref="testMessageListener"/>
	    <property name="sessionTransacted" value="true"/>
	    <property name="maxConcurrentConsumers" value="10" />
	    <property name="exceptionListener" ref="jmsExceptionListener" />
	</bean>
	
	<bean id="testMessageListener" class="TestMessageListener" />
{code}



  was:
kahadb log files are not being cleaned up, and are building up over a number of days. I've created a simple test that can reproduce the problem, and it only reproduces the problem if my message consumer throws errors (in the test case they are thrown 1 in 100 times). 

The messages are re-consumed, and none of them are added to a DLQ. I can provide the test code if this helps. I'm using AMQ v5.4.0, persistent queues, and using AMQ embedded with Spring. 

Test code:
(The test needs to be kept running until several log files have been created)


activemq.xml
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

	<broker xmlns="http://activemq.apache.org/schema/core"
		brokerName="amq-broker" 
		useJmx="true" 
		destroyApplicationContextOnStop="true"
		useShutdownHook="true">
		
		<persistenceAdapter>
			<kahaDB
				directory="/data"
				indexWriteBatchSize="1000" 
				enableIndexWriteAsync="true"
				enableJournalDiskSyncs="false" />
		</persistenceAdapter>

		<transportConnectors>
			<transportConnector uri="tcp://localhost:61615?jms.prefetchPolicy.all=1" />
		</transportConnectors>

		<!-- JMX active -->
		<managementContext>
			<managementContext createConnector="true" />
		</managementContext>

		<plugins>
			<statisticsBrokerPlugin />
		</plugins>
	</broker>
</beans>




'Unit' Test (long running test...)

public class AmqKahaDbTest extends IntegrationTestBase {

	public void testKahaDbLogs() throws Exception {
		for(int i = 0; i < 1000000; i++) {
			sendMessage(i);
			
			Thread.sleep(20);
		}
	}
	
	private void sendMessage(final int i) {
		jmsTemplate.send(testDestination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				MapMessage message = session.createMapMessage();
				
				message.setString("testField1", "Test Message " + i);
				
				return message;
			}
		});
	}
	
	private JmsTemplate jmsTemplate;
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	
	private Destination testDestination;
	public void setTestDestination(Destination testDestination) {
		this.testDestination = testDestination;
	}
}





Message listener:

public class TestMessageListener implements MessageListener {
	
	public void onMessage(Message message) {
        if (message instanceof MapMessage) {
        	
	        	//read the type of subtask
	        	MapMessage mapMessage = (MapMessage)message;
	        	String messageText;
				try {
					messageText = mapMessage.getString("testField1");
					long sleepTime = (long)(Math.random() * 100d);
					
					log.info("Test message consume start, sleep time: " + sleepTime);
					
					Thread.sleep(sleepTime);
					
					if(sleepTime == 50) {
						//if commented out then the old log files are deleted
						throw new RuntimeException("Random error!!!");
					}
					
					log.info("Test message consume start: " + messageText);
					
				} catch (Exception e) {
					log.error("Error consuming message", e);
					throw new RuntimeException(e);
				}
	        	
    	} else {
            throw new IllegalArgumentException("Message must be of type Test");
        }
    }
	
	private Logger log = Logger.getLogger(TestMessageListener.class);
}




Spring config:

	<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
	    <property name="config" value="classpath:activemq.xml" />
	    <property name="start" value="true" />
	</bean>

	<amq:connectionFactory id="amqConnectionFactory">
		<property name="brokerURL" value="vm://amq-broker:61615?jms.prefetchPolicy.all=1" />
	</amq:connectionFactory>
	
	<!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions to cache -->
	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
	    <constructor-arg ref="amqConnectionFactory" />
	    <property name="exceptionListener" ref="jmsExceptionListener" />
	    <property name="sessionCacheSize" value="100" />
	</bean>

	<bean id="testDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="test.queue" />
	</bean>
	
	<bean id="testListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	    <property name="connectionFactory" ref="connectionFactory"/>
	    <property name="destination" ref="testDestination"/>
	    <property name="messageListener" ref="testMessageListener"/>
	    <property name="sessionTransacted" value="true"/>
	    <property name="maxConcurrentConsumers" value="10" />
	    <property name="exceptionListener" ref="jmsExceptionListener" />
	</bean>
	
	<bean id="testMessageListener" class="TestMessageListener" />





adding wiki markup to preserve code formating.

> kahadb log files not deleted when exceptions are thrown by consumers
> --------------------------------------------------------------------
>
>                 Key: AMQ-2937
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2937
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Message Store
>    Affects Versions: 5.4.1
>         Environment: OSX 10.5.8, Spring 2.5.6
>            Reporter: James Mason
>
> kahadb log files are not being cleaned up, and are building up over a number of days. I've created a simple test that can reproduce the problem, and it only reproduces the problem if my message consumer throws errors (in the test case they are thrown 1 in 100 times). 
> The messages are re-consumed, and none of them are added to a DLQ. I can provide the test code if this helps. I'm using AMQ v5.4.0, persistent queues, and using AMQ embedded with Spring. 
> Test code:
> (The test needs to be kept running until several log files have been created)
> activemq.xml
> {code}
> <beans xmlns="http://www.springframework.org/schema/beans"
> 	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> 	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
>   http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
> 	<broker xmlns="http://activemq.apache.org/schema/core"
> 		brokerName="amq-broker" 
> 		useJmx="true" 
> 		destroyApplicationContextOnStop="true"
> 		useShutdownHook="true">
> 		
> 		<persistenceAdapter>
> 			<kahaDB
> 				directory="/data"
> 				indexWriteBatchSize="1000" 
> 				enableIndexWriteAsync="true"
> 				enableJournalDiskSyncs="false" />
> 		</persistenceAdapter>
> 		<transportConnectors>
> 			<transportConnector uri="tcp://localhost:61615?jms.prefetchPolicy.all=1" />
> 		</transportConnectors>
> 		<!-- JMX active -->
> 		<managementContext>
> 			<managementContext createConnector="true" />
> 		</managementContext>
> 		<plugins>
> 			<statisticsBrokerPlugin />
> 		</plugins>
> 	</broker>
> </beans>
> {code}
> 'Unit' Test (long running test...)
> {code}
> public class AmqKahaDbTest extends IntegrationTestBase {
> 	public void testKahaDbLogs() throws Exception {
> 		for(int i = 0; i < 1000000; i++) {
> 			sendMessage(i);
> 			
> 			Thread.sleep(20);
> 		}
> 	}
> 	
> 	private void sendMessage(final int i) {
> 		jmsTemplate.send(testDestination, new MessageCreator() {
> 			public Message createMessage(Session session) throws JMSException {
> 				MapMessage message = session.createMapMessage();
> 				
> 				message.setString("testField1", "Test Message " + i);
> 				
> 				return message;
> 			}
> 		});
> 	}
> 	
> 	private JmsTemplate jmsTemplate;
> 	public void setJmsTemplate(JmsTemplate jmsTemplate) {
> 		this.jmsTemplate = jmsTemplate;
> 	}
> 	
> 	private Destination testDestination;
> 	public void setTestDestination(Destination testDestination) {
> 		this.testDestination = testDestination;
> 	}
> }
> {code}
> Message listener:
> {code}
> public class TestMessageListener implements MessageListener {
> 	
> 	public void onMessage(Message message) {
>         if (message instanceof MapMessage) {
>         	
> 	        	//read the type of subtask
> 	        	MapMessage mapMessage = (MapMessage)message;
> 	        	String messageText;
> 				try {
> 					messageText = mapMessage.getString("testField1");
> 					long sleepTime = (long)(Math.random() * 100d);
> 					
> 					log.info("Test message consume start, sleep time: " + sleepTime);
> 					
> 					Thread.sleep(sleepTime);
> 					
> 					if(sleepTime == 50) {
> 						//if commented out then the old log files are deleted
> 						throw new RuntimeException("Random error!!!");
> 					}
> 					
> 					log.info("Test message consume start: " + messageText);
> 					
> 				} catch (Exception e) {
> 					log.error("Error consuming message", e);
> 					throw new RuntimeException(e);
> 				}
> 	        	
>     	} else {
>             throw new IllegalArgumentException("Message must be of type Test");
>         }
>     }
> 	
> 	private Logger log = Logger.getLogger(TestMessageListener.class);
> }
> {code}
> Spring config:
> {code}
> 	<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
> 	    <property name="config" value="classpath:activemq.xml" />
> 	    <property name="start" value="true" />
> 	</bean>
> 	<amq:connectionFactory id="amqConnectionFactory">
> 		<property name="brokerURL" value="vm://amq-broker:61615?jms.prefetchPolicy.all=1" />
> 	</amq:connectionFactory>
> 	
> 	<!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions to cache -->
> 	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
> 	    <constructor-arg ref="amqConnectionFactory" />
> 	    <property name="exceptionListener" ref="jmsExceptionListener" />
> 	    <property name="sessionCacheSize" value="100" />
> 	</bean>
> 	<bean id="testDestination" class="org.apache.activemq.command.ActiveMQQueue">
> 		<constructor-arg index="0" value="test.queue" />
> 	</bean>
> 	
> 	<bean id="testListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
> 	    <property name="connectionFactory" ref="connectionFactory"/>
> 	    <property name="destination" ref="testDestination"/>
> 	    <property name="messageListener" ref="testMessageListener"/>
> 	    <property name="sessionTransacted" value="true"/>
> 	    <property name="maxConcurrentConsumers" value="10" />
> 	    <property name="exceptionListener" ref="jmsExceptionListener" />
> 	</bean>
> 	
> 	<bean id="testMessageListener" class="TestMessageListener" />
> {code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.