You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by rabidgremlin <ra...@gmail.com> on 2006/09/14 23:34:36 UTC

Publisher does not shutdown

Hi all,

I have a small test app that creates a TopicPublisher that generates 1000
messages and then shuts down.

However when I run the app it does not stop. It appears that the
TopicPublisher keeps a thread alive which prevents the app from stopping...

>From the logs of my app you can see the keep alive messages getting sent
even after the publisher has been stooped and the session closed:

1375 [main] DEBUG org.apache.activemq.ActiveMQSession  - Sending message:
ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId =
ID:isis-ws-007-4500-1158269259517-1:0:1:1:1000, originalDestination = null,
originalTransactionId = null, producerId =
ID:isis-ws-007-4500-1158269259517-1:0:1:1, destination = topic://testtopic,
transactionId = null, expiration = 0, timestamp = 1158269260720, arrival =
0, correlationId = null, replyTo = null, persistent = true, type = null,
priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
null, dataStructure = null, redeliveryCounter = 0, size = 0, properties =
null, readOnlyProperties = true, readOnlyBody = true, text = Msg:999}
1375 [main] INFO TestPub  - Stopping...
1375 [main] INFO TestPub  - Stopped.
1375 [main] INFO TestPub  - Done.
15560 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor  - Message sent since last
write check, resetting flag
30557 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor  - No message sent since
last write check, sending a KeepAliveInfo
30557 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor  - Message received since
last read check, resetting flag:
45555 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor  - No message sent since
last write check, sending a KeepAliveInfo

The code of the app:

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;

/**
 * Test topic subscriber.
 *
 * @author ackerj
 * @version $Id$
 */
public class TestPub implements ExceptionListener
{
  private static Logger log = Logger.getLogger(TestPub.class);

  ActiveMQConnectionFactory topicConnectionFactory = null;

  TopicConnection topicConnection = null;

  TopicSession topicSession = null;

  Topic topic = null;

  TopicPublisher topicPublisher = null;

  String brokerUri;

  String topicName;

  private static int MSG_TO_SEND = 1000;


  public void onException(JMSException e)
  {
    log.error("OnException event fired !", e);
    System.exit(1);
  }

  public TestPub(String brokerUri, String topicName)
    throws Exception
  {
    this.brokerUri = brokerUri;
    this.topicName = topicName;
  }

  public void go()
    throws Exception
  {
    log.info("Connecting.....");
    topicConnectionFactory = new ActiveMQConnectionFactory();
    topicConnectionFactory.setBrokerURL(brokerUri);

    topicConnection = topicConnectionFactory.createTopicConnection();
    topicConnection.setExceptionListener(this);
    topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);

    topic = topicSession.createTopic(topicName);

    topicPublisher = topicSession.createPublisher(topic);

    topicConnection.start();
    log.info("Connected.");

    log.info("Sending " + MSG_TO_SEND + " messages..");

    for (int loop=0; loop < MSG_TO_SEND; loop++)
    {
      TextMessage msg = topicSession.createTextMessage();
      msg.setText("Msg:" + loop);

      topicPublisher.send(msg);
    }

    log.info("Stopping...");
    topicPublisher.close();
    topicSession.close();
    topicConnection.stop();
    log.info("Stopped.");
  }

  public static void main(String[] args)
  {
    BasicConfigurator.configure();

    try
    {
      System.out.println("Test ActiveMQ Publisher");

      if (args.length != 1 && args.length != 2)
      {
        System.out.println("Usage: TestPub [brokerUri] [topicname]");
        System.out.println("eg: ");
        System.out.println("   TestPub tcp://127.0.0.1:12345 testtopic");
        System.exit(1);
      }

      String brokerUri = new String(args[0]);
      log.info("BrokerUri: " + brokerUri);

      String topicName = new String(args[1]);
      log.info("Topic name is " + topicName);

      TestPub pub = new TestPub(brokerUri, topicName);

      pub.go();

      log.info("Done.");
    }
    catch (Exception e)
    {
      e.printStackTrace();
      System.exit(1);
    }

  }

}