Posted to users@activemq.apache.org by madsum <ma...@gmail.com> on 2017/09/24 15:35:52 UTC

What is the use case of BrokerService in ActiveMQ and how to use it correctly

I am new to ActiveMQ. I'm trying to learn how it works by studying the
example code provided by Apache at this link:

http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

// MessageProtocol is the helper class from the same example page.
public class Server implements MessageListener {
    private static int ackMode;
    private static String messageQueueName;
    private static String messageBrokerUrl;

    private Session session;
    private boolean transacted = false;
    private MessageProducer replyProducer;
    private MessageProtocol messageProtocol;

    static {
        messageBrokerUrl = "tcp://localhost:61616";
        messageQueueName = "client.messages";
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }

    public Server() {
        try {
            //This message broker is embedded
            BrokerService broker = new BrokerService();
            broker.setPersistent(false);
            broker.setUseJmx(false);
            broker.addConnector(messageBrokerUrl);
            broker.start();
        } catch (Exception e) {
            System.out.println("Exception: " + e.getMessage());
            //Handle the exception appropriately
        }

        //Delegating the handling of messages to another class,
        //instantiate it before setting up JMS so it
        //is ready to handle messages
        this.messageProtocol = new MessageProtocol();
        this.setupMessageQueueConsumer();
    }

    private void setupMessageQueueConsumer() {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(messageBrokerUrl);
        Connection connection;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            this.session = connection.createSession(this.transacted, ackMode);
            Destination adminQueue = this.session.createQueue(messageQueueName);

            //Setup a message producer to respond to messages from clients,
            //we will get the destination to send to from the JMSReplyTo
            //header field from a Message
            this.replyProducer = this.session.createProducer(null);
            this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //Set up a consumer to consume messages off of the admin queue
            MessageConsumer consumer = this.session.createConsumer(adminQueue);
            consumer.setMessageListener(this);
        } catch (JMSException e) {
            System.out.println("Exception: " + e.getMessage());
        }
    }

    public void onMessage(Message message) {
        try {
            TextMessage response = this.session.createTextMessage();
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String messageText = txtMsg.getText();
                response.setText(this.messageProtocol.handleProtocolMessage(messageText));
            }

            //Set the correlation ID from the received message to be the
            //correlation id of the response message;
            //this lets the client identify which message this is a
            //response to if it has more than
            //one outstanding message to the server
            response.setJMSCorrelationID(message.getJMSCorrelationID());

            //Send the response to the Destination specified by the
            //JMSReplyTo field of the received message;
            //this is presumably a temporary queue created by the client
            this.replyProducer.send(message.getJMSReplyTo(), response);
        } catch (JMSException e) {
            System.out.println("Exception: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        new Server();
    }
}

My confusion is about messageBrokerUrl = "tcp://localhost:61616". The
ActiveMQ service runs on port 61616 by default, so why does this example
choose the same port? If I try to run the code, it throws this exception:

Exception: Failed to bind to server socket: tcp://localhost:61616 due to:
java.net.BindException: Address already in use: JVM_Bind

If I stop the ActiveMQ service first, I can run the code and it works.

My assumption is that if I embed BrokerService in my code, I no longer need
the ActiveMQ service running, because BrokerService takes the place of the
ActiveMQ service inside the code. Please correct me if I am wrong.

Please let me know why the example is written this way and how to work with
BrokerService correctly. Thanks in advance!

 






Re: What is the use case of BrokerService in ActiveMQ and how to use it correctly

Posted by Tim Bain <tb...@alumni.duke.edu>.
As you said, the purpose of BrokerService is to run a broker, either as a
standalone process or embedded within your application. A standalone broker
is the most common configuration, because it more easily supports multiple
client processes, but if all your producers and consumers run in the same
JVM, then an embedded broker might work for you.

Whichever approach you choose, a standalone broker and an embedded broker
cannot both listen on the same port of the same host: either run only one
of them, or configure them to use different ports.
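The BindException in the original post is ordinary socket behaviour, not something specific to ActiveMQ. As a minimal sketch using only the JDK (the class name PortConflictDemo and the helper tryBind are made up for illustration), the following shows that a second listener cannot bind a port that is already held, while a different port works. The first socket stands in for the standalone broker and uses an OS-assigned port rather than 61616, so the demo does not depend on what is running locally.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;

public class PortConflictDemo {

    // Hypothetical helper: tries to listen on the loopback address.
    // Returns null when the port is already in use; port 0 means "any free port".
    public static ServerSocket tryBind(int port) throws IOException {
        try {
            return new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
        } catch (BindException e) {
            // Same condition the broker reports as "Address already in use".
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the standalone broker: holds a listening port.
        try (ServerSocket first = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            int takenPort = first.getLocalPort();

            // Stand-in for the embedded broker configured with the same port.
            ServerSocket samePort = tryBind(takenPort);
            System.out.println("bind on the taken port succeeded: " + (samePort != null));

            // Stand-in for the embedded broker configured with a different port.
            ServerSocket otherPort = tryBind(0);
            System.out.println("bind on a different port succeeded: " + (otherPort != null));

            if (samePort != null) samePort.close();
            if (otherPort != null) otherPort.close();
        }
    }
}
```

Stopping the standalone service, as described in the question, works for the same reason: it releases the port so the embedded broker's connector can bind it.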

As for why the example is written that way, embedding the broker makes the
test self-contained and simpler. But the producer and consumer code from
this test would work the same with a standalone broker.
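To make that concrete, here is a rough sketch, assuming ActiveMQ 5.x and the javax.jms API on the classpath. The class name EmbeddedBrokerSketch, the "--embedded" flag and port 61617 are invented for illustration. The client side depends only on the broker URL: with the flag, the program starts its own embedded broker on a port that does not collide with a standalone broker on 61616; without it, it connects to whatever broker is already listening at the default URL.

```java
import javax.jms.Connection;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

public class EmbeddedBrokerSketch {
    // Default port of a standalone ActiveMQ installation.
    static final String STANDALONE_URL = "tcp://localhost:61616";
    // Hypothetical alternate port for the embedded broker, chosen to avoid 61616.
    static final String EMBEDDED_URL = "tcp://localhost:61617";

    // Starts a non-persistent, JMX-less embedded broker, as in the example from the post.
    public static BrokerService startEmbeddedBroker(String url) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.addConnector(url);
        broker.start();
        return broker;
    }

    public static void main(String[] args) throws Exception {
        boolean embedded = args.length > 0 && "--embedded".equals(args[0]);
        String url = embedded ? EMBEDDED_URL : STANDALONE_URL;

        // Only start a broker in this JVM when asked to; otherwise rely on the standalone one.
        BrokerService broker = embedded ? startEmbeddedBroker(url) : null;
        try {
            // The client code is identical in both cases; only the URL differs.
            Connection connection = new ActiveMQConnectionFactory(url).createConnection();
            connection.start();
            System.out.println("Connected to broker at " + url);
            connection.close();
        } finally {
            if (broker != null) {
                broker.stop();
            }
        }
    }
}
```

Unlike the example in the post, this sketch lets a broker start-up failure propagate instead of printing it and carrying on, so a port conflict stops the program before any client code runs.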

Tim
