You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Andrew M <an...@oc384.net> on 2008/03/29 08:22:22 UTC

Retroactive consumers loose msgs...

My Retroactive Consumer is only receiving the last 789 out of 5000 msgs
sent.  Any suggestions?

Thanks.

 

 

import javax.jms.Connection;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.ExceptionListener;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnectionFactory;

 

 

public class CommandLinePublisher {

 

    static final String HOST = "tupolev";

    static final String PORT = "61616";

    static final String URL = "tcp://"+HOST+":"+PORT;

    static final String MSG = "1"; // 1 character msg

    static final String topic = "test";

    static final int MSGS_TO_SEND = 5000;

 

    static MessageProducer producer;

    static Session session;

    static Connection connection;

 

    public static void main(String[] args) throws Exception {

        new CommandLinePublisher();

    }

 

    public CommandLinePublisher() {

 

        try {

            connect();

            TextMessage message;

            message = session.createTextMessage(MSG);

            System.out.println("Sent message: " + message.hashCode() + " : "
+ Thread.currentThread().getName());

 

            for (int x = 0; x < MSGS_TO_SEND; x++) {

                System.out.println("sending msg " + x);

                producer.send(message);

            }

 

            disconnect();

 

            new MonitorApp();

 

        } catch (JMSException e) {

            e.printStackTrace();

        }

 

    }

 

 

    class MonitorApp implements MessageListener {

 

        public MonitorApp() {

 

            // connect CLient to ActiveMQ server.

            ActiveMqClient c = new ActiveMqClient(this);

            Thread brokerThread = new Thread(c);

            brokerThread.setDaemon(true);

            brokerThread.start();

 

            System.out.println("Waiting for connection to Active MQ
server...");

            synchronized (this) {

                try {

                    wait();

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

            System.out.println("...Connected to Active MQ server.");

 

 

            try {

                c.subscribe("test", this);

            } catch (JMSException e) {

                e.printStackTrace();

            }

 

 

        }

 

        int counter = 0;

 

        public void onMessage(Message message) {

            System.out.println("received=" + counter++);

        }

    }

 

 

    class ActiveMqClient implements Runnable, ExceptionListener {

 

        Session session;

        Connection connection;

 

        Object o;

 

 

        public ActiveMqClient(Object o) {

            this.o = o;

 

        }

 

        public void run() {

 

            try {

                String url = 

                    "failover:(tcp://" + HOST + ":" + PORT +
"?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";

 

                ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);

                // Create a Connection

                connection = connectionFactory.createConnection();

                connection.start();

                connection.setExceptionListener(this);

                // Create a Session

 

                session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);

 

                System.out.println("activeMQ client waiting for msgs");

                synchronized (o) {

                    o.notifyAll();

                }

 

 

            } catch (Exception e) {

                System.out.println("Caught: " + e);

                e.printStackTrace();

            }

 

        }

 

        public void close() throws JMSException {

            session.close();

            connection.close();

        }

 

        public void subscribe(String destName, MessageListener l) throws
JMSException {

            char c = destName.contains("?") ? '&' : '?';

            destName = destName + c + "consumer.retroactive=true";

            System.out.println("ActiveMqClient subscribe " + destName);

            MessageConsumer mc =
session.createConsumer(session.createTopic(destName));

            mc.setMessageListener(l);

        }

 

        int messageCounter;

 

        public synchronized void onException(JMSException ex) {

            System.out.println("JMS Exception occured.  Shutting down
client.");

        }

    }

 

 

    private static void connect() throws JMSException {

        ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(URL);

        connectionFactory.setUseAsyncSend(true);

 

        // Create a Connection

        connection = connectionFactory.createConnection();

        connection.start();

 

        // Create a Session

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

        Destination destination = session.createTopic(topic);

 

        // Create a MessageProducer from the Session to the Topic or Queue

        producer = session.createProducer(destination);

        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    }

 

    static void disconnect() {

        // Clean up

        try {

            session.close();

        } catch (JMSException e) {

            e.printStackTrace();

        }

 

        try {

            connection.close();

        } catch (JMSException e) {

            e.printStackTrace();

        }

    }

 

 

}


RE: Retroactive consumers loose msgs...

Posted by Andrew M <an...@oc384.net>.
In the code below I'm sending 5000 of these:

