You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Andrew M <an...@oc384.net> on 2008/03/29 08:22:22 UTC
Retroactive consumers loose msgs...
My Retroactive Consumer is only receiving the last 789 out of 5000 msgs
sent. Any suggestions?
Thanks.
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class CommandLinePublisher {
static final String HOST = "tupolev";
static final String PORT = "61616";
static final String URL = "tcp://"+HOST+":"+PORT;
static final String MSG = "1"; // 1 character msg
static final String topic = "test";
static final int MSGS_TO_SEND = 5000;
static MessageProducer producer;
static Session session;
static Connection connection;
public static void main(String[] args) throws Exception {
new CommandLinePublisher();
}
public CommandLinePublisher() {
try {
connect();
TextMessage message;
message = session.createTextMessage(MSG);
System.out.println("Sent message: " + message.hashCode() + " : "
+ Thread.currentThread().getName());
for (int x = 0; x < MSGS_TO_SEND; x++) {
System.out.println("sending msg " + x);
producer.send(message);
}
disconnect();
new MonitorApp();
} catch (JMSException e) {
e.printStackTrace();
}
}
class MonitorApp implements MessageListener {
public MonitorApp() {
// connect CLient to ActiveMQ server.
ActiveMqClient c = new ActiveMqClient(this);
Thread brokerThread = new Thread(c);
brokerThread.setDaemon(true);
brokerThread.start();
System.out.println("Waiting for connection to Active MQ
server...");
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("...Connected to Active MQ server.");
try {
c.subscribe("test", this);
} catch (JMSException e) {
e.printStackTrace();
}
}
int counter = 0;
public void onMessage(Message message) {
System.out.println("received=" + counter++);
}
}
class ActiveMqClient implements Runnable, ExceptionListener {
Session session;
Connection connection;
Object o;
public ActiveMqClient(Object o) {
this.o = o;
}
public void run() {
try {
String url =
"failover:(tcp://" + HOST + ":" + PORT +
"?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);
// Create a Connection
connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
// Create a Session
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
System.out.println("activeMQ client waiting for msgs");
synchronized (o) {
o.notifyAll();
}
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public void close() throws JMSException {
session.close();
connection.close();
}
public void subscribe(String destName, MessageListener l) throws
JMSException {
char c = destName.contains("?") ? '&' : '?';
destName = destName + c + "consumer.retroactive=true";
System.out.println("ActiveMqClient subscribe " + destName);
MessageConsumer mc =
session.createConsumer(session.createTopic(destName));
mc.setMessageListener(l);
}
int messageCounter;
public synchronized void onException(JMSException ex) {
System.out.println("JMS Exception occured. Shutting down
client.");
}
}
private static void connect() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(URL);
connectionFactory.setUseAsyncSend(true);
// Create a Connection
connection = connectionFactory.createConnection();
connection.start();
// Create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topic);
// Create a MessageProducer from the Session to the Topic or Queue
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
static void disconnect() {
// Clean up
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
RE: Retroactive consumers loose msgs...
Posted by Andrew M <an...@oc384.net>.
In the code below I'm sending 5000 of these:
static final String MSG = "1"; // 1 character msg
so it shouldn't be a memory issue. The java file of the code below is here:
http://66.17.204.68:8765/~andrew/CommandLinePublisher.java
Thanks,
Andrew
-----Original Message-----
From: Rob Davies [mailto:rajdavies@gmail.com]
Sent: Monday, March 31, 2008 2:58 AM
To: users@activemq.apache.org
Subject: Re: Retroactive consumers loose msgs...
How big are your messages ? - they might be reclaimed if you hit a
memory limit in the broker
On 29 Mar 2008, at 07:22, Andrew M wrote:
> My Retroactive Consumer is only receiving the last 789 out of 5000
> msgs
> sent. Any suggestions?
>
> Thanks.
>
>
>
>
>
> import javax.jms.Connection;
>
> import javax.jms.DeliveryMode;
>
> import javax.jms.Destination;
>
> import javax.jms.ExceptionListener;
>
> import javax.jms.JMSException;
>
> import javax.jms.Message;
>
> import javax.jms.MessageConsumer;
>
> import javax.jms.MessageListener;
>
> import javax.jms.MessageProducer;
>
> import javax.jms.Session;
>
> import javax.jms.TextMessage;
>
>
>
> import org.apache.activemq.ActiveMQConnectionFactory;
>
>
>
>
>
> public class CommandLinePublisher {
>
>
>
> static final String HOST = "tupolev";
>
> static final String PORT = "61616";
>
> static final String URL = "tcp://"+HOST+":"+PORT;
>
> static final String MSG = "1"; // 1 character msg
>
> static final String topic = "test";
>
> static final int MSGS_TO_SEND = 5000;
>
>
>
> static MessageProducer producer;
>
> static Session session;
>
> static Connection connection;
>
>
>
> public static void main(String[] args) throws Exception {
>
> new CommandLinePublisher();
>
> }
>
>
>
> public CommandLinePublisher() {
>
>
>
> try {
>
> connect();
>
> TextMessage message;
>
> message = session.createTextMessage(MSG);
>
> System.out.println("Sent message: " + message.hashCode()
> + " : "
> + Thread.currentThread().getName());
>
>
>
> for (int x = 0; x < MSGS_TO_SEND; x++) {
>
> System.out.println("sending msg " + x);
>
> producer.send(message);
>
> }
>
>
>
> disconnect();
>
>
>
> new MonitorApp();
>
>
>
> } catch (JMSException e) {
>
> e.printStackTrace();
>
> }
>
>
>
> }
>
>
>
>
>
> class MonitorApp implements MessageListener {
>
>
>
> public MonitorApp() {
>
>
>
> // connect CLient to ActiveMQ server.
>
> ActiveMqClient c = new ActiveMqClient(this);
>
> Thread brokerThread = new Thread(c);
>
> brokerThread.setDaemon(true);
>
> brokerThread.start();
>
>
>
> System.out.println("Waiting for connection to Active MQ
> server...");
>
> synchronized (this) {
>
> try {
>
> wait();
>
> } catch (InterruptedException e) {
>
> e.printStackTrace();
>
> }
>
> }
>
> System.out.println("...Connected to Active MQ server.");
>
>
>
>
>
> try {
>
> c.subscribe("test", this);
>
> } catch (JMSException e) {
>
> e.printStackTrace();
>
> }
>
>
>
>
>
> }
>
>
>
> int counter = 0;
>
>
>
> public void onMessage(Message message) {
>
> System.out.println("received=" + counter++);
>
> }
>
> }
>
>
>
>
>
> class ActiveMqClient implements Runnable, ExceptionListener {
>
>
>
> Session session;
>
> Connection connection;
>
>
>
> Object o;
>
>
>
>
>
> public ActiveMqClient(Object o) {
>
> this.o = o;
>
>
>
> }
>
>
>
> public void run() {
>
>
>
> try {
>
> String url =
>
> "failover:(tcp://" + HOST + ":" + PORT +
> "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>
>
>
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(url);
>
> // Create a Connection
>
> connection = connectionFactory.createConnection();
>
> connection.start();
>
> connection.setExceptionListener(this);
>
> // Create a Session
>
>
>
> session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
>
>
> System.out.println("activeMQ client waiting for msgs");
>
> synchronized (o) {
>
> o.notifyAll();
>
> }
>
>
>
>
>
> } catch (Exception e) {
>
> System.out.println("Caught: " + e);
>
> e.printStackTrace();
>
> }
>
>
>
> }
>
>
>
> public void close() throws JMSException {
>
> session.close();
>
> connection.close();
>
> }
>
>
>
> public void subscribe(String destName, MessageListener l)
> throws
> JMSException {
>
> char c = destName.contains("?") ? '&' : '?';
>
> destName = destName + c + "consumer.retroactive=true";
>
> System.out.println("ActiveMqClient subscribe " + destName);
>
> MessageConsumer mc =
> session.createConsumer(session.createTopic(destName));
>
> mc.setMessageListener(l);
>
> }
>
>
>
> int messageCounter;
>
>
>
> public synchronized void onException(JMSException ex) {
>
> System.out.println("JMS Exception occured. Shutting down
> client.");
>
> }
>
> }
>
>
>
>
>
> private static void connect() throws JMSException {
>
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(URL);
>
> connectionFactory.setUseAsyncSend(true);
>
>
>
> // Create a Connection
>
> connection = connectionFactory.createConnection();
>
> connection.start();
>
>
>
> // Create a Session
>
> session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
>
>
> Destination destination = session.createTopic(topic);
>
>
>
> // Create a MessageProducer from the Session to the Topic or
> Queue
>
> producer = session.createProducer(destination);
>
> producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>
> }
>
>
>
> static void disconnect() {
>
> // Clean up
>
> try {
>
> session.close();
>
> } catch (JMSException e) {
>
> e.printStackTrace();
>
> }
>
>
>
> try {
>
> connection.close();
>
> } catch (JMSException e) {
>
> e.printStackTrace();
>
> }
>
> }
>
>
>
>
>
> }
>
Re: Retroactive consumers loose msgs...
Posted by Rob Davies <ra...@gmail.com>.
How big are your messages ? - they might be reclaimed if you hit a
memory limit in the broker
On 29 Mar 2008, at 07:22, Andrew M wrote:
> My Retroactive Consumer is only receiving the last 789 out of 5000
> msgs
> sent. Any suggestions?
>
> Thanks.
>
>
>
>
>
> import javax.jms.Connection;
>
> import javax.jms.DeliveryMode;
>
> import javax.jms.Destination;
>
> import javax.jms.ExceptionListener;
>
> import javax.jms.JMSException;
>
> import javax.jms.Message;
>
> import javax.jms.MessageConsumer;
>
> import javax.jms.MessageListener;
>
> import javax.jms.MessageProducer;
>
> import javax.jms.Session;
>
> import javax.jms.TextMessage;
>
>
>
> import org.apache.activemq.ActiveMQConnectionFactory;
>
>
>
>
>
> public class CommandLinePublisher {
>
>
>
> static final String HOST = "tupolev";
>
> static final String PORT = "61616";
>
> static final String URL = "tcp://"+HOST+":"+PORT;
>
> static final String MSG = "1"; // 1 character msg
>
> static final String topic = "test";
>
> static final int MSGS_TO_SEND = 5000;
>
>
>
> static MessageProducer producer;
>
> static Session session;
>
> static Connection connection;
>
>
>
> public static void main(String[] args) throws Exception {
>
> new CommandLinePublisher();
>
> }
>
>
>
> public CommandLinePublisher() {
>
>
>
> try {
>
> connect();
>
> TextMessage message;
>
> message = session.createTextMessage(MSG);
>
> System.out.println("Sent message: " + message.hashCode()
> + " : "
> + Thread.currentThread().getName());
>
>
>
> for (int x = 0; x < MSGS_TO_SEND; x++) {
>
> System.out.println("sending msg " + x);
>
> producer.send(message);
>
> }
>
>
>
> disconnect();
>
>
>
> new MonitorApp();
>
>
>
> } catch (JMSException e) {
>
> e.printStackTrace();
>
> }
>
>
>
> }
>
>
>
>
>
> class MonitorApp implements MessageListener {
>
>
>
> public MonitorApp() {
>
>
>
> // connect CLient to ActiveMQ server.
>
> ActiveMqClient c = new ActiveMqClient(this);
>
> Thread brokerThread = new Thread(c);
>
> brokerThread.setDaemon(true);
>
> brokerThread.start();
>
>
>
> System.out.println("Waiting for connection to Active MQ
> server...");
>
> synchronized (this) {
>
> try {
>
> wait();
>
> } catch (InterruptedException e) {
>
> e.printStackTrace();
>
> }
>
> }
>
> System.out.println("...Connected to Active MQ server.");
>
>
>
>
>
> try {
>
> c.subscribe("test", this);
>
> } catch (JMSException e) {
>
> e.printStackTrace();
>
> }
>
>
>
>
>
> }
>
>
>
> int counter = 0;
>
>
>
> public void onMessage(Message message) {
>
> System.out.println("received=" + counter++);
>
> }
>
> }
>
>
>
>
>
> class ActiveMqClient implements Runnable, ExceptionListener {
>
>
>
> Session session;
>
> Connection connection;
>
>
>
> Object o;
>
>
>
>
>
> public ActiveMqClient(Object o) {
>
> this.o = o;
>
>
>
> }
>
>
>
> public void run() {
>
>
>
> try {
>
> String url =
>
> "failover:(tcp://" + HOST + ":" + PORT +
> "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>
>
>
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(url);
>
> // Create a Connection
>
> connection = connectionFactory.createConnection();
>
> connection.start();
>
> connection.setExceptionListener(this);
>
> // Create a Session
>
>
>
> session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
>
>
> System.out.println("activeMQ client waiting for msgs");
>
> synchronized (o) {
>
> o.notifyAll();
>
> }
>
>
>
>
>
> } catch (Exception e) {
>
> System.out.println("Caught: " + e);
>
> e.printStackTrace();
>
> }
>
>
>
> }
>
>
>
> public void close() throws JMSException {
>
> session.close();
>
> connection.close();
>
> }
>
>
>
> public void subscribe(String destName, MessageListener l)
> throws
> JMSException {
>
> char c = destName.contains("?") ? '&' : '?';
>
> destName = destName + c + "consumer.retroactive=true";
>
> System.out.println("ActiveMqClient subscribe " + destName);
>
> MessageConsumer mc =
> session.createConsumer(session.createTopic(destName));
>
> mc.setMessageListener(l);
>
> }
>
>
>
> int messageCounter;
>
>
>
> public synchronized void onException(JMSException ex) {
>
> System.out.println("JMS Exception occured. Shutting down
> client.");
>
> }
>
> }
>
>
>
>
>
> private static void connect() throws JMSException {
>
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(URL);
>
> connectionFactory.setUseAsyncSend(true);
>
>
>
> // Create a Connection
>
> connection = connectionFactory.createConnection();
>
> connection.start();
>
>
>
> // Create a Session
>
> session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
>
>
> Destination destination = session.createTopic(topic);
>
>
>
> // Create a MessageProducer from the Session to the Topic or
> Queue
>
> producer = session.createProducer(destination);
>
> producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>
> }
>
>
>
> static void disconnect() {
>
> // Clean up
>
> try {
>
> session.close();
>
> } catch (JMSException e) {
>
> e.printStackTrace();
>
> }
>
>
>
> try {
>
> connection.close();
>
> } catch (JMSException e) {
>
> e.printStackTrace();
>
> }
>
> }
>
>
>
>
>
> }
>