Posted to users@activemq.apache.org by guerra <jg...@gmail.com> on 2012/10/11 14:17:02 UTC

On-disk persisted message queues on consumer failure

Hi,

I have an ActiveMQ broker from which messages are inserted into an Oracle
database. I have a working version with a JDBC Camel component and it is
inserting messages correctly, so no problems so far.

Now the scenario is this: the JMS messages are sent with the persistent
delivery mode, KahaDB is also set up, and apparently the messages are persisted
(the journal file rotation looks OK), but when I shut down the database, the
broker basically queues the messages in memory rather than on disk. Is there a
way to make it spool the messages to disk instead?

Does this link, http://activemq.apache.org/persistence.html, cover the
scenario I described above?


###################### activemq.xml ##############################
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="telemetry" persistent="true"
dataDirectory="${activemq.base}/data"
destroyApplicationContextOnStop="true">

 <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry memoryLimit="1mb" producerFlowControl="false"
topic="&gt;">
                  <pendingSubscriberPolicy>
                    <vmCursor/>
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry memoryLimit="1mb" producerFlowControl="false"
queue="&gt;">
                  
                  <pendingQueuePolicy>
                    <fileQueueCursor/>
                  </pendingQueuePolicy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


 <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>
 <persistenceAdapter>
            <kahaDB directory="/opt/shared/telemetry/data/kahadb"
journalMaxFileLength="1mb"/>
        </persistenceAdapter>

 <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="64 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="5 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="1 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
 <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:40000"
updateClusterClients="true"/>
        </transportConnectors>

</broker>

###################### camel.xml ##############################
<route>
                <from uri="activemq:queue:ntcs.telemetry.in"/>
                <to uri="seda:jdbc.queue"/>
</route>
<route>
                <description>NTCS JDBC internal queue</description>
                <from uri="seda:jdbc.queue"/>
                <split>
                        <xpath>/ntcs-telemetry/telemetry</xpath>
                        <bean ref="insertTelemetry"/>
                        <to uri="jdbc:oracle-ds"/>
                </split>
</route>
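
Side note on the route above: the seda: endpoint buffers exchanges in an
in-memory queue inside Camel, separate from the broker's KahaDB store. A
minimal, untested sketch of at least bounding that buffer with the seda "size"
option (the value is purely illustrative):

<to uri="seda:jdbc.queue?size=1000"/>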




Re: java.util.concurrent.RejectedExecutionException:

Posted by Timothy Bish <ta...@gmail.com>.
On Fri, 2012-10-12 at 12:59 +0200, Jamie wrote: 
> Hi Everyone
> 
> I am running ActiveMQ 5.7.0.
> 
> The following error message is output:
> 
> Exception in thread "ActiveMQ Session Task-14" 
> java.util.concurrent.RejectedExecutionException: Task 
> org.apache.activemq.thread.PooledTaskRunner$1@65e074b rejected from 
> java.util.concurrent.ThreadPoolExecutor@730d031[Running, pool size = 16, 
> active threads = 1, queued tasks = 0, completed tasks = 641]
>      at 
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2013)
>      at 
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:816)
>      at 
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1337)
>      at 
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:151)
>      at 
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:47)
>      at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>      at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>      at java.lang.Thread.run(Thread.java:722)
> 
> Also, running ActiveMQ in a profiler, I see that some of my receive 
> threads are blocking on 
> org.apache.activemq.ActiveMQMessageConsumer.receive(), although there 
> are thousands of messages in the queue.
> 
> Any ideas?
> 
> Jamie

Is this something you can reproduce in a test case?

-- 
Tim Bish
Sr Software Engineer | RedHat Inc.
tim.bish@redhat.com | www.fusesource.com | www.redhat.com 
skype: tabish121 | twitter: @tabish121
blog: http://timbish.blogspot.com/


java.util.concurrent.RejectedExecutionException:

Posted by Jamie <ja...@mailarchiva.com>.
Hi Everyone

I am running ActiveMQ 5.7.0.

The following error message is output:

Exception in thread "ActiveMQ Session Task-14" 
java.util.concurrent.RejectedExecutionException: Task 
org.apache.activemq.thread.PooledTaskRunner$1@65e074b rejected from 
java.util.concurrent.ThreadPoolExecutor@730d031[Running, pool size = 16, 
active threads = 1, queued tasks = 0, completed tasks = 641]
     at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2013)
     at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:816)
     at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1337)
     at 
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:151)
     at 
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:47)
     at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
     at java.lang.Thread.run(Thread.java:722)

Also, running ActiveMQ in a profiler, I see that some of my receive 
threads are blocking on 
org.apache.activemq.ActiveMQMessageConsumer.receive(), although there 
are thousands of messages in the queue.

Any ideas?

Jamie

Re: On-disk persisted message queues on consumer failure

Posted by guerra <jg...@gmail.com>.
I am looking at another approach to the problem, which is exception capture. I
am sketching a route something like the one below, because this is another
approach that would suit me very well too; the point for me is not to lose any
data, anyway!

Any feedback is definitely welcome.
Thanks

<route>
                <description>NTCS Oracle Insertion Queue</description>
                <from uri="seda:jdbc.queue"/>
                <onException>
                        <exception>java.sql.SQLException</exception>
                        <handled>
                                <constant>true</constant>
                        </handled>
                        <process ref="telemetryFailureProcessor"/>  <!-- either this ... -->
                        <to uri="jdbc:spare-database.queue"/>       <!-- ... or this -->
                        <rollback markRollbackOnly="true"/>
                </onException>
                <transacted/>
                <split>
                        <xpath>/ntcs-telemetry/telemetry</xpath>
                        <bean ref="insertTransactedTelemetry"/>
                </split>
        </route>
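
Side note: telemetryFailureProcessor and the jdbc:spare-database.queue endpoint
in the sketch above are placeholders; neither is defined elsewhere in this
thread. If the processor variant is kept, it would also need a bean definition,
roughly like this (the class name is hypothetical, purely for illustration):

<!-- Hypothetical wiring for the failure processor; class name is illustrative only. -->
<bean id="telemetryFailureProcessor"
      class="tng.ntcs.camel.bean.TelemetryFailureProcessor"/>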





Re: On-disk persisted message queues on consumer failure

Posted by Christian Posta <ch...@gmail.com>.
I'll give it a shot when I get a sec. I'll let you know.

On Tue, Oct 16, 2012 at 6:13 AM, guerra <jg...@gmail.com> wrote:

> Hi Christian
>
> I've set up the database route as transacted; please see camel.xml below.
> The configuration is working nicely, and I've tested it with a failing
> insertion and I can see the transaction manager doing the rollback.
>
> However, I don't see the link between uncommitted transactions and KahaDB. I
> mean, I can't figure out how an uncommitted transaction "goes" into KahaDB
> storage, and in fact it doesn't change the behaviour from what it was before
> setting up the transaction manager for this route. If I shut down the
> database, Camel starts to build up the queue again as before, and KahaDB
> apparently doesn't notice the database's new status. So, something is not
> working! Likely I am missing some configuration or something; anyway, I'd be
> glad if somebody could shed some light on this!
>
> Cheers
>
>     ******** camel.xml *********
> <route>
>                 <description>NTCS Incoming Queue</description>
>                 <from uri="activemq:queue:ntcs.telemetry.in"/>
>                 <to uri="seda:jdbc.queue"/>
>         </route>
>         <route>
>                 <description>NTCS Oracle Insertion Queue</description>
>                 <from uri="seda:jdbc.queue"/>
>                 <transacted/>
>                 <split>
>                         <xpath>/ntcs-telemetry/telemetry</xpath>
>                         <bean ref="insertTransactedTelemetry"/>
>                 </split>
>         </route>
>     </camelContext>
>
> <bean id="oracle-ntcs-ds" class="oracle.ucp.jdbc.PoolDataSourceFactory"
> factory-method="getPoolDataSource">
>           <property name="URL" value="jdbc:oracle:thin:@(DESCRIPTION =
> (LOAD_BALANCE=ON)(ADDRESS = (PROTOCOL = TCP)(HOST =
> ntcs-cluster-scan.tng.iac.es)(PORT = 1521)) (CONNECT_DATA = (SERVER =
> DEDICATED) (SERVICE_NAME = ntcs.tng.iac.es)))" />
>
>             <property name="user" value="archa" />
>             <property name="password" value="ast$arte" />
>             <property name="connectionFactoryClassName"
> value="oracle.jdbc.pool.OracleDataSource" />
>             <property name="connectionPoolName" value="NTCS_POOL" />
>             <property name="connectionWaitTimeout" value="20" />
>             <property name="minPoolSize" value="4" />
>             <property name="maxPoolSize" value="10" />
>             <property name="initialPoolSize" value="4" />
>             <property name="inactiveConnectionTimeout" value="20" />
>             <property name="timeoutCheckInterval" value="60" />
>             <property name="fastConnectionFailoverEnabled" value="true" />
>             <property name="ONSConfiguration"
>
> value="nodes=ntcs-oracl1:6200,ntcs-oracl2:6200,ntcs-oracl3:6200,ntcs-oracl4:6200"
> />
>             <property name="validateConnectionOnBorrow" value="true" />
>             <property name="maxStatements" value="0" />
>     </bean>
>
>
>     <tx:annotation-driven transaction-manager="txManager"/>
>
>
>
>     <bean id="txManager"
> class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
>         <property name="dataSource" ref="oracle-ntcs-ds"/>
>     </bean>
>
>     <bean id="insertTransactedTelemetry"
> class="tng.ntcs.camel.bean.NTCSInsertTransactedTelemetry">
>         <property name="datasource" ref="oracle-ntcs-ds"/>
>     </bean>
>
>
>



-- 
*Christian Posta*
http://www.christianposta.com/blog

Re: On-disk persisted message queues on consumer failure

Posted by guerra <jg...@gmail.com>.
Hi Christian

I've set up the database route as transacted; please see camel.xml below.
The configuration is working nicely, and I've tested it with a failing
insertion and I can see the transaction manager doing the rollback.

However, I don't see the link between uncommitted transactions and KahaDB. I
mean, I can't figure out how an uncommitted transaction "goes" into KahaDB
storage, and in fact it doesn't change the behaviour from what it was before
setting up the transaction manager for this route. If I shut down the database,
Camel starts to build up the queue again as before, and KahaDB apparently
doesn't notice the database's new status. So, something is not working! Likely
I am missing some configuration or something; anyway, I'd be glad if somebody
could shed some light on this!

Cheers

    ******** camel.xml *********
<route>
                <description>NTCS Incoming Queue</description>
                <from uri="activemq:queue:ntcs.telemetry.in"/>
                <to uri="seda:jdbc.queue"/>
        </route>
        <route>
                <description>NTCS Oracle Insertion Queue</description>
                <from uri="seda:jdbc.queue"/>
                <transacted/>
                <split>
                        <xpath>/ntcs-telemetry/telemetry</xpath>
                        <bean ref="insertTransactedTelemetry"/>
                </split>
        </route>
    </camelContext>

<bean id="oracle-ntcs-ds" class="oracle.ucp.jdbc.PoolDataSourceFactory"
factory-method="getPoolDataSource">
          <property name="URL" value="jdbc:oracle:thin:@(DESCRIPTION =
(LOAD_BALANCE=ON)(ADDRESS = (PROTOCOL = TCP)(HOST =
ntcs-cluster-scan.tng.iac.es)(PORT = 1521)) (CONNECT_DATA = (SERVER =
DEDICATED) (SERVICE_NAME = ntcs.tng.iac.es)))" />

            <property name="user" value="archa" />
            <property name="password" value="ast$arte" />
            <property name="connectionFactoryClassName"
value="oracle.jdbc.pool.OracleDataSource" />
            <property name="connectionPoolName" value="NTCS_POOL" />
            <property name="connectionWaitTimeout" value="20" />
            <property name="minPoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="initialPoolSize" value="4" />
            <property name="inactiveConnectionTimeout" value="20" />
            <property name="timeoutCheckInterval" value="60" />
            <property name="fastConnectionFailoverEnabled" value="true" />
            <property name="ONSConfiguration"
value="nodes=ntcs-oracl1:6200,ntcs-oracl2:6200,ntcs-oracl3:6200,ntcs-oracl4:6200"
/>
            <property name="validateConnectionOnBorrow" value="true" />
            <property name="maxStatements" value="0" />
    </bean>

    
    <tx:annotation-driven transaction-manager="txManager"/>

    
    
    <bean id="txManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="oracle-ntcs-ds"/>
    </bean>

    <bean id="insertTransactedTelemetry"
class="tng.ntcs.camel.bean.NTCSInsertTransactedTelemetry">
        <property name="datasource" ref="oracle-ntcs-ds"/>
    </bean>
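
    (Side note: <transacted/> with no ref attribute resolves a transaction
    policy or transaction manager from the registry; an explicit policy can
    also be declared and referenced from the route. A minimal, untested sketch
    using Camel's Spring transaction policy and the txManager bean above; the
    propagation setting is an illustrative choice, not taken from this thread:

    <bean id="PROPAGATION_REQUIRED"
          class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <!-- reuses the DataSourceTransactionManager defined above -->
        <property name="transactionManager" ref="txManager"/>
        <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
    </bean>

    In the route this would then be referenced as
    <transacted ref="PROPAGATION_REQUIRED"/>.)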




Re: On-disk persisted message queues on consumer failure

Posted by Christian Posta <ch...@gmail.com>.
Yah, excellent. If it still doesn't work, post your route and configs and I
can give it a shot for you on my side.

Christian

On Thu, Oct 11, 2012 at 8:33 AM, guerra <jg...@gmail.com> wrote:

> Thanks, and yes, I think you are fully right. It must be Camel that is
> building up messages in memory; in fact, that is what the Camel monitor shows
> anyway! I was confused myself!
>
> I was reading your link and it looks promising. I looked up the "Database
> Sample" and it seems that it could solve my scenario. I am going to set up
> the Spring TM to mark the route as transacted and we'll see how it goes.
> Theoretically, then, if I shut down the database, ActiveMQ should use KahaDB
> to persist the messages not yet committed by Camel. Anyway, I'll post the
> results in a few days.
>
> Cheers
>
>
>
>
>



-- 
*Christian Posta*
http://www.christianposta.com/blog

Re: On-disk persisted message queues on consumer failure

Posted by guerra <jg...@gmail.com>.
Thanks, and yes, I think you are fully right. It must be Camel that is building
up messages in memory; in fact, that is what the Camel monitor shows anyway! I
was confused myself!

I was reading your link and it looks promising. I looked up the "Database
Sample" and it seems that it could solve my scenario. I am going to set up the
Spring TM to mark the route as transacted and we'll see how it goes.
Theoretically, then, if I shut down the database, ActiveMQ should use KahaDB to
persist the messages not yet committed by Camel. Anyway, I'll post the results
in a few days.

Cheers






Re: On-disk persisted message queues on consumer failure

Posted by Christian Posta <ch...@gmail.com>.
Within the broker, if your messages are marked persistent, they will be
stored on disk (in your KahaDB) until they are delivered to a consumer and
the consumer acks them.

It sounds like your Camel route is building them up in memory, not the
broker.

Run your route in a transaction so that when Camel consumes a message, it
won't be acked unless it has been persisted to the DB.

http://camel.apache.org/transactional-client.html
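
Roughly along these lines; a minimal, untested sketch (port 40000 comes from
your transportConnector but the host is assumed to be local, the endpoint and
bean names are taken from your camel.xml, the bean classes are the stock
Spring/ActiveMQ ones, and the seda hop is dropped here purely for
illustration):

<!-- Sketch only: bind the JMS consumption to a JMS transaction, so the message
     is not acked (and stays in the broker's KahaDB store) unless the route
     commits. -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:40000"/>
</bean>

<bean id="jmsTransactionManager"
      class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
    <property name="transacted" value="true"/>
    <property name="transactionManager" ref="jmsTransactionManager"/>
</bean>

<!-- Consume and insert inside that transaction; an exception from the bean or
     JDBC step rolls the message back to the broker instead of acking it. -->
<route>
    <from uri="activemq:queue:ntcs.telemetry.in"/>
    <transacted/>
    <split>
        <xpath>/ntcs-telemetry/telemetry</xpath>
        <bean ref="insertTelemetry"/>
        <to uri="jdbc:oracle-ds"/>
    </split>
</route>

With something like that in place, a DB outage should cause the consume/insert
transaction to roll back, so pending messages accumulate in the broker's store
rather than in an in-memory Camel buffer.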

On Thu, Oct 11, 2012 at 6:25 AM, guerra <jg...@gmail.com> wrote:

> The broker is queueing up messages in RAM. When I start the database again,
> the messages are dequeued and sent to the database okay, but from memory. I
> am using the Camel monitor and the seda queue is building up messages
> continuously. My point is why these messages are not stored on disk, and
> whether there is a way to do it.
> To be honest, I am not sure whether I am missing something or doing something
> wrong, but my tests show this behaviour.
> Thanks
>
>
>



-- 
*Christian Posta*
http://www.christianposta.com/blog

Re: On-disk persisted message queues on consumer failure

Posted by guerra <jg...@gmail.com>.
The broker is queueing up messages in RAM. When I start the database again,
the messages are dequeued and sent to the database okay, but from memory. I am
using the Camel monitor and the seda queue is building up messages
continuously. My point is why these messages are not stored on disk, and
whether there is a way to do it.
To be honest, I am not sure whether I am missing something or doing something
wrong, but my tests show this behaviour.
Thanks




Re: On-disk persisted message queues on consumer failure

Posted by Christian Posta <ch...@gmail.com>.
So it sounds like the camel route is consuming from an activemq queue and
dumping to DB. And then when the DB goes down, the camel route is consuming
them and holding them in memory (since the DB is down)? Or it's not
consuming them and they're getting queued up in the broker?

On Thu, Oct 11, 2012 at 5:17 AM, guerra <jg...@gmail.com> wrote:

> Hi,
>
> I have an ActiveMQ broker from which messages are inserted into an Oracle
> database. I have a working version with a JDBC Camel component and it is
> inserting messages correctly, so no problems so far.
>
> Now the scenario is this: the JMS messages are sent with the persistent
> delivery mode, KahaDB is also set up, and apparently the messages are
> persisted (the journal file rotation looks OK), but when I shut down the
> database, the broker basically queues the messages in memory rather than on
> disk. Is there a way to make it spool the messages to disk instead?
>
> Does this link, http://activemq.apache.org/persistence.html, cover the
> scenario I described above?
>
>
> ###################### activemq.xml ##############################
> <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="telemetry" persistent="true"
> dataDirectory="${activemq.base}/data"
> destroyApplicationContextOnStop="true">
>
>  <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry memoryLimit="1mb" producerFlowControl="false"
> topic="&gt;">
>                   <pendingSubscriberPolicy>
>                     <vmCursor/>
>                   </pendingSubscriberPolicy>
>                 </policyEntry>
>                 <policyEntry memoryLimit="1mb" producerFlowControl="false"
> queue="&gt;">
>
>                   <pendingQueuePolicy>
>                     <fileQueueCursor/>
>                   </pendingQueuePolicy>
>                 </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>
>
>  <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
>  <persistenceAdapter>
>             <kahaDB directory="/opt/shared/telemetry/data/kahadb"
> journalMaxFileLength="1mb"/>
>         </persistenceAdapter>
>
>  <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="64 mb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="5 gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="1 gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>  <transportConnectors>
>             <transportConnector name="openwire" uri="tcp://0.0.0.0:40000"
> updateClusterClients="true"/>
>         </transportConnectors>
>
> </broker>
>
> ###################### camel.xml ##############################
> <route>
>                 <from uri="activemq:queue:ntcs.telemetry.in"/>
>                 <to uri="seda:jdbc.queue"/>
> </route>
> <route>
>                 <description>NTCS JDBC internal queue</description>
>                 <from uri="seda:jdbc.queue"/>
>                 <split>
>                         <xpath>/ntcs-telemetry/telemetry</xpath>
>                         <bean ref="insertTelemetry"/>
>                         <to uri="jdbc:oracle-ds"/>
>                 </split>
> </route>
>
>
>



-- 
*Christian Posta*
http://www.christianposta.com/blog