You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by rabidgremlin <ra...@gmail.com> on 2006/09/14 23:34:36 UTC
Publisher does not shut down
Hi all,
I have a small test app that creates a TopicPublisher that generates 1000
messages and then shuts down.
However when I run the app it does not stop. It appears that the
TopicPublisher keeps a thread alive which prevents the app from stopping...
From the logs of my app you can see the keep alive messages getting sent
even after the publisher has been stopped and the session closed:
1375 [main] DEBUG org.apache.activemq.ActiveMQSession - Sending message:
ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId =
ID:isis-ws-007-4500-1158269259517-1:0:1:1:1000, originalDestination = null,
originalTransactionId = null, producerId =
ID:isis-ws-007-4500-1158269259517-1:0:1:1, destination = topic://testtopic,
transactionId = null, expiration = 0, timestamp = 1158269260720, arrival =
0, correlationId = null, replyTo = null, persistent = true, type = null,
priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
null, dataStructure = null, redeliveryCounter = 0, size = 0, properties =
null, readOnlyProperties = true, readOnlyBody = true, text = Msg:999}
1375 [main] INFO TestPub - Stopping...
1375 [main] INFO TestPub - Stopped.
1375 [main] INFO TestPub - Done.
15560 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor - Message sent since last
write check, resetting flag
30557 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor - No message sent since
last write check, sending a KeepAliveInfo
30557 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor - Message received since
last read check, resetting flag:
45555 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor - No message sent since
last write check, sending a KeepAliveInfo
The code of the app:
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
/**
* Test topic subscriber.
*
* @author ackerj
* @version $Id$
*/
public class TestPub implements ExceptionListener
{
private static Logger log = Logger.getLogger(TestPub.class);
ActiveMQConnectionFactory topicConnectionFactory = null;
TopicConnection topicConnection = null;
TopicSession topicSession = null;
Topic topic = null;
TopicPublisher topicPublisher = null;
String brokerUri;
String topicName;
private static int MSG_TO_SEND = 1000;
public void onException(JMSException e)
{
log.error("OnException event fired !", e);
System.exit(1);
}
public TestPub(String brokerUri, String topicName)
throws Exception
{
this.brokerUri = brokerUri;
this.topicName = topicName;
}
public void go()
throws Exception
{
log.info("Connecting.....");
topicConnectionFactory = new ActiveMQConnectionFactory();
topicConnectionFactory.setBrokerURL(brokerUri);
topicConnection = topicConnectionFactory.createTopicConnection();
topicConnection.setExceptionListener(this);
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
topic = topicSession.createTopic(topicName);
topicPublisher = topicSession.createPublisher(topic);
topicConnection.start();
log.info("Connected.");
log.info("Sending " + MSG_TO_SEND + " messages..");
for (int loop=0; loop < MSG_TO_SEND; loop++)
{
TextMessage msg = topicSession.createTextMessage();
msg.setText("Msg:" + loop);
topicPublisher.send(msg);
}
log.info("Stopping...");
topicPublisher.close();
topicSession.close();
topicConnection.stop();
log.info("Stopped.");
}
public static void main(String[] args)
{
BasicConfigurator.configure();
try
{
System.out.println("Test ActiveMQ Publisher");
if (args.length != 1 && args.length != 2)
{
System.out.println("Usage: TestPub [brokerUri] [topicname]");
System.out.println("eg: ");
System.out.println(" TestPub tcp://127.0.0.1:12345 testtopic");
System.exit(1);
}
String brokerUri = new String(args[0]);
log.info("BrokerUri: " + brokerUri);
String topicName = new String(args[1]);
log.info("Topic name is " + topicName);
TestPub pub = new TestPub(brokerUri, topicName);
pub.go();
log.info("Done.");
}
catch (Exception e)
{
e.printStackTrace();
System.exit(1);
}
}
}