You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Yann Massard <ya...@gmail.com> on 2021/11/17 18:25:06 UTC

Confusion around reusing session, producer, consumer, ...

Hi,

working with Artemis, I am trying to understand how to reuse a session 
and corresponding producer and consumer in a request-reply scenario. I 
have started with the RequestReplyExample and tried to make it work with 
multiple requests/responses. After all, the example's comments say:

"Of course, in a real world example you would re-use the session, 
producer, consumer and temporary queue and not create a new one for each 
message!"

Since I know that sessions should not be used by multiple threads, I am 
making sure, all messages are sent through the same thread.

(My code is at the very bottom of this message.)

However, I still get exceptions:

Nov 17, 2021 6:42:58 PM 
org.apache.activemq.artemis.core.client.impl.ClientSessionImpl startCall
WARN: AMQ212051: Invalid concurrent session usage. Sessions are not 
supposed to be used by more than one thread concurrently.
java.lang.Exception: trace
     at 
org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.startCall(ClientSessionImpl.java:1587)
     at 
org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.acknowledge(ClientSessionImpl.java:1209)
     at 
org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.doAck(ClientConsumerImpl.java:1117)
     at 
org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.acknowledge(ClientConsumerImpl.java:788)
     at 
org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:136)
     at 
org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:38)
     at 
org.apache.activemq.artemis.jms.client.JMSMessageListenerWrapper.onMessage(JMSMessageListenerWrapper.java:136)
     at 
org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
     at 
org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
     at 
org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
     at 
org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
     at 
org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
     at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
     at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
     at 
org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)


As soon as I remove the client's MessageListener, the exceptions are 
gone. So I assume that sending the ACK (see stack trace) is done by 
another thread and that might be the problem. However, I don't know how 
to change this.

Can anybody give me a hint how to procede?

Btw. the comment also says:

"Or better still use the correlation id, and just store the requests in 
a map, then you don't need a temporary queue at all"

I am very interested but have no idea how this is supposed to work. 
Which queue should the responses be sent through?

Any help is greatly appreciated!

Thank you!

Yann


package org.apache.activemq.artemis.jms.example;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class RequestReplyExample {

    public static void main(final String[] args)throws Exception {
       new Server().start();
       Client client =new Client();
       for (int i =0; i <100; i++) {
          client.sendJob("message-" + i);
       }
    }
}

class Client {

    private final Executor executor =Executors.newSingleThreadExecutor();
    private final Session session;
    private final TemporaryQueue replyQueue;
    private final MessageProducer producer;

    public Client()throws JMSException {
       var cf =new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
       Connection connection =cf.createConnection();
       connection.start();
       session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

       Queue requestQueue =session.createQueue("queue1");
       producer =session.createProducer(requestQueue);

       replyQueue =session.createTemporaryQueue();
       MessageConsumer replyConsumer =session.createConsumer(replyQueue);
       replyConsumer.setMessageListener(replyMessage -> {
          try {
             TextMessage reply = (TextMessage) replyMessage;
             System.out.println("Got reply: " +reply.getText());
             // use correlationId to correlate with request...
          }catch (JMSException e) {
             e.printStackTrace();
          }
       });
    }

    public void sendJob(String json) {
       executor.execute(() -> {// make sure the session is only used by a single thread! try {
             TextMessage msg =session.createTextMessage(json);
             msg.setJMSReplyTo(replyQueue);
             System.out.println("Sending message: " +json +" with replyTo: " +replyQueue);
             producer.send(msg);
          }catch (JMSException e) {
             e.printStackTrace();
          }
       });
    }

}

class Server {
    private Connection connection;

    public void start()throws Exception {
       ActiveMQConnectionFactory cf =new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
       connection =cf.createConnection();

       connection.start();
       Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
       Queue requestQueue =session.createQueue("queue1");
       MessageProducer replyProducer =session.createProducer(null);
       MessageConsumer requestConsumer =session.createConsumer(requestQueue);
       requestConsumer.setMessageListener(request -> {
          try {
             System.out.println("Received request message: " + ((TextMessage) request).getText());
             Destination replyDestination = request.getJMSReplyTo();
             System.out.println("Reply to queue: " +replyDestination);
             TextMessage replyMessage =session.createTextMessage("A reply message");
             replyMessage.setJMSCorrelationID(request.getJMSMessageID());
             replyProducer.send(replyDestination,replyMessage);
             System.out.println("Reply sent");
          }catch (JMSException e) {
             e.printStackTrace();
          }
       });
    }

