Posted to users@activemq.apache.org by Bob M <rg...@orcon.net.nz> on 2015/10/30 03:37:33 UTC

First activeMQ use - help please

I have the following code so far:-

Producer code:-

import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TradeProducer implements Runnable {
    public void run() {
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory("vm://localhost");

            // Create a Connection
            javax.jms.Connection connection = connectionFactory.createConnection();
            connection.start();

            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the Topic
            Topic topic = session.createTopic("prospective_trade");

            // Create a MessageProducer from the Session to the Topic
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // Create a message
            String text = "Hello world! From: "
                    + Thread.currentThread().getName() + " : " + this.hashCode();
            TextMessage message = session.createTextMessage(text);

            // Tell the producer to send the message
            System.out.println("Sent message: " + message.hashCode()
                    + " : " + Thread.currentThread().getName());
            producer.send(message);

            // Clean up
            session.close();
            connection.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

Consumer code:-

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TradeConsumer implements Runnable, ExceptionListener {
    public void run() {
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory("vm://localhost");

            // Create a Connection
            javax.jms.Connection connection = connectionFactory.createConnection();
            connection.start();

            connection.setExceptionListener(this);

            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the Topic
            Topic topic = session.createTopic("prospective_trade");

            // Create a MessageConsumer from the Session to the Topic
            MessageConsumer consumer = session.createConsumer(topic);

            // Wait for a message; receive returns null if none arrives within 1000 ms
            Message message = consumer.receive(1000);

            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("Received: " + text);
            } else {
                System.out.println("Received: " + message);
            }

            consumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    public synchronized void onException(JMSException ex) {
        System.out.println("JMS Exception occurred.  Shutting down client.");
    }
}

My question:-
Is it possible to enclose the "wait for a message" section of code in a loop
whose maximum number of iterations is the number of active consumers minus 1?

i.e. I am trying to decide when I have received messages (one only) from
each of the other active consumers before continuing with the remainder of
the code.




--
View this message in context: http://activemq.2283324.n4.nabble.com/First-activeMQ-use-help-please-tp4703506.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: First activeMQ use - help please

Posted by Bob M <rg...@orcon.net.nz>.
Thanks Tim

The active consumers will send a 'subscribe' message when starting and an
'unsubscribe' message when stopping.

I figure that I should know the no. of active consumers at any stage
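
A rough sketch of how such a count could be kept, in Java to match the code in this thread. ParticipantRegistry, the "subscribe"/"unsubscribe" strings and the consumer IDs are all hypothetical names chosen for illustration; nothing here is an ActiveMQ or JMS API. It tracks IDs in a set rather than a bare counter so that a repeated 'subscribe' from the same consumer is not counted twice.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: tracks which consumers have announced themselves. */
final class ParticipantRegistry {
    private final Set<String> active = new HashSet<>();

    /** Applies one control message; kinds other than subscribe/unsubscribe are ignored. */
    synchronized void onControlMessage(String kind, String consumerId) {
        if ("subscribe".equals(kind)) {
            active.add(consumerId);
        } else if ("unsubscribe".equals(kind)) {
            active.remove(consumerId);
        }
    }

    /** Number of active consumers other than selfId, i.e. how many messages to expect. */
    synchronized int expectedFromOthers(String selfId) {
        return active.contains(selfId) ? active.size() - 1 : active.size();
    }
}
```

A consumer that stops without sending 'unsubscribe' would stay in the set, so the count from a registry like this can be too high.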

They all do similar calculations simultaneously and so the sending of each
prospective trade takes place within seconds.

But your suggestion of a time limit as well is well founded.

Thank you for your advice

Bob M 





Re: First activeMQ use - help please

Posted by Tim Bain <tb...@alumni.duke.edu>.
It's not possible for a consumer to know the number of producers.  Well,
it's possible via JMX, but it's not desirable.  The whole point of JMS is
that it decouples producers and consumers so they don't have to know about
each other.

If you're going to need to wait for N publishers to send messages and N
varies dynamically, you could:
* wait a set amount of time and assume you've gotten everything you're
going to get
* make producers send you a heartbeat message periodically so you can count
them
* if the message takes a long time to produce, make producers send a
message when they start producing the message, so you know how many to
expect
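
The heartbeat option could be sketched roughly as follows. HeartbeatTracker, the producer IDs and the maxAgeMillis threshold are hypothetical and only illustrate the bookkeeping; how the heartbeat messages are sent and received over JMS is left out. Timestamps are passed in by the caller (for example from System.currentTimeMillis()).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: counts producers whose last heartbeat is recent enough. */
final class HeartbeatTracker {
    private final Map<String, Long> lastSeenMillis = new HashMap<>();
    private final long maxAgeMillis;

    HeartbeatTracker(long maxAgeMillis) {
        this.maxAgeMillis = maxAgeMillis;
    }

    /** Records a heartbeat from producerId observed at nowMillis. */
    synchronized void onHeartbeat(String producerId, long nowMillis) {
        lastSeenMillis.put(producerId, nowMillis);
    }

    /** Forgets producers silent for longer than maxAgeMillis and returns how many remain. */
    synchronized int liveCount(long nowMillis) {
        lastSeenMillis.values().removeIf(seen -> nowMillis - seen > maxAgeMillis);
        return lastSeenMillis.size();
    }
}
```

The threshold would normally be a few heartbeat intervals, so that one delayed heartbeat does not make a live producer disappear from the count.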

Any way you do it, you need to handle the case where your count is too high
because a producer died before producing the message, so put a time limit
on it even if you do something else too.
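
A minimal sketch of combining an expected count with an overall time limit, in Java to match the code in this thread. BoundedCollector and its parameter names are hypothetical, not part of JMS or ActiveMQ. The receive argument stands in for a blocking call such as MessageConsumer.receive(timeout), which returns null when the timeout expires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

/** Hypothetical helper: waits for up to N items, but never longer than a deadline. */
final class BoundedCollector {
    private BoundedCollector() {}

    /**
     * Collects up to expectedCount items and stops early once maxWaitMillis has elapsed.
     * receive takes a timeout in milliseconds and returns null if nothing arrived in time.
     */
    static <T> List<T> collect(LongFunction<T> receive, int expectedCount, long maxWaitMillis) {
        List<T> received = new ArrayList<>();
        long deadline = System.nanoTime() + maxWaitMillis * 1_000_000L;
        while (received.size() < expectedCount) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                break; // out of time: carry on with whatever has arrived
            }
            T item = receive.apply(remainingMillis);
            if (item == null) {
                break; // nothing arrived in the remaining time
            }
            received.add(item);
        }
        return received;
    }
}
```

With JMS, receive could be a lambda that calls consumer.receive(timeoutMillis) and wraps JMSException in an unchecked exception, and expectedCount would be the current estimate of how many others will send. Each call is given only the time remaining, so the total wait stays bounded, and the loop never passes 0, which JMS treats as "wait indefinitely".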

Tim