You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by angeloslenis <le...@gmail.com> on 2018/11/07 14:32:24 UTC

Consume rate drops dracstically with message process latency, even with many consumers per queue.

Hello,

TLDR: When we introduce some latency on the consumers, the maximum consume
rate drops drastically, even though we have enough consumer threads to
handle the rate.


We are currently evaluating ActiveMQ, focusing on produce/consume rate:
The setup is the following:
- Virtual Topics: 100 
- Consumer Queues: 3 per virtual topic amounting to 300 queues.
- Produce rate: 10 messages/second on each Virtual topic, amounting to
1000messages/second
-- PooledConnectionFactory with 16 connections
- Consumers: 4 different consumers on each queue, amounting to 1200
consumers (i.e. sessions + consumer)
-- another PooledConnectionFactory with 16 connections
- Broker configuration: default settings, producerFlowControl is enabled
- Everything runs on a single machine, i7 2.5Ghz, 16GB memory

When the consumers process messages very fast we can achieve a consume rate
of 3000 messages/sec which is the expected one for the 1000messages/sec that
we produce (3 queues per topic).

When we introduce some latency on the consumers, the maximum consume rate
drops drastically, even though we have enough consumers (== threads) to
handle the rate:

- Latency 10 msec, Consume Rate drops to 1300 messages/sec
- Latency 20 msec, Consume Rate drops to 650 messages/sec
- Latency 100 msec, Consume Rate drops to 130 messages/sec

In all cases produce rate also drops to approximately 1/3 of consume rate,
due to flow control.

Any advice on why the consume rate drops?


Below are code samples, with the connections and producers/consumers
configurations:

// ------------------------------------
// Producer Connection Settings
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(connectionUrl(amqConfiguration));
connectionFactory.setUserName(amqConfiguration.getPublisherUsername());
connectionFactory.setPassword(amqConfiguration.getPublisherPassword());
connectionFactory.setClientIDPrefix("HermesMonitorAMQ-Producer");
connectionFactory.setAlwaysSyncSend(true);
connectionFactory.setDispatchAsync(false);
connectionFactory.setSendTimeout(5000);

publisherConnectionFactory = new PooledConnectionFactory(connectionFactory);
publisherConnectionFactory.setMaxConnections(16);
publisherConnectionFactory.start();

// ------------------------------------
// Producer: this is called by different threads  to produce 10 messages/sec
on each topic
private synchronized void publish(String topic) throws JMSException,
TException {
    // 1. Create connection
    Connection producerConnection =
publisherConnectionFactory.createConnection();
    // 2. Create Session
    Session producerSession = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
    // 3. Create Destination
    Destination destination = producerSession.createTopic(topic);
    // ^^ e.g. "VirtualTopic.ProducerYYY"
    // 4. Create Producer
    MessageProducer producer = producerSession.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    // 5. Create message
    MapMessage producerMessage = producerSession.createMapMessage();
    {
        // Add some payload
        byte[] payload = new
byte[monitorParameters.getProducerMessageSize()];
        new Random().nextBytes(payload);
        producerMessage.setObject("payload", payload);
    }

    // 6. Send Message
    producer.send(producerMessage);

    producer.close();
    producerSession.close();
    producerConnection.close();
}

// ------------------------------------
// Consumer Connection Settings
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(connectionUrl(amqConfiguration));
connectionFactory.setClientIDPrefix("HermesMonitorAMQ-Consumer");
connectionFactory.setUserName(amqConfiguration.getConsumerUsername());
connectionFactory.setPassword(amqConfiguration.getConsumerPassword());
connectionFactory.setAlwaysSessionAsync(true);
connectionFactory.setOptimizeAcknowledge(false);
connectionFactory.setDispatchAsync(false);
connectionFactory.setSendTimeout(5000);
connectionFactory.setConnectResponseTimeout(5000);
connectionFactory.setMessagePrioritySupported(true);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setAll(1);
connectionFactory.setPrefetchPolicy(prefetchPolicy);
RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(0);
connectionFactory.setRedeliveryPolicy(redeliveryPolicy);

consumerConnectionFactory = new PooledConnectionFactory(connectionFactory);
consumerConnectionFactory.setMaxConnections(16);

consumerConnectionFactory.start();