    public void shutdown()throws JMSException {
       connection.close();
    }
}



Re: Confusion around reusing session, producer, consumer, ...

Posted by Yann Massard <ya...@gmail.com>.
Hi Robbie,

Thanks for your super helpful reply! I'll go with a fixed reply queue and a
dedicated thread for receiving replies.

Again, thanks a lot!
Yann


On Thu, Nov 18, 2021 at 2:42 PM Robbie Gemmell <ro...@gmail.com>
wrote:

> Setting a MessageListener on a consumer dedicates the Session to its
> asynchronous message delivery thread of control [while the connection
> is started]. So sending with one thread and also having listener(s)
> being delivered consumed messages on the same session is still
> multi-threading the session.
>
> Using a synchronous receive(..) calls to consume replies, instead of a
> MessageListener, might be more appropriate than setting a listener if
> you want/need to use a single session for send+recieve. You would then
> only have one application thread using the session. Alternatively,
> MessageListener callbacks can also send.
>
> The comments are suggesting that you can use the same queue for
> multiple responses [concurrently] by mapping the arriving response to
> a particular request through use of a JMSCorrelationID value set on
> the message, which you included on the request and that the responder
> then sets on its reply back so that you can correlate them. A reply-to
> queue does not need to be a TemporaryQueue either way, you can also
> use fixed queues, and often you might need to depending on your
> reliability needs for the responses etc.
>
>
> On Wed, 17 Nov 2021 at 18:25, Yann Massard <ya...@gmail.com> wrote:
> >
> > Hi,
> >
> > working with Artemis, I am trying to understand how to reuse a session
> > and corresponding producer and consumer in a request-reply scenario. I
> > have started with the RequestReplyExample and tried to make it work with
> > multiple requests/responses. After all, the example's comments say:
> >
> > "Of course, in a real world example you would re-use the session,
> > producer, consumer and temporary queue and not create a new one for each
> > message!"
> >
> > Since I know that sessions should not be used by multiple threads, I am
> > making sure, all messages are sent through the same thread.
> >
> > (My code is at the very bottom of this message.)
> >
> > However, I still get exceptions:
> >
> > Nov 17, 2021 6:42:58 PM
> > org.apache.activemq.artemis.core.client.impl.ClientSessionImpl startCall
> > WARN: AMQ212051: Invalid concurrent session usage. Sessions are not
> > supposed to be used by more than one thread concurrently.
> > java.lang.Exception: trace
> >      at
> >
> org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.startCall(ClientSessionImpl.java:1587)
> >      at
> >
> org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.acknowledge(ClientSessionImpl.java:1209)
> >      at
> >
> org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.doAck(ClientConsumerImpl.java:1117)
> >      at
> >
> org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.acknowledge(ClientConsumerImpl.java:788)
> >      at
> >
> org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:136)
> >      at
> >
> org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:38)
> >      at
> >
> org.apache.activemq.artemis.jms.client.JMSMessageListenerWrapper.onMessage(JMSMessageListenerWrapper.java:136)
> >      at
> >
> org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
> >      at
> >
> org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
> >      at
> >
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
> >      at
> >
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
> >      at
> >
> org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
> >      at
> >
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> >      at
> >
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> >      at
> >
> org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> >
> >
> > As soon as I remove the client's MessageListener, the exceptions are
> > gone. So I assume that sending the ACK (see stack trace) is done by
> > another thread and that might be the problem. However, I don't know how
> > to change this.
> >
> > Can anybody give me a hint how to procede?
> >
> > Btw. the comment also says:
> >
> > "Or better still use the correlation id, and just store the requests in
> > a map, then you don't need a temporary queue at all"
> >
> > I am very interested but have no idea how this is supposed to work.
> > Which queue should the responses be sent through?
> >
> > Any help is greatly appreciated!
> >
> > Thank you!
> >
> > Yann
> >
> >
> > package org.apache.activemq.artemis.jms.example;
> >
> > import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
> >
> > import javax.jms.*;
> > import java.util.concurrent.Executor;
> > import java.util.concurrent.Executors;
> >
> > public class RequestReplyExample {
> >
> >     public static void main(final String[] args)throws Exception {
> >        new Server().start();
> >        Client client =new Client();
> >        for (int i =0; i <100; i++) {
> >           client.sendJob("message-" + i);
> >        }
> >     }
> > }
> >
> > class Client {
> >
> >     private final Executor executor =Executors.newSingleThreadExecutor();
> >     private final Session session;
> >     private final TemporaryQueue replyQueue;
> >     private final MessageProducer producer;
> >
> >     public Client()throws JMSException {
> >        var cf =new
> ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
> >        Connection connection =cf.createConnection();
> >        connection.start();
> >        session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
> >
> >        Queue requestQueue =session.createQueue("queue1");
> >        producer =session.createProducer(requestQueue);
> >
> >        replyQueue =session.createTemporaryQueue();
> >        MessageConsumer replyConsumer =session.createConsumer(replyQueue);
> >        replyConsumer.setMessageListener(replyMessage -> {
> >           try {
> >              TextMessage reply = (TextMessage) replyMessage;
> >              System.out.println("Got reply: " +reply.getText());
> >              // use correlationId to correlate with request...
> >           }catch (JMSException e) {
> >              e.printStackTrace();
> >           }
> >        });
> >     }
> >
> >     public void sendJob(String json) {
> >        executor.execute(() -> {// make sure the session is only used by
> a single thread! try {
> >              TextMessage msg =session.createTextMessage(json);
> >              msg.setJMSReplyTo(replyQueue);
> >              System.out.println("Sending message: " +json +" with
> replyTo: " +replyQueue);
> >              producer.send(msg);
> >           }catch (JMSException e) {
> >              e.printStackTrace();
> >           }
> >        });
> >     }
> >
> > }
> >
> > class Server {
> >     private Connection connection;
> >
> >     public void start()throws Exception {
> >        ActiveMQConnectionFactory cf =new
> ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
> >        connection =cf.createConnection();
> >
> >        connection.start();
> >        Session session
> =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
> >        Queue requestQueue =session.createQueue("queue1");
> >        MessageProducer replyProducer =session.createProducer(null);
> >        MessageConsumer requestConsumer
> =session.createConsumer(requestQueue);
> >        requestConsumer.setMessageListener(request -> {
> >           try {
> >              System.out.println("Received request message: " +
> ((TextMessage) request).getText());
> >              Destination replyDestination = request.getJMSReplyTo();
> >              System.out.println("Reply to queue: " +replyDestination);
> >              TextMessage replyMessage =session.createTextMessage("A
> reply message");
> >              replyMessage.setJMSCorrelationID(request.getJMSMessageID());
> >              replyProducer.send(replyDestination,replyMessage);
> >              System.out.println("Reply sent");
> >           }catch (JMSException e) {
> >              e.printStackTrace();
> >           }
> >        });
> >     }
> >
> >     public void shutdown()throws JMSException {
> >        connection.close();
> >     }
> > }
> >
> >
>

Re: Confusion around reusing session, producer, consumer, ...

Posted by Robbie Gemmell <ro...@gmail.com>.
Setting a MessageListener on a consumer dedicates the Session to its
asynchronous message delivery thread of control [while the connection
is started]. So sending with one thread and also having listener(s)
being delivered consumed messages on the same session is still
multi-threading the session.

Using a synchronous receive(..) calls to consume replies, instead of a
MessageListener, might be more appropriate than setting a listener if
you want/need to use a single session for send+recieve. You would then
only have one application thread using the session. Alternatively,
MessageListener callbacks can also send.

The comments are suggesting that you can use the same queue for
multiple responses [concurrently] by mapping the arriving response to
a particular request through use of a JMSCorrelationID value set on
the message, which you included on the request and that the responder
then sets on its reply back so that you can correlate them. A reply-to
queue does not need to be a TemporaryQueue either way, you can also
use fixed queues, and often you might need to depending on your
reliability needs for the responses etc.


On Wed, 17 Nov 2021 at 18:25, Yann Massard <ya...@gmail.com> wrote:
>
> Hi,
>
> working with Artemis, I am trying to understand how to reuse a session
> and corresponding producer and consumer in a request-reply scenario. I
> have started with the RequestReplyExample and tried to make it work with
> multiple requests/responses. After all, the example's comments say:
>
> "Of course, in a real world example you would re-use the session,
> producer, consumer and temporary queue and not create a new one for each
> message!"
>
> Since I know that sessions should not be used by multiple threads, I am
> making sure, all messages are sent through the same thread.
>
> (My code is at the very bottom of this message.)
>
> However, I still get exceptions:
>
> Nov 17, 2021 6:42:58 PM
> org.apache.activemq.artemis.core.client.impl.ClientSessionImpl startCall
> WARN: AMQ212051: Invalid concurrent session usage. Sessions are not
> supposed to be used by more than one thread concurrently.
> java.lang.Exception: trace
>      at
> org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.startCall(ClientSessionImpl.java:1587)
>      at
> org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.acknowledge(ClientSessionImpl.java:1209)
>      at
> org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.doAck(ClientConsumerImpl.java:1117)
>      at
> org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.acknowledge(ClientConsumerImpl.java:788)
>      at
> org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:136)
>      at
> org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:38)
>      at
> org.apache.activemq.artemis.jms.client.JMSMessageListenerWrapper.onMessage(JMSMessageListenerWrapper.java:136)
>      at
> org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
>      at
> org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
>      at
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
>      at
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
>      at
> org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
>      at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>      at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>      at
> org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
>
>
> As soon as I remove the client's MessageListener, the exceptions are
> gone. So I assume that sending the ACK (see stack trace) is done by
> another thread and that might be the problem. However, I don't know how
> to change this.
>
> Can anybody give me a hint how to procede?
>
> Btw. the comment also says:
>
> "Or better still use the correlation id, and just store the requests in
> a map, then you don't need a temporary queue at all"
>
> I am very interested but have no idea how this is supposed to work.
> Which queue should the responses be sent through?
>
> Any help is greatly appreciated!
>
> Thank you!
>
> Yann
>
>
> package org.apache.activemq.artemis.jms.example;
>
> import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
>
> import javax.jms.*;
> import java.util.concurrent.Executor;
> import java.util.concurrent.Executors;
>
> public class RequestReplyExample {
>
>     public static void main(final String[] args)throws Exception {
>        new Server().start();
>        Client client =new Client();
>        for (int i =0; i <100; i++) {
>           client.sendJob("message-" + i);
>        }
>     }
> }
>
> class Client {
>
>     private final Executor executor =Executors.newSingleThreadExecutor();
>     private final Session session;
>     private final TemporaryQueue replyQueue;
>     private final MessageProducer producer;
>
>     public Client()throws JMSException {
>        var cf =new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
>        Connection connection =cf.createConnection();
>        connection.start();
>        session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
>
>        Queue requestQueue =session.createQueue("queue1");
>        producer =session.createProducer(requestQueue);
>
>        replyQueue =session.createTemporaryQueue();
>        MessageConsumer replyConsumer =session.createConsumer(replyQueue);
>        replyConsumer.setMessageListener(replyMessage -> {
>           try {
>              TextMessage reply = (TextMessage) replyMessage;
>              System.out.println("Got reply: " +reply.getText());
>              // use correlationId to correlate with request...
>           }catch (JMSException e) {
>              e.printStackTrace();
>           }
>        });
>     }
>
>     public void sendJob(String json) {
>        executor.execute(() -> {// make sure the session is only used by a single thread! try {
>              TextMessage msg =session.createTextMessage(json);
>              msg.setJMSReplyTo(replyQueue);
>              System.out.println("Sending message: " +json +" with replyTo: " +replyQueue);
>              producer.send(msg);
>           }catch (JMSException e) {
>              e.printStackTrace();
>           }
>        });
>     }
>
> }
>
> class Server {
>     private Connection connection;
>
>     public void start()throws Exception {
>        ActiveMQConnectionFactory cf =new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
>        connection =cf.createConnection();
>
>        connection.start();
>        Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
>        Queue requestQueue =session.createQueue("queue1");
>        MessageProducer replyProducer =session.createProducer(null);
>        MessageConsumer requestConsumer =session.createConsumer(requestQueue);
>        requestConsumer.setMessageListener(request -> {
>           try {
>              System.out.println("Received request message: " + ((TextMessage) request).getText());
>              Destination replyDestination = request.getJMSReplyTo();
>              System.out.println("Reply to queue: " +replyDestination);
>              TextMessage replyMessage =session.createTextMessage("A reply message");
>              replyMessage.setJMSCorrelationID(request.getJMSMessageID());
>              replyProducer.send(replyDestination,replyMessage);
>              System.out.println("Reply sent");
>           }catch (JMSException e) {
>              e.printStackTrace();
>           }
>        });
>     }
>
>     public void shutdown()throws JMSException {
>        connection.close();
>     }
> }
>
>