You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Hiram Chirino (JIRA)" <ji...@apache.org> on 2010/09/23 17:27:40 UTC
[jira] Updated: (AMQ-2937) kahadb log files not deleted when
exceptions are thrown by consumers
[ https://issues.apache.org/activemq/browse/AMQ-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hiram Chirino updated AMQ-2937:
-------------------------------
Description:
kahadb log files are not being cleaned up, and are building up over a number of days. I've created a simple test that can reproduce the problem, and it only reproduces the problem if my message consumer throws errors (in the test case they are thrown 1 in 100 times).
The messages are re-consumed, and none of them are added to a DLQ. I can provide the test code if this helps. I'm using AMQ v5.4.0, persistent queues, and using AMQ embedded with Spring.
Test code:
(The test needs to be kept running until several log files have been created)
activemq.xml
{code}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="amq-broker"
useJmx="true"
destroyApplicationContextOnStop="true"
useShutdownHook="true">
<persistenceAdapter>
<kahaDB
directory="/data"
indexWriteBatchSize="1000"
enableIndexWriteAsync="true"
enableJournalDiskSyncs="false" />
</persistenceAdapter>
<transportConnectors>
<transportConnector uri="tcp://localhost:61615?jms.prefetchPolicy.all=1" />
</transportConnectors>
<!-- JMX active -->
<managementContext>
<managementContext createConnector="true" />
</managementContext>
<plugins>
<statisticsBrokerPlugin />
</plugins>
</broker>
</beans>
{code}
'Unit' Test (long running test...)
{code}
/**
 * Long-running producer used to reproduce the KahaDB log clean-up problem.
 *
 * <p>Publishes one million {@code MapMessage}s to the configured destination,
 * pausing 20 ms after each send, so that the run lasts long enough for
 * several journal files to be created.
 */
public class AmqKahaDbTest extends IntegrationTestBase {

    // Collaborators are supplied through the setters below
    // (presumably injected by the Spring test context - confirm against IntegrationTestBase).
    private JmsTemplate jmsTemplate;
    private Destination testDestination;

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void setTestDestination(Destination testDestination) {
        this.testDestination = testDestination;
    }

    /**
     * Sends messages numbered 0..999999, sleeping 20 ms after each one.
     *
     * @throws Exception if sending fails or the sleep is interrupted
     */
    public void testKahaDbLogs() throws Exception {
        int sequence = 0;
        while (sequence < 1000000) {
            sendMessage(sequence);
            Thread.sleep(20);
            sequence++;
        }
    }

    /**
     * Sends a single map message whose "testField1" entry carries the sequence number.
     *
     * @param sequence number embedded in the message text
     */
    private void sendMessage(final int sequence) {
        final String payload = "Test Message " + sequence;
        jmsTemplate.send(testDestination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                MapMessage outgoing = session.createMapMessage();
                outgoing.setString("testField1", payload);
                return outgoing;
            }
        });
    }
}
{code}
Message listener:
{code}
/**
 * Consumer used to reproduce the KahaDB log clean-up problem.
 *
 * <p>For every {@code MapMessage} it reads "testField1", sleeps for a random
 * 0-99 ms, and fails with a {@code RuntimeException} whenever the random
 * sleep time happens to be exactly 50 (roughly 1 message in 100).
 */
public class TestMessageListener implements MessageListener {

    // One shared logger for the class instead of a mutable per-instance field.
    private static final Logger log = Logger.getLogger(TestMessageListener.class);