//---------------------------------------------------
// Consumer: this is called 4 times per queue => 4 * 300 = 1200 sessions +
consumers
void startConsumer(String queue) {
    // 1. Establish a connection for the consumer.
    consumerConnection = consumerConnectionFactory.createConnection();
    // 2. Create a session.
    consumerSession = consumerConnection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
    // 3. Create a message consumer from the session to the queue.
    Destination destination = consumerSession.createQueue(queue));
    // ^^ e.g. "Consumer.ConsumerXXX.VirtualTopic.ProducerYYY"
    // 4. Create consumer
    consumer = consumerSession.createConsumer(destination);
    // 5. Add a listener for handling received messages
    consumer.setMessageListener((consumerMessage) -> {
        try {
            MapMessage consumerByteMessage = (MapMessage) consumerMessage;
                try {
                    Thread.sleep(N); // Introduce some artificial delay
                } catch (InterruptedException e) {
                    logger.error("", e);
                }
                consumerMessage.acknowledge();
        } catch (JMSException e) {
            try {
                consumerSession.recover();
            } catch (JMSException e1) {
                throw new RuntimeException(e1);
            }
        }
    });
    consumerConnection.start();
}



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: Consume rate drops dracstically with message process latency, even with many consumers per queue.

Posted by angeloslenis <le...@gmail.com>.
That was my initial thought also, so we had already set prefetch limit = 1,
which i think is the lowest allowed value, to run our tests:

ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 
prefetchPolicy.setAll(1); 
connectionFactory.setPrefetchPolicy(prefetchPolicy); 

With a value of '0' we get the following error:
javax.jms.JMSException: Illegal prefetch size of zero. This setting is not
supported for asynchronous consumers please set a value of at least 1.


My understanding is that different consumer sessions are not running on
different threads.
I added a debug log in the "onConsume" function of each consumer to
validate:

onConsume(Message message) {
  logger.info(Thread.currentThread().getName())
}

which prints the following unique entries:

ActiveMQ Session Task-1
ActiveMQ Session Task-2
....
ActiveMQ Session Task-100

As i understand, even though i have created 1200 consumer sessions there are
only 100 threads running.

What do you think?



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: Consume rate drops dracstically with message process latency, even with many consumers per queue.

Posted by Tim Bain <tb...@alumni.duke.edu>.
My guess is that this is because messages are sitting in the prefetch
buffer of consumers while they are being slow to consume their current
message, making those messages unavailable for other consumers who might be
idle.

If you change your consumers' prefetch size to 0, do you get the expected
throughput?

Tim

On Wed, Nov 7, 2018, 7:58 AM angeloslenis <lenis.angelos@gmail.com wrote:

