Posted to users@activemq.apache.org by Hervé BARRAULT <he...@gmail.com> on 2011/07/29 08:57:39 UTC

ActiveMQ performances and JDBC persistence

Hi,
I am using ActiveMQ 5.4.0 with persistence (on an Oracle 11g R2 server)
and I am running some performance tests.

I am sending 16000 messages through web services (using one port) and it
takes about 1 min to process all the messages and fill the JMS queue.

When I use a consumer to drain this queue (without adding new messages)
and fill another one, it takes about 6 min 30 s. I expected that some of
my own code was slowing down the consumption.

But I tried the purge method and it takes the same time.

Is there a way to increase the consumption rate?

Thanks for any answers.

Re: ActiveMQ performances and JDBC persistence

Posted by Hervé BARRAULT <he...@gmail.com>.
Hi,

Is the transaction mechanism in ActiveMQ global to the whole broker, or does
it concern only the queues involved in the transaction?

Regards
Hervé

2011/8/16 Hervé BARRAULT <he...@gmail.com>


Re: ActiveMQ performances and JDBC persistence

Posted by Hervé BARRAULT <he...@gmail.com>.
Hi,
I have the following configuration:
2 producers send messages to a queue using a Camel ProducerTemplate.
I have one consumer which finally sends 2 messages to 2 different queues
(using a Camel route),
and each of those queues has 1 consumer.

Overall, the setup works with 3 queues.

I am using ActiveMQ 5.4.0 with the VM transport and JDBC persistence (flow
control is disabled in this case).
While running some performance tests:

While the 2 producers are working, it seems OK (poor performance but no
contention).

When the producers stop (with a lot of messages enqueued), I see thread
contention among the VMTransport threads.

I see 5 live threads (which threads they are changes over time).



Stacks at 04:36:09 PM (uptime 1:33:37)

VMTransport [RUNNABLE, IN_NATIVE] CPU time: 0:01
java.net.SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int)
java.net.SocketInputStream.read(byte[], int, int)
org.postgresql.core.VisibleBufferedInputStream.readMore(int)
org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int)
org.postgresql.core.VisibleBufferedInputStream.read()
org.postgresql.core.PGStream.ReceiveChar()
org.postgresql.core.v3.QueryExecutorImpl.processResults(ResultHandler, int)
org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, ResultHandler, int, int, int)
org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(Query)
org.postgresql.jdbc2.AbstractJdbc2Connection.commit()
org.apache.commons.dbcp.DelegatingConnection.commit()
org.apache.commons.dbcp.PoolingDataSource$PoolGuardConnectionWrapper.commit()
org.apache.activemq.store.jdbc.TransactionContext.commit()
org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.commitTransaction(ConnectionContext)
org.apache.activemq.store.memory.MemoryTransactionStore$Tx.commit()
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [BLOCKED] CPU time: 0:18
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [BLOCKED] CPU time: 0:01
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [WAITING] CPU time: 0:09
sun.misc.Unsafe.park(boolean, long)
java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode, boolean, long)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object, boolean, long)
java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
java.util.concurrent.ThreadPoolExecutor.getTask()
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [WAITING] CPU time: 0:01
sun.misc.Unsafe.park(boolean, long)
java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode, boolean, long)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object, boolean, long)
java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
java.util.concurrent.ThreadPoolExecutor.getTask()
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()


I have difficulty understanding why I have 2 BLOCKED threads and 1
RUNNABLE thread in this case.
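
One possible reading of the dump, offered as an assumption and not a confirmed diagnosis: the RUNNABLE thread is inside MemoryTransactionStore.commit waiting on the database, and the two BLOCKED threads are stopped at the entry of that same method, which is what threads look like when they compete for one monitor. The sketch below reproduces only that pattern in plain Java; STORE_LOCK and the class name are hypothetical stand-ins, not ActiveMQ code.

```java
import java.util.concurrent.CountDownLatch;

final class CommitContentionDemo {
    // Hypothetical stand-in for a single lock guarding every commit (an assumption).
    private static final Object STORE_LOCK = new Object();

    /** Runs one slow "commit" plus `waiters` competing commits and returns the competitors' states. */
    static Thread.State[] observe(int waiters) throws InterruptedException {
        CountDownLatch holderInside = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            synchronized (STORE_LOCK) {
                holderInside.countDown();
                try {
                    // Stands in for the slow JDBC commit round trip. In the real dump the
                    // lock holder shows as RUNNABLE because it sits in a native socket read.
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "holder");
        holder.start();
        holderInside.await(); // make sure the holder owns the lock first

        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                synchronized (STORE_LOCK) {
                    // a competing commit would run here
                }
            }, "waiter-" + i);
            threads[i].start();
        }

        Thread.State[] states = new Thread.State[waiters];
        for (int i = 0; i < waiters; i++) {
            // Poll until the thread has reached the monitor and is parked on it.
            while (threads[i].getState() != Thread.State.BLOCKED) {
                Thread.sleep(1);
            }
            states[i] = threads[i].getState();
        }

        release.countDown(); // let the "commit" finish so everyone can proceed
        holder.join();
        for (Thread t : threads) {
            t.join();
        }
        return states;
    }

    public static void main(String[] args) throws InterruptedException {
        for (Thread.State s : observe(2)) {
            System.out.println(s);
        }
    }
}
```

If this reading is right, only one commit can talk to the database at a time, so the commit round trip would bound the throughput regardless of how many VMTransport threads are available.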

Thanks for your help.



2011/8/10 Hervé BARRAULT <he...@gmail.com>


Re: ActiveMQ performances and JDBC persistence

Posted by Hervé BARRAULT <he...@gmail.com>.
Hi, I have done another experiment:
I tried to see the influence of a backlog of messages on ActiveMQ's
behavior.

I send 20 messages per second for 20 seconds into the broker and
use a processor to dequeue the messages.
If there are no other messages, it is OK.
If I create a dummy queue holding 16k messages, it still works.
If I create a dummy queue holding 24k messages, it takes about 25 seconds to
do the whole job.
If I create a dummy queue holding 32k messages, it takes about 55 seconds to
do the whole job.
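
To put numbers on these observations, here is a small sketch that converts them into effective rates. It assumes that "the whole job" means the time until all 400 test messages (20 per second for 20 seconds) have been dequeued; the class and method names are only illustrative.

```java
import java.util.Locale;

final class BacklogEffect {
    /** Number of test messages injected: rate multiplied by duration. */
    static int messagesSent(int ratePerSecond, int seconds) {
        return ratePerSecond * seconds;
    }

    /** Effective end-to-end rate in messages per second. */
    static double effectiveRate(int messages, double elapsedSeconds) {
        return messages / elapsedSeconds;
    }

    public static void main(String[] args) {
        int sent = messagesSent(20, 20); // 400 messages
        int[] backlog = {24_000, 32_000};       // size of the unrelated dummy queue
        double[] elapsedSeconds = {25.0, 55.0}; // observed completion times
        for (int i = 0; i < backlog.length; i++) {
            System.out.println(String.format(Locale.ROOT,
                "backlog=%d msgs -> %.1f msgs/sec",
                backlog[i], effectiveRate(sent, elapsedSeconds[i])));
        }
    }
}
```

Under that assumption the test queue drops from the injected 20 msgs/sec to about 16 msgs/sec with a 24k backlog and to about 7.3 msgs/sec with a 32k backlog.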

Is there anything I can do to prevent one queue from affecting the other
queues?
Regards


2011/8/1 Hervé BARRAULT <he...@gmail.com>


Re: ActiveMQ performances and JDBC persistence

Posted by Hervé BARRAULT <he...@gmail.com>.
Hi,
I have looked at the purge mechanism to try to understand why it takes so
much time to clear a queue.

I have noticed something:

###
CLASS org.apache.activemq.store.jdbc.Statements

public String getFindMessageSequenceIdStatement() {
    if (findMessageSequenceIdStatement == null) {
        findMessageSequenceIdStatement = "SELECT ID, PRIORITY FROM " + getFullMessageTableName()
            + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
    }
    return findMessageSequenceIdStatement;
}

public String getRemoveMessageStatement() {
    if (removeMessageStatement == null) {
        removeMessageStatement = "DELETE FROM " + getFullMessageTableName() + " WHERE ID=?";
    }
    return removeMessageStatement;
}
###

###
CLASS org.apache.activemq.store.jdbc.JDBCMessageStore

private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
    long result = -1;
    TransactionContext c = persistenceAdapter.getTransactionContext();
    try {
        result = adapter.getStoreSequenceId(c, destination, messageId)[0];
    } catch (SQLException e) {
        JDBCPersistenceAdapter.log("JDBC Failure: ", e);
        throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId
            + ", on: " + destination + ". Reason: " + e, e);
    } finally {
        c.close();
    }
    return result;
}

public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {

    long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());

    // Get a connection and remove the message from the DB
    TransactionContext c = persistenceAdapter.getTransactionContext(context);
    try {
        adapter.doRemoveMessage(c, seq);
    } catch (SQLException e) {
        JDBCPersistenceAdapter.log("JDBC Failure: ", e);
        throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId()
            + " in container: " + e, e);
    } finally {
        c.close();
    }
}
###

Would it be better to use a single statement to remove a row, namely:
"DELETE FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND
MSGID_SEQ=? AND CONTAINER=?" ?
Or is there a case where this would not work?
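
To make the comparison concrete, here is a minimal sketch that builds the current two statements and the proposed single one side by side, and counts the statements needed for a given number of messages. The table name ACTIVEMQ_MSGS and the class RemoveStatements are assumptions for illustration; in the broker the name comes from getFullMessageTableName().

```java
final class RemoveStatements {
    // Assumed table name, standing in for getFullMessageTableName().
    static final String TABLE = "ACTIVEMQ_MSGS";

    /** Current step 1: look up the store sequence id from the message id. */
    static String findSequenceId() {
        return "SELECT ID, PRIORITY FROM " + TABLE
            + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
    }

    /** Current step 2: delete the row by its store sequence id. */
    static String removeById() {
        return "DELETE FROM " + TABLE + " WHERE ID=?";
    }

    /** Proposed alternative: delete directly by message id, in one statement. */
    static String removeByMessageId() {
        return "DELETE FROM " + TABLE
            + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
    }

    /** Statements issued to remove `messages` rows with either approach. */
    static int statementsFor(int messages, boolean singleStatement) {
        return singleStatement ? messages : 2 * messages;
    }

    public static void main(String[] args) {
        System.out.println(findSequenceId());
        System.out.println(removeById());
        System.out.println(removeByMessageId());
        System.out.println(statementsFor(16000, false) + " vs " + statementsFor(16000, true));
    }
}
```

For the 16000-message test this is 32000 statements against 16000. Note also that in the quoted code the lookup obtains its own TransactionContext, separate from the one used for the delete.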

Would it be better to create an index on MSGID_PROD, MSGID_SEQ and
CONTAINER (could this index be UNIQUE?)?
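
As an illustration of the index in question, the sketch below builds the corresponding CREATE INDEX statement. The index name ACTIVEMQ_MSGS_MSGID_IDX and the table name ACTIVEMQ_MSGS are hypothetical, and whether the three columns are really unique together is exactly the open question, so the UNIQUE variant is only an option here.

```java
final class MessageIdIndex {
    /** Builds a CREATE INDEX statement for the given table and columns. */
    static String createIndex(String indexName, String table, boolean unique, String... columns) {
        return "CREATE " + (unique ? "UNIQUE " : "") + "INDEX " + indexName
            + " ON " + table + " (" + String.join(", ", columns) + ")";
    }

    public static void main(String[] args) {
        // Hypothetical index and table names, chosen for illustration only.
        System.out.println(createIndex("ACTIVEMQ_MSGS_MSGID_IDX", "ACTIVEMQ_MSGS", false,
            "MSGID_PROD", "MSGID_SEQ", "CONTAINER"));
        System.out.println(createIndex("ACTIVEMQ_MSGS_MSGID_IDX", "ACTIVEMQ_MSGS", true,
            "MSGID_PROD", "MSGID_SEQ", "CONTAINER"));
    }
}
```

Since the lookup and the proposed delete both filter on all three columns with equality, such an index would let the database find the row without scanning the table.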

I have measured that it takes 1 min 30 s to dequeue my 16000 messages using
the purge method with these two modifications. I also noticed that the page
size is 200 messages: every 200 rows, the purge has to wait for the next read
from the database before it can continue. I had never noticed this limit
before.

So it increases the purge rate from about 40 msgs/sec to about 170 msgs/sec
(could this also apply to other components reading the queue?).
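
The rates can be checked from the timings given in this thread: 16000 messages in 6 min 30 s before the change and in 1 min 30 s after it. The sketch below does only that arithmetic; the class name is illustrative.

```java
import java.util.Locale;

final class PurgeRate {
    /** Throughput in messages per second. */
    static double rate(int messages, int seconds) {
        return (double) messages / seconds;
    }

    public static void main(String[] args) {
        int messages = 16000;
        double before = rate(messages, 6 * 60 + 30); // 390 s with the original statements
        double after = rate(messages, 90);           // 90 s with the two modifications
        System.out.println(String.format(Locale.ROOT, "before: %.1f msgs/sec", before)); // 41.0
        System.out.println(String.format(Locale.ROOT, "after:  %.1f msgs/sec", after));  // 177.8
        System.out.println(String.format(Locale.ROOT, "speedup: %.1fx", after / before)); // 4.3x
    }
}
```

This gives roughly 41 and 178 msgs/sec, in line with the rounded figures above, for a speedup of a little over 4x.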

I don't know enough about JMS brokers and database mechanisms to say whether
it is a good or a bad idea.

Can anyone help me with this?

Thanks for any answers.
Hervé

2011/7/29 Hervé BARRAULT <he...@gmail.com>