    /**
     * Handles one message.
     *
     * @param message the delivered message; must be a {@code MapMessage}
     * @throws IllegalArgumentException if the message is not a {@code MapMessage}
     * @throws RuntimeException if reading the message fails, the sleep is
     *         interrupted, or the simulated random error fires
     */
    public void onMessage(Message message) {
        if (!(message instanceof MapMessage)) {
            throw new IllegalArgumentException("Message must be of type Test");
        }

        MapMessage mapMessage = (MapMessage) message;
        try {
            String messageText = mapMessage.getString("testField1");
            long sleepTime = (long) (Math.random() * 100d);

            log.info("Test message consume start, sleep time: " + sleepTime);

            Thread.sleep(sleepTime);

            if (sleepTime == 50) {
                //if commented out then the old log files are deleted
                throw new RuntimeException("Random error!!!");
            }

            log.info("Test message consume start: " + messageText);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe
            // the interruption after this exception has been wrapped.
            Thread.currentThread().interrupt();
            log.error("Error consuming message", e);
            throw new RuntimeException(e);
        } catch (Exception e) {
            // Includes the simulated error above: it is logged and re-wrapped,
            // exactly as in the original reproduction.
            log.error("Error consuming message", e);
            throw new RuntimeException(e);
        }
    }
}
{code}
Spring config:
{code}
<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
<property name="config" value="classpath:activemq.xml" />
<property name="start" value="true" />
</bean>
<amq:connectionFactory id="amqConnectionFactory">
<property name="brokerURL" value="vm://amq-broker:61615?jms.prefetchPolicy.all=1" />
</amq:connectionFactory>
<!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions to cache -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="exceptionListener" ref="jmsExceptionListener" />
<property name="sessionCacheSize" value="100" />
</bean>
<bean id="testDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="test.queue" />
</bean>
<bean id="testListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="testDestination"/>
<property name="messageListener" ref="testMessageListener"/>
<property name="sessionTransacted" value="true"/>
<property name="maxConcurrentConsumers" value="10" />
<property name="exceptionListener" ref="jmsExceptionListener" />
</bean>
<bean id="testMessageListener" class="TestMessageListener" />
{code}
was:
kahadb log files are not being cleaned up, and are building up over a number of days. I've created a simple test that can reproduce the problem, and it only reproduces the problem if my message consumer throws errors (in the test case they are thrown 1 in 100 times).
The messages are re-consumed, and none of them are added to a DLQ. I can provide the test code if this helps. I'm using AMQ v5.4.0, persistent queues, and using AMQ embedded with Spring.
Test code:
(The test needs to be kept running until several log files have been created)
activemq.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="amq-broker"
useJmx="true"
destroyApplicationContextOnStop="true"
useShutdownHook="true">
<persistenceAdapter>
<kahaDB
directory="/data"
indexWriteBatchSize="1000"
enableIndexWriteAsync="true"
enableJournalDiskSyncs="false" />
</persistenceAdapter>
<transportConnectors>
<transportConnector uri="tcp://localhost:61615?jms.prefetchPolicy.all=1" />
</transportConnectors>
<!-- JMX active -->
<managementContext>
<managementContext createConnector="true" />
</managementContext>
<plugins>
<statisticsBrokerPlugin />
</plugins>
</broker>
</beans>
'Unit' Test (long running test...)
/**
 * Long-running producer used to reproduce the KahaDB log clean-up problem.
 *
 * <p>Publishes one million {@code MapMessage}s to the configured destination,
 * pausing 20 ms after each send, so that the run lasts long enough for
 * several journal files to be created.
 */
public class AmqKahaDbTest extends IntegrationTestBase {

    // Collaborators are supplied through the setters below
    // (presumably injected by the Spring test context - confirm against IntegrationTestBase).
    private JmsTemplate jmsTemplate;
    private Destination testDestination;

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void setTestDestination(Destination testDestination) {
        this.testDestination = testDestination;
    }

    /**
     * Sends messages numbered 0..999999, sleeping 20 ms after each one.
     *
     * @throws Exception if sending fails or the sleep is interrupted
     */
    public void testKahaDbLogs() throws Exception {
        int sequence = 0;
        while (sequence < 1000000) {
            sendMessage(sequence);
            Thread.sleep(20);
            sequence++;
        }
    }