	static final String MSG = "1"; // 1 character msg

so it shouldn't be a memory issue.  The java file of the code below is here:

	http://66.17.204.68:8765/~andrew/CommandLinePublisher.java

Thanks,
Andrew


-----Original Message-----
From: Rob Davies [mailto:rajdavies@gmail.com] 
Sent: Monday, March 31, 2008 2:58 AM
To: users@activemq.apache.org
Subject: Re: Retroactive consumers loose msgs...

How big are your messages ? - they might be reclaimed if you hit a  
memory limit in the broker
On 29 Mar 2008, at 07:22, Andrew M wrote:

> My Retroactive Consumer is only receiving the last 789 out of 5000  
> msgs
> sent.  Any suggestions?
>
> Thanks.
>
>
>
>
>
> import javax.jms.Connection;
>
> import javax.jms.DeliveryMode;
>
> import javax.jms.Destination;
>
> import javax.jms.ExceptionListener;
>
> import javax.jms.JMSException;
>
> import javax.jms.Message;
>
> import javax.jms.MessageConsumer;
>
> import javax.jms.MessageListener;
>
> import javax.jms.MessageProducer;
>
> import javax.jms.Session;
>
> import javax.jms.TextMessage;
>
>
>
> import org.apache.activemq.ActiveMQConnectionFactory;
>
>
>
>
>
> public class CommandLinePublisher {
>
>
>
>    static final String HOST = "tupolev";
>
>    static final String PORT = "61616";
>
>    static final String URL = "tcp://"+HOST+":"+PORT;
>
>    static final String MSG = "1"; // 1 character msg
>
>    static final String topic = "test";
>
>    static final int MSGS_TO_SEND = 5000;
>
>
>
>    static MessageProducer producer;
>
>    static Session session;
>
>    static Connection connection;
>
>
>
>    public static void main(String[] args) throws Exception {
>
>        new CommandLinePublisher();
>
>    }
>
>
>
>    public CommandLinePublisher() {
>
>
>
>        try {
>
>            connect();
>
>            TextMessage message;
>
>            message = session.createTextMessage(MSG);
>
>            System.out.println("Sent message: " + message.hashCode()  
> + " : "
> + Thread.currentThread().getName());
>
>
>
>            for (int x = 0; x < MSGS_TO_SEND; x++) {
>
>                System.out.println("sending msg " + x);
>
>                producer.send(message);
>
>            }
>
>
>
>            disconnect();
>
>
>
>            new MonitorApp();
>
>
>
>        } catch (JMSException e) {
>
>            e.printStackTrace();
>
>        }
>
>
>
>    }
>
>
>
>
>
>    class MonitorApp implements MessageListener {
>
>
>
>        public MonitorApp() {
>
>
>
>            // connect CLient to ActiveMQ server.
>
>            ActiveMqClient c = new ActiveMqClient(this);
>
>            Thread brokerThread = new Thread(c);
>
>            brokerThread.setDaemon(true);
>
>            brokerThread.start();
>
>
>
>            System.out.println("Waiting for connection to Active MQ
> server...");
>
>            synchronized (this) {
>
>                try {
>
>                    wait();
>
>                } catch (InterruptedException e) {
>
>                    e.printStackTrace();
>
>                }
>
>            }
>
>            System.out.println("...Connected to Active MQ server.");
>
>
>
>
>
>            try {
>
>                c.subscribe("test", this);
>
>            } catch (JMSException e) {
>
>                e.printStackTrace();
>
>            }
>
>
>
>
>
>        }
>
>
>
>        int counter = 0;
>
>
>
>        public void onMessage(Message message) {
>
>            System.out.println("received=" + counter++);
>
>        }
>
>    }
>
>
>
>
>
>    class ActiveMqClient implements Runnable, ExceptionListener {
>
>
>
>        Session session;
>
>        Connection connection;
>
>
>
>        Object o;
>
>
>
>
>
>        public ActiveMqClient(Object o) {
>
>            this.o = o;
>
>
>
>        }
>
>
>
>        public void run() {
>
>
>
>            try {
>
>                String url =
>
>                    "failover:(tcp://" + HOST + ":" + PORT +
> "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>
>
>
>                ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(url);
>
>                // Create a Connection
>
>                connection = connectionFactory.createConnection();
>
>                connection.start();
>
>                connection.setExceptionListener(this);
>
>                // Create a Session
>
>
>
>                session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
>
>
>                System.out.println("activeMQ client waiting for msgs");
>
>                synchronized (o) {
>
>                    o.notifyAll();
>
>                }
>
>
>
>
>
>            } catch (Exception e) {
>
>                System.out.println("Caught: " + e);
>
>                e.printStackTrace();
>
>            }
>
>
>
>        }
>
>
>
>        public void close() throws JMSException {
>
>            session.close();
>
>            connection.close();
>
>        }
>
>
>
>        public void subscribe(String destName, MessageListener l)  
> throws
> JMSException {
>
>            char c = destName.contains("?") ? '&' : '?';
>
>            destName = destName + c + "consumer.retroactive=true";
>
>            System.out.println("ActiveMqClient subscribe " + destName);
>
>            MessageConsumer mc =
> session.createConsumer(session.createTopic(destName));
>
>            mc.setMessageListener(l);
>
>        }
>
>
>
>        int messageCounter;
>
>
>
>        public synchronized void onException(JMSException ex) {
>
>            System.out.println("JMS Exception occured.  Shutting down
> client.");
>
>        }
>
>    }
>
>
>
>
>
>    private static void connect() throws JMSException {
>
>        ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(URL);
>
>        connectionFactory.setUseAsyncSend(true);
>
>
>
>        // Create a Connection
>
>        connection = connectionFactory.createConnection();
>
>        connection.start();
>
>
>
>        // Create a Session
>
>        session = connection.createSession(false,  
> Session.AUTO_ACKNOWLEDGE);
>
>
>
>        Destination destination = session.createTopic(topic);
>
>
>
>        // Create a MessageProducer from the Session to the Topic or  
> Queue
>
>        producer = session.createProducer(destination);
>
>        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>
>    }
>
>
>
>    static void disconnect() {
>
>        // Clean up
>
>        try {
>
>            session.close();
>
>        } catch (JMSException e) {
>
>            e.printStackTrace();
>
>        }
>
>
>
>        try {
>
>            connection.close();
>
>        } catch (JMSException e) {
>
>            e.printStackTrace();
>
>        }
>
>    }
>
>
>
>
>
> }
>



Re: Retroactive consumers loose msgs...

Posted by Rob Davies <ra...@gmail.com>.
How big are your messages ? - they might be reclaimed if you hit a  
memory limit in the broker
On 29 Mar 2008, at 07:22, Andrew M wrote:

> My Retroactive Consumer is only receiving the last 789 out of 5000  
> msgs
> sent.  Any suggestions?
>
> Thanks.
>
>
>
>
>
> import javax.jms.Connection;
>
> import javax.jms.DeliveryMode;
>
> import javax.jms.Destination;
>
> import javax.jms.ExceptionListener;
>
> import javax.jms.JMSException;
>
> import javax.jms.Message;
>
> import javax.jms.MessageConsumer;
>
> import javax.jms.MessageListener;
>
> import javax.jms.MessageProducer;
>
> import javax.jms.Session;
>
> import javax.jms.TextMessage;
>
>
>
> import org.apache.activemq.ActiveMQConnectionFactory;
>
>
>
>
>
> public class CommandLinePublisher {
>
>
>
>    static final String HOST = "tupolev";
>
>    static final String PORT = "61616";
>
>    static final String URL = "tcp://"+HOST+":"+PORT;
>
>    static final String MSG = "1"; // 1 character msg
>
>    static final String topic = "test";
>
>    static final int MSGS_TO_SEND = 5000;
>
>
>
>    static MessageProducer producer;
>
>    static Session session;
>
>    static Connection connection;
>
>
>
>    public static void main(String[] args) throws Exception {
>
>        new CommandLinePublisher();
>
>    }
>
>
>
>    public CommandLinePublisher() {
>
>
>
>        try {
>
>            connect();
>
>            TextMessage message;
>
>            message = session.createTextMessage(MSG);
>
>            System.out.println("Sent message: " + message.hashCode()  
> + " : "
> + Thread.currentThread().getName());
>
>
>
>            for (int x = 0; x < MSGS_TO_SEND; x++) {
>
>                System.out.println("sending msg " + x);
>
>                producer.send(message);
>
>            }
>
>
>
>            disconnect();
>
>
>
>            new MonitorApp();
>
>
>
>        } catch (JMSException e) {
>
>            e.printStackTrace();
>
>        }
>
>
>
>    }
>
>
>
>
>
>    class MonitorApp implements MessageListener {
>
>
>
>        public MonitorApp() {
>
>
>
>            // connect CLient to ActiveMQ server.
>
>            ActiveMqClient c = new ActiveMqClient(this);
>
>            Thread brokerThread = new Thread(c);
>
>            brokerThread.setDaemon(true);
>
>            brokerThread.start();
>
>
>
>            System.out.println("Waiting for connection to Active MQ
> server...");
>
>            synchronized (this) {
>
>                try {
>
>                    wait();
>
>                } catch (InterruptedException e) {
>
>                    e.printStackTrace();
>
>                }
>
>            }
>
>            System.out.println("...Connected to Active MQ server.");
>
>
>
>
>
>            try {
>
>                c.subscribe("test", this);
>
>            } catch (JMSException e) {
>
>                e.printStackTrace();
>
>            }
>
>
>
>
>
>        }
>
>
>
>        int counter = 0;
>
>
>
>        public void onMessage(Message message) {
>
>            System.out.println("received=" + counter++);
>
>        }
>
>    }
>
>
>
>
>
>    class ActiveMqClient implements Runnable, ExceptionListener {
>
>
>
>        Session session;
>
>        Connection connection;
>
>
>
>        Object o;
>
>
>
>
>
>        public ActiveMqClient(Object o) {
>
>            this.o = o;
>
>
>
>        }
>
>
>
>        public void run() {
>
>
>
>            try {
>
>                String url =
>
>                    "failover:(tcp://" + HOST + ":" + PORT +
> "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>
>
>
>                ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(url);
>
>                // Create a Connection
>
>                connection = connectionFactory.createConnection();
>
>                connection.start();
>
>                connection.setExceptionListener(this);
>
>                // Create a Session
>
>
>
>                session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
>
>
>                System.out.println("activeMQ client waiting for msgs");
>
>                synchronized (o) {
>
>                    o.notifyAll();
>
>                }
>
>
>
>
>
>            } catch (Exception e) {
>
>                System.out.println("Caught: " + e);
>
>                e.printStackTrace();
>
>            }
>
>
>
>        }
>
>
>
>        public void close() throws JMSException {
>
>            session.close();
>
>            connection.close();
>
>        }
>
>
>
>        public void subscribe(String destName, MessageListener l)  
> throws
> JMSException {
>
>            char c = destName.contains("?") ? '&' : '?';
>
>            destName = destName + c + "consumer.retroactive=true";
>
>            System.out.println("ActiveMqClient subscribe " + destName);
>
>            MessageConsumer mc =
> session.createConsumer(session.createTopic(destName));
>
>            mc.setMessageListener(l);
>
>        }
>
>
>
>        int messageCounter;
>
>
>
>        public synchronized void onException(JMSException ex) {
>
>            System.out.println("JMS Exception occured.  Shutting down
> client.");
>
>        }
>
>    }
>
>
>
>
>
>    private static void connect() throws JMSException {
>
>        ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(URL);
>
>        connectionFactory.setUseAsyncSend(true);
>
>
>
>        // Create a Connection
>
>        connection = connectionFactory.createConnection();
>
>        connection.start();
>
>
>
>        // Create a Session
>
>        session = connection.createSession(false,  
> Session.AUTO_ACKNOWLEDGE);
>
>
>
>        Destination destination = session.createTopic(topic);
>
>
>
>        // Create a MessageProducer from the Session to the Topic or  
> Queue
>
>        producer = session.createProducer(destination);
>
>        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>
>    }
>
>
>
>    static void disconnect() {
>
>        // Clean up
>
>        try {
>
>            session.close();
>
>        } catch (JMSException e) {
>
>            e.printStackTrace();
>
>        }
>
>
>
>        try {
>
>            connection.close();
>
>        } catch (JMSException e) {
>
>            e.printStackTrace();
>
>        }
>
>    }
>
>
>
>
>
> }
>