Posted to users@activemq.apache.org by Hervé BARRAULT <he...@gmail.com> on 2011/07/29 08:57:39 UTC
ActiveMQ performances and JDBC persistence
Hi,
I am using ActiveMQ 5.4.0 with JDBC persistence (on an Oracle 11g R2 server)
and I am running some performance tests.
I'm sending 16000 messages through web services (using one port) and it
takes about 1 min to process all the messages and fill the JMS queue.
When I use a consumer to drain this queue (without adding new messages)
and fill another one, it takes about 6 min 30 s. I expected to find that
some of my own code was slowing down the consumption.
But I have tried the purge method and it takes the same time.
Is there a way to increase the consumption rate?
Thanks for any answers.
Re: ActiveMQ performances and JDBC persistence
Posted by Hervé BARRAULT <he...@gmail.com>.
Hi,
Does the transaction mechanism in ActiveMQ apply to the whole broker, or only
to the queues involved in the transaction?
Regards
Hervé
Re: ActiveMQ performances and JDBC persistence
Posted by Hervé BARRAULT <he...@gmail.com>.
Hi,
I have the following configuration:
2 producers send messages to a queue using a Camel producer template.
One consumer reads that queue and sends 2 messages to 2 different queues
(using a Camel route), and each of those queues has 1 consumer.
Overall, it works with 3 queues.
I am using ActiveMQ 5.4.0 with the VM transport and JDBC persistence (flow
control is disabled in this case).
When running some performance tests:
While the 2 producers are working it seems OK (bad performance but no
contention).
When the producers stop (I have a lot of enqueued messages), I see thread
contention on the VMTransport threads.
I see 5 live threads (the threads change over time).
Stacks at 04:36:09 PM (uptime 1:33:37)

VMTransport [RUNNABLE, IN_NATIVE] CPU time: 0:01
java.net.SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int)
java.net.SocketInputStream.read(byte[], int, int)
org.postgresql.core.VisibleBufferedInputStream.readMore(int)
org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int)
org.postgresql.core.VisibleBufferedInputStream.read()
org.postgresql.core.PGStream.ReceiveChar()
org.postgresql.core.v3.QueryExecutorImpl.processResults(ResultHandler, int)
org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, ResultHandler, int, int, int)
org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(Query)
org.postgresql.jdbc2.AbstractJdbc2Connection.commit()
org.apache.commons.dbcp.DelegatingConnection.commit()
org.apache.commons.dbcp.PoolingDataSource$PoolGuardConnectionWrapper.commit()
org.apache.activemq.store.jdbc.TransactionContext.commit()
org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.commitTransaction(ConnectionContext)
org.apache.activemq.store.memory.MemoryTransactionStore$Tx.commit()
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [BLOCKED] CPU time: 0:18
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [BLOCKED] CPU time: 0:01
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [WAITING] CPU time: 0:09
sun.misc.Unsafe.park(boolean, long)
java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode, boolean, long)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object, boolean, long)
java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
java.util.concurrent.ThreadPoolExecutor.getTask()
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [WAITING] CPU time: 0:01
sun.misc.Unsafe.park(boolean, long)
java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode, boolean, long)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object, boolean, long)
java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
java.util.concurrent.ThreadPoolExecutor.getTask()
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

I have difficulty understanding why I have 2 BLOCKED threads and 1
RUNNABLE thread in this case.
Thanks for your help.
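One possible reading of the dump, offered as an assumption rather than a confirmed diagnosis: the RUNNABLE thread has MemoryTransactionStore.commit in its stack and is waiting on the database socket, while both BLOCKED threads are stopped at that same frame. That pattern is what a monitor held across the JDBC commit would produce. The sketch below is not ActiveMQ code; COMMIT_LOCK and the class name are hypothetical, and a latch stands in for the slow database round trip (so the holder here shows WAITING instead of RUNNABLE in native code).

```java
import java.util.concurrent.CountDownLatch;

final class CommitContentionSketch {
    // Hypothetical stand-in for whatever monitor the BLOCKED threads are waiting on.
    private static final Object COMMIT_LOCK = new Object();

    // Starts one thread that holds the lock during a simulated slow commit,
    // then `waiters` threads that need the same lock.
    // Returns how many of the waiters were observed in state BLOCKED.
    static int countBlockedWaiters(int waiters) {
        CountDownLatch holderInside = new CountDownLatch(1);
        CountDownLatch commitDone = new CountDownLatch(1);
        try {
            Thread holder = new Thread(() -> {
                synchronized (COMMIT_LOCK) {
                    holderInside.countDown();
                    awaitQuietly(commitDone); // simulated database commit
                }
            }, "holder");
            holder.start();
            holderInside.await();

            Thread[] threads = new Thread[waiters];
            for (int i = 0; i < waiters; i++) {
                threads[i] = new Thread(() -> {
                    synchronized (COMMIT_LOCK) {
                        // a real thread would run its own commit here
                    }
                }, "waiter-" + i);
                threads[i].start();
            }

            // A waiter cannot leave BLOCKED until the holder releases the lock,
            // so polling for that state always terminates.
            int blocked = 0;
            for (Thread t : threads) {
                while (t.getState() != Thread.State.BLOCKED) {
                    Thread.sleep(1);
                }
                blocked++;
            }

            commitDone.countDown(); // the simulated commit finishes
            holder.join();
            for (Thread t : threads) {
                t.join();
            }
            return blocked;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocked waiters: " + countBlockedWaiters(2));
    }
}
```

If this reading is right, commits would be serialized behind the database round trip no matter how many VMTransport threads are available.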
Re: ActiveMQ performances and JDBC persistence
Posted by Hervé BARRAULT <he...@gmail.com>.
Hi, I have done another experiment:
I tried something to see the influence of stored messages on ActiveMQ
behavior.
I send 20 messages per second for 20 seconds into the broker and
use a processor to dequeue the messages.
If there are no other messages, it is OK.
If I create a dummy queue with 16k messages, it still works.
If I create a dummy queue with 24k messages, it takes about 25 seconds to do
the whole job.
If I create a dummy queue with 32k messages, it takes about 55 seconds to do
the whole job.
Is there a way to prevent one queue from affecting the other
queues?
Regards
Re: ActiveMQ performances and JDBC persistence
Posted by Hervé BARRAULT <he...@gmail.com>.
Hi,
I have looked at the purge mechanism to try to understand why it takes so
much time to empty a queue.
I have noticed something:
####
CLASS org.apache.activemq.store.jdbc.Statements

    public String getFindMessageSequenceIdStatement() {
        if (findMessageSequenceIdStatement == null) {
            findMessageSequenceIdStatement = "SELECT ID, PRIORITY FROM " + getFullMessageTableName()
                + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
        }
        return findMessageSequenceIdStatement;
    }

    public String getRemoveMessageStatement() {
        if (removeMessageStatement == null) {
            removeMessageStatement = "DELETE FROM " + getFullMessageTableName() + " WHERE ID=?";
        }
        return removeMessageStatement;
    }
###
###
CLASS org.apache.activemq.store.jdbc.JDBCMessageStore

    private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
        long result = -1;
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
            result = adapter.getStoreSequenceId(c, destination, messageId)[0];
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: "
                + messageId + ", on: " + destination + ". Reason: " + e, e);
        } finally {
            c.close();
        }
        return result;
    }

    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
        long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
        // Get a connection and remove the message from the DB
        TransactionContext c = persistenceAdapter.getTransactionContext(context);
        try {
            adapter.doRemoveMessage(c, seq);
        } catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to broker message: "
                + ack.getLastMessageId() + " in container: " + e, e);
        } finally {
            c.close();
        }
    }
###
Would it be better to use a single statement to remove a row, namely:
"DELETE FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?"
or is there a case where this would not work?
Would it also be better to create an index on MSGID_PROD, MSGID_SEQ and
CONTAINER (could this index be UNIQUE?)
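To make the proposal concrete, here is a minimal sketch that only builds the two SQL strings discussed above. It is not the ActiveMQ Statements class: the class name, the method names and the index name are hypothetical, and ACTIVEMQ_MSGS is assumed as the table name (a configured prefix would change it). Whether the index can safely be UNIQUE is left open, so the sketch creates a plain index.

```java
final class SingleDeleteSketch {

    // Single-statement removal: delete directly by the message identity
    // instead of first selecting the ID and then deleting by ID.
    static String deleteByMessageId(String table) {
        return "DELETE FROM " + table
            + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
    }

    // Index covering the three columns used in the WHERE clause above.
    // The index name is supplied by the caller; nothing here is an ActiveMQ default.
    static String createLookupIndex(String table, String indexName) {
        return "CREATE INDEX " + indexName + " ON " + table
            + " (MSGID_PROD, MSGID_SEQ, CONTAINER)";
    }

    public static void main(String[] args) {
        String table = "ACTIVEMQ_MSGS"; // assumed table name
        System.out.println(deleteByMessageId(table));
        System.out.println(createLookupIndex(table, "MSGS_LOOKUP_IDX")); // hypothetical index name
    }
}
```

The three "?" placeholders would be bound in the same order as in the existing SELECT: producer id, producer sequence, then container.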
I have measured that it takes 1 min 30 s to dequeue my 16000 messages using the
purge method with these two modifications. I noticed that the page size is
200 messages, so every 200 rows the purge has to wait for a new read from the
database before it can continue; I had never noticed this limit before.
So it increases the purge rate from about 40 msgs/sec to about 170 msgs/sec
(could it also apply to other components reading the queue?).
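The figures above can be checked with a little arithmetic. The sketch below assumes the durations quoted in this thread (6 min 30 s before, 1 min 30 s after) and the 200-message page size; the class and method names are hypothetical. With those rounded durations the rates come out at roughly 41 and 178 msgs/sec, close to the approximate figures quoted above, and the purge needs 80 page reads.

```java
final class PurgeRateSketch {

    // Average throughput over the whole run.
    static double messagesPerSecond(int messages, int seconds) {
        return (double) messages / seconds;
    }

    // Number of page reads needed to walk the queue (ceiling division).
    static int pageReads(int messages, int pageSize) {
        return (messages + pageSize - 1) / pageSize;
    }

    public static void main(String[] args) {
        int messages = 16000;
        int beforeSeconds = 6 * 60 + 30; // about 6 min 30 s before the change
        int afterSeconds = 90;           // about 1 min 30 s after the change

        System.out.println("before: " + messagesPerSecond(messages, beforeSeconds) + " msgs/sec");
        System.out.println("after:  " + messagesPerSecond(messages, afterSeconds) + " msgs/sec");
        System.out.println("page reads: " + pageReads(messages, 200));
    }
}
```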
I don't know enough about JMS broker and database internals to say whether
this is a good or a bad idea.
Can anyone help me with this?
Thanks for any answers.
Hervé