    /**
     * Sends a single map message whose "testField1" entry carries the sequence number.
     *
     * @param sequence number embedded in the message text
     */
    private void sendMessage(final int sequence) {
        final String payload = "Test Message " + sequence;
        jmsTemplate.send(testDestination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                MapMessage outgoing = session.createMapMessage();
                outgoing.setString("testField1", payload);
                return outgoing;
            }
        });
    }
}
Message listener:
/**
 * Consumer used to reproduce the KahaDB log clean-up problem.
 *
 * <p>For every {@code MapMessage} it reads "testField1", sleeps for a random
 * 0-99 ms, and fails with a {@code RuntimeException} whenever the random
 * sleep time happens to be exactly 50 (roughly 1 message in 100).
 */
public class TestMessageListener implements MessageListener {

    // One shared logger for the class instead of a mutable per-instance field.
    private static final Logger log = Logger.getLogger(TestMessageListener.class);

    /**
     * Handles one message.
     *
     * @param message the delivered message; must be a {@code MapMessage}
     * @throws IllegalArgumentException if the message is not a {@code MapMessage}
     * @throws RuntimeException if reading the message fails, the sleep is
     *         interrupted, or the simulated random error fires
     */
    public void onMessage(Message message) {
        if (!(message instanceof MapMessage)) {
            throw new IllegalArgumentException("Message must be of type Test");
        }

        MapMessage mapMessage = (MapMessage) message;
        try {
            String messageText = mapMessage.getString("testField1");
            long sleepTime = (long) (Math.random() * 100d);

            log.info("Test message consume start, sleep time: " + sleepTime);

            Thread.sleep(sleepTime);

            if (sleepTime == 50) {
                //if commented out then the old log files are deleted
                throw new RuntimeException("Random error!!!");
            }

            log.info("Test message consume start: " + messageText);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe
            // the interruption after this exception has been wrapped.
            Thread.currentThread().interrupt();
            log.error("Error consuming message", e);
            throw new RuntimeException(e);
        } catch (Exception e) {
            // Includes the simulated error above: it is logged and re-wrapped,
            // exactly as in the original reproduction.
            log.error("Error consuming message", e);
            throw new RuntimeException(e);
        }
    }
}
Spring config:
<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
<property name="config" value="classpath:activemq.xml" />
<property name="start" value="true" />
</bean>
<amq:connectionFactory id="amqConnectionFactory">
<property name="brokerURL" value="vm://amq-broker:61615?jms.prefetchPolicy.all=1" />
</amq:connectionFactory>
<!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions to cache -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="exceptionListener" ref="jmsExceptionListener" />
<property name="sessionCacheSize" value="100" />
</bean>
<bean id="testDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="test.queue" />
</bean>
<bean id="testListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="testDestination"/>
<property name="messageListener" ref="testMessageListener"/>
<property name="sessionTransacted" value="true"/>
<property name="maxConcurrentConsumers" value="10" />
<property name="exceptionListener" ref="jmsExceptionListener" />
</bean>
<bean id="testMessageListener" class="TestMessageListener" />
Adding wiki markup to preserve code formatting.
> kahadb log files not deleted when exceptions are thrown by consumers
> --------------------------------------------------------------------
>
> Key: AMQ-2937
> URL: https://issues.apache.org/activemq/browse/AMQ-2937
> Project: ActiveMQ
> Issue Type: Bug
> Components: Message Store
> Affects Versions: 5.4.1
> Environment: OSX 10.5.8, Spring 2.5.6
> Reporter: James Mason
>
> kahadb log files are not being cleaned up, and are building up over a number of days. I've created a simple test that can reproduce the problem, and it only reproduces the problem if my message consumer throws errors (in the test case they are thrown 1 in 100 times).
> The messages are re-consumed, and none of them are added to a DLQ. I can provide the test code if this helps. I'm using AMQ v5.4.0, persistent queues, and using AMQ embedded with Spring.
> Test code:
> (The test needs to be kept running until several log files have been created)
> activemq.xml
> {code}
> <beans xmlns="http://www.springframework.org/schema/beans"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
> http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
> http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
> <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="amq-broker"
> useJmx="true"
> destroyApplicationContextOnStop="true"
> useShutdownHook="true">
>
> <persistenceAdapter>
> <kahaDB
> directory="/data"
> indexWriteBatchSize="1000"
> enableIndexWriteAsync="true"
> enableJournalDiskSyncs="false" />
> </persistenceAdapter>
> <transportConnectors>
> <transportConnector uri="tcp://localhost:61615?jms.prefetchPolicy.all=1" />
> </transportConnectors>
> <!-- JMX active -->
> <managementContext>
> <managementContext createConnector="true" />
> </managementContext>
> <plugins>
> <statisticsBrokerPlugin />
> </plugins>
> </broker>
> </beans>
> {code}
> 'Unit' Test (long running test...)
> {code}
> public class AmqKahaDbTest extends IntegrationTestBase {
> public void testKahaDbLogs() throws Exception {
> for(int i = 0; i < 1000000; i++) {
> sendMessage(i);
>
> Thread.sleep(20);
> }
> }
>
> private void sendMessage(final int i) {
> jmsTemplate.send(testDestination, new MessageCreator() {
> public Message createMessage(Session session) throws JMSException {
> MapMessage message = session.createMapMessage();
>
> message.setString("testField1", "Test Message " + i);
>
> return message;
> }
> });
> }
>
> private JmsTemplate jmsTemplate;
> public void setJmsTemplate(JmsTemplate jmsTemplate) {
> this.jmsTemplate = jmsTemplate;
> }
>
> private Destination testDestination;
> public void setTestDestination(Destination testDestination) {
> this.testDestination = testDestination;
> }
> }
> {code}
> Message listener:
> {code}
> public class TestMessageListener implements MessageListener {
>
> public void onMessage(Message message) {
> if (message instanceof MapMessage) {
>
> //read the type of subtask
> MapMessage mapMessage = (MapMessage)message;
> String messageText;
> try {
> messageText = mapMessage.getString("testField1");
> long sleepTime = (long)(Math.random() * 100d);
>
> log.info("Test message consume start, sleep time: " + sleepTime);
>
> Thread.sleep(sleepTime);
>
> if(sleepTime == 50) {
> //if commented out then the old log files are deleted
> throw new RuntimeException("Random error!!!");
> }
>
> log.info("Test message consume start: " + messageText);
>
> } catch (Exception e) {
> log.error("Error consuming message", e);
> throw new RuntimeException(e);
> }
>
> } else {
> throw new IllegalArgumentException("Message must be of type Test");
> }
> }
>
> private Logger log = Logger.getLogger(TestMessageListener.class);
> }
> {code}
> Spring config:
> {code}
> <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
> <property name="config" value="classpath:activemq.xml" />
> <property name="start" value="true" />
> </bean>
> <amq:connectionFactory id="amqConnectionFactory">
> <property name="brokerURL" value="vm://amq-broker:61615?jms.prefetchPolicy.all=1" />
> </amq:connectionFactory>
>
> <!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions to cache -->
> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
> <constructor-arg ref="amqConnectionFactory" />
> <property name="exceptionListener" ref="jmsExceptionListener" />
> <property name="sessionCacheSize" value="100" />
> </bean>
> <bean id="testDestination" class="org.apache.activemq.command.ActiveMQQueue">
> <constructor-arg index="0" value="test.queue" />
> </bean>
>
> <bean id="testListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
> <property name="connectionFactory" ref="connectionFactory"/>
> <property name="destination" ref="testDestination"/>
> <property name="messageListener" ref="testMessageListener"/>
> <property name="sessionTransacted" value="true"/>
> <property name="maxConcurrentConsumers" value="10" />
> <property name="exceptionListener" ref="jmsExceptionListener" />
> </bean>
>
> <bean id="testMessageListener" class="TestMessageListener" />
> {code}
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.