> Hello,
>
> TLDR: When we introduce some latency on the consumers, the maximum consume
> rate drops drastically, even though we have enough consumer threads to
> handle the rate.
>
>
> We are currently evaluating ActiveMQ, focusing on produce/consume rate:
> The setup is the following:
> - Virtual Topics: 100
> - Consumer Queues: 3 per virtual topic amounting to 300 queues.
> - Produce rate: 10 messages/second on each Virtual topic, amounting to
> 1000messages/second
> -- PooledConnectionFactory with 16 connections
> - Consumers: 4 different consumers on each queue, amounting to 1200
> consumers (i.e. sessions + consumer)
> -- another PooledConnectionFactory with 16 connections
> - Broker configuration: default settings, producerFlowControl is enabled
> - Everything runs on a single machine, i7 2.5Ghz, 16GB memory
>
> When the consumers process messages very fast we can achieve a consume rate
> of 3000 messages/sec which is the expected one for the 1000messages/sec
> that
> we produce (3 queues per topic).
>
> When we introduce some latency on the consumers, the maximum consume rate
> drops drastically, even though we have enough consumers (== threads) to
> handle the rate:
>
> - Latency 10 msec, Consume Rate drops to 1300 messages/sec
> - Latency 20 msec, Consume Rate drops to 650 messages/sec
> - Latency 100 msec, Consume Rate drops to 130 messages/sec
>
> In all cases produce rate also drops to approximately 1/3 of consume rate,
> due to flow control.
>
> Any advice on why the consume rate drops?
>
>
> Below are code samples, with the connections and producers/consumers
> configurations:
>
> // ------------------------------------
> // Producer Connection Settings
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(connectionUrl(amqConfiguration));
> connectionFactory.setUserName(amqConfiguration.getPublisherUsername());
> connectionFactory.setPassword(amqConfiguration.getPublisherPassword());
> connectionFactory.setClientIDPrefix("HermesMonitorAMQ-Producer");
> connectionFactory.setAlwaysSyncSend(true);
> connectionFactory.setDispatchAsync(false);
> connectionFactory.setSendTimeout(5000);
>
> publisherConnectionFactory = new
> PooledConnectionFactory(connectionFactory);
> publisherConnectionFactory.setMaxConnections(16);
> publisherConnectionFactory.start();
>
> // ------------------------------------
> // Producer: this is called by different threads  to produce 10
> messages/sec
> on each topic
> private synchronized void publish(String topic) throws JMSException,
> TException {
>     // 1. Create connection
>     Connection producerConnection =
> publisherConnectionFactory.createConnection();
>     // 2. Create Session
>     Session producerSession = producerConnection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>     // 3. Create Destination
>     Destination destination = producerSession.createTopic(topic);
>     // ^^ e.g. "VirtualTopic.ProducerYYY"
>     // 4. Create Producer
>     MessageProducer producer = producerSession.createProducer(destination);
>     producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>
>     // 5. Create message
>     MapMessage producerMessage = producerSession.createMapMessage();
>     {
>         // Add some payload
>         byte[] payload = new
> byte[monitorParameters.getProducerMessageSize()];
>         new Random().nextBytes(payload);
>         producerMessage.setObject("payload", payload);
>     }
>
>     // 6. Send Message
>     producer.send(producerMessage);
>
>     producer.close();
>     producerSession.close();
>     producerConnection.close();
> }
>
> // ------------------------------------
> // Consumer Connection Settings
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(connectionUrl(amqConfiguration));
> connectionFactory.setClientIDPrefix("HermesMonitorAMQ-Consumer");
> connectionFactory.setUserName(amqConfiguration.getConsumerUsername());
> connectionFactory.setPassword(amqConfiguration.getConsumerPassword());
> connectionFactory.setAlwaysSessionAsync(true);
> connectionFactory.setOptimizeAcknowledge(false);
> connectionFactory.setDispatchAsync(false);
> connectionFactory.setSendTimeout(5000);
> connectionFactory.setConnectResponseTimeout(5000);
> connectionFactory.setMessagePrioritySupported(true);
> ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
> prefetchPolicy.setAll(1);
> connectionFactory.setPrefetchPolicy(prefetchPolicy);
> RedeliveryPolicy redeliveryPolicy =
> connectionFactory.getRedeliveryPolicy();
> redeliveryPolicy.setMaximumRedeliveries(0);
> connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
>
> consumerConnectionFactory = new PooledConnectionFactory(connectionFactory);
> consumerConnectionFactory.setMaxConnections(16);
>
> consumerConnectionFactory.start();
>
> //---------------------------------------------------
> // Consumer: this is called 4 times per queue => 4 * 300 = 1200 sessions +
> consumers
> void startConsumer(String queue) {
>     // 1. Establish a connection for the consumer.
>     consumerConnection = consumerConnectionFactory.createConnection();
>     // 2. Create a session.
>     consumerSession = consumerConnection.createSession(false,
> Session.CLIENT_ACKNOWLEDGE);
>     // 3. Create a message consumer from the session to the queue.
>     Destination destination = consumerSession.createQueue(queue));
>     // ^^ e.g. "Consumer.ConsumerXXX.VirtualTopic.ProducerYYY"
>     // 4. Create consumer
>     consumer = consumerSession.createConsumer(destination);
>     // 5. Add a listener for handling received messages
>     consumer.setMessageListener((consumerMessage) -> {
>         try {
>             MapMessage consumerByteMessage = (MapMessage) consumerMessage;
>                 try {
>                     Thread.sleep(N); // Introduce some artificial delay
>                 } catch (InterruptedException e) {
>                     logger.error("", e);
>                 }
>                 consumerMessage.acknowledge();
>         } catch (JMSException e) {
>             try {
>                 consumerSession.recover();
>             } catch (JMSException e1) {
>                 throw new RuntimeException(e1);
>             }
>         }
>     });
>     consumerConnection.start();
> }
>
>
>
> --
> Sent from:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
>