You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Czeslaw (JIRA)" <ji...@apache.org> on 2016/08/04 10:31:20 UTC

[jira] [Comment Edited] (AMQ-6067) OutOfMemoryError when expiring big amount of topic messages

    [ https://issues.apache.org/jira/browse/AMQ-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15407533#comment-15407533 ] 

Czeslaw edited comment on AMQ-6067 at 8/4/16 10:30 AM:
-------------------------------------------------------

Hi,
I see under debugger that for version 5.13.2 in org.apache.activemq.broker.region.Topic the expireMessagesTask does not control check max expire page size which is passed as a parameter.

private final Runnable expireMessagesTask = new Runnable() {
    @Override
    public void run() {
        List<Message> browsedMessages = new InsertionCountList<Message>();
        doBrowse(browsedMessages, getMaxExpirePageSize());
    }
};


In my case I have default settings for max expire page size eq 400. In doBrowse method are collected all messages belongs to topic into browseList and never is checked max ( or called hasSpace internal method).

private void doBrowse(final List<Message> browseList, final int max) {
    try {
        if (topicStore != null) {
            final List<Message> toExpire = new ArrayList<Message>();
            topicStore.recover(new MessageRecoveryListener() {
                @Override
                public boolean recoverMessage(Message message) throws Exception {
                    if (message.isExpired()) {
                        toExpire.add(message);
                    }
                    browseList.add(message);
                    return true;
                }

                @Override
                public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                    return true;
                }

                @Override
                public boolean hasSpace() {
                    return browseList.size() < max;
                }

                @Override
                public boolean isDuplicate(MessageId id) {
                    return false;
                }
            });
            final ConnectionContext connectionContext = createConnectionContext();
   ...
}
 
In other words if you have 1 000 000 messages under ACTIVEMQ_MSGS table for same topic all messages will be collected into memory ( browseList) a this can case OutOfMemoryError.



was (Author: cszczotka):
Hi,
I see under debugger that for version 5.13.2 in org.apache.activemq.broker.region.Topic the expireMessagesTask does not control check max expire page size which is passed as a parameter.

private final Runnable expireMessagesTask = new Runnable() {
    @Override
    public void run() {
        List<Message> browsedMessages = new InsertionCountList<Message>();
        doBrowse(browsedMessages, getMaxExpirePageSize());
    }
};


In my case I have default settings for max expire page size eq 400. In doBrowse method are collected all messages belongs to topic into browseList and never is checked max ( or call hasSpace internal method).

private void doBrowse(final List<Message> browseList, final int max) {
    try {
        if (topicStore != null) {
            final List<Message> toExpire = new ArrayList<Message>();
            topicStore.recover(new MessageRecoveryListener() {
                @Override
                public boolean recoverMessage(Message message) throws Exception {
                    if (message.isExpired()) {
                        toExpire.add(message);
                    }
                    browseList.add(message);
                    return true;
                }

                @Override
                public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                    return true;
                }

                @Override
                public boolean hasSpace() {
                    return browseList.size() < max;
                }

                @Override
                public boolean isDuplicate(MessageId id) {
                    return false;
                }
            });
            final ConnectionContext connectionContext = createConnectionContext();
   ...
}
 
In other words if you have 1 000 000 messages under ACTIVEMQ_MSGS table for same topic all messages will be collected into memory ( browseList) a this can case OutOfMemoryError.


> OutOfMemoryError when expiring big amount of topic messages
> -----------------------------------------------------------
>
>                 Key: AMQ-6067
>                 URL: https://issues.apache.org/jira/browse/AMQ-6067
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JDBC
>    Affects Versions: 5.10.0
>            Reporter: Petr Havránek
>              Labels: durable, durable_subscription, expiration, jdbc, timeToLive,
>
> There is a problem in
> {noformat}
> org.apache.activemq.broker.region.Topic.expireMessagesTask
> {noformat}
> When there are big amount of topic messages that are going to be expired, this {{expireMessagesTask}} loads all of the messages to memory. This causes
> {noformat}
> 2015-11-24 11:05:46.359 WARN  [ActiveMQ Broker[JmsEngineActivemqBroker] Scheduler] [Topic] Failed to browse Topic: test-topic
> java.lang.OutOfMemoryError: Java heap space
> 	at oracle.sql.BLOB.getBytes(BLOB.java:204)
> 	at oracle.jdbc.driver.T4CBlobAccessor.getBytes(T4CBlobAccessor.java:464)
> 	at oracle.jdbc.driver.OracleResultSetImpl.getBytes(OracleResultSetImpl.java:676)
> 	at org.apache.commons.dbcp.DelegatingResultSet.getBytes(DelegatingResultSet.java:203)
> 	at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.getBinaryData(DefaultJDBCAdapter.java:80)
> 	at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecover(DefaultJDBCAdapter.java:418)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> 	at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
> 	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
> 	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
> 	at org.springframework.aop.interceptor.AbstractTraceInterceptor.invoke(AbstractTraceInterceptor.java:113)
> 	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
> 	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
> 	at $Proxy14.doRecover(Unknown Source)
> 	at org.apache.activemq.store.jdbc.JDBCMessageStore.recover(JDBCMessageStore.java:236)
> 	at org.apache.activemq.store.ProxyTopicMessageStore.recover(ProxyTopicMessageStore.java:62)
> 	at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:594)
> 	at org.apache.activemq.broker.region.Topic.access$100(Topic.java:65)
> 	at org.apache.activemq.broker.region.Topic$6.run(Topic.java:733)
> 	at org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
> 	at java.util.TimerThread.mainLoop(Timer.java:512)
> 	at java.util.TimerThread.run(Timer.java:462)
> {noformat}
> The problem happens when using JDBC persistency with ActiveMQ 5.10.0. After a short look to source code, the same problem could be also with 5.12.1.
> Test case:
> - run ActiveMQ broker with JDBC persistency
> - create subscription to a topic, but do not receive the messages
> - send enough number of messages with short TimeToLive
> - when expireMessagesTask is scheduled, it tries to load all of the messages and causes the OutOfMemoryError
> It would be fine if
> {noformat}
> org.apache.activemq.store.jdbc.JDBCMessageStore.recover(MessageRecoveryListener)
> {noformat}
> will be updated like this:
> {code:java}
> public void recover(final MessageRecoveryListener listener) throws Exception {
>   // Get all the Message ids out of the database.
>   TransactionContext c = persistenceAdapter.getTransactionContext();
>   try {
>     c = persistenceAdapter.getTransactionContext();
>     adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
>       public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
>         if (listener.hasSpace()) {
>           Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
>           msg.getMessageId().setBrokerSequenceId(sequenceId);
>           return listener.recoverMessage(msg);
>         } else {
>           logger.debug("Recovery limit of the messages has exceeded.");
>           return false;
>         }                    
>       }
>       public boolean recoverMessageReference(String reference) throws Exception {
>         if (listener.hasSpace()) {
>           return listener.recoverMessageReference(new MessageId(reference));
>         } else {
>           logger.debug("Recovery limit of the message references has exceeded.");
>           return false;
>         }
>       }
>     });
>   } catch (SQLException e) {
>     JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>     throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
>   } finally {
>     c.close();
>   }
> }
> {code}
> But I am not sure if this limitation is the best way, because there will be some messages that should be expired, but need to wait. So better solution might be to do this job in more separated transactions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)