Posted to users@activemq.apache.org by mpont <mp...@gmail.com> on 2007/06/20 19:22:23 UTC

Memory Leak Using Temporary Queues

Hello All,

I am grappling with what appears to be a memory leak when creating and
destroying Temporary Queues.

Should an application that is consuming X bytes of memory be able to create
new Temporary Queues that consume Z additional bytes (total application
memory consumption is now X+Z bytes) and then destroy/close the Temporary
Queues to return the application memory consumption back to X bytes?

It seems like Temporary Queues should behave this way, but they don't seem
to.
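
One way to make the X versus X+Z question concrete is to sample the used
heap before and after each create/destroy cycle. The following is a minimal
sketch that uses only java.lang.Runtime. The class name HeapProbe, its
methods, and the placeholder workload are hypothetical and are not part of
ActiveMQ or of the test rig; System.gc() is only a hint, so the numbers are
approximate and a trend over many cycles is more meaningful than one sample.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: samples used heap so growth across cycles can be logged.
final class HeapProbe {

    // Returns the heap in use, in bytes, after asking the JVM to collect garbage.
    static long usedHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        // System.gc() is only a request, so repeat it and give the collector time.
        for (int i = 0; i < 3; i++) {
            System.gc();
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rt.totalMemory() - rt.freeMemory();
    }

    // Difference between the current sample and the baseline (may be negative).
    static long growth(long baseline, long current) {
        return current - baseline;
    }

    public static void main(String[] args) {
        long baseline = usedHeapBytes();
        for (int cycle = 0; cycle < 5; cycle++) {
            // Placeholder workload; in the real rig this would be the
            // create/send/close cycle for the temporary queues.
            List<byte[]> scratch = new ArrayList<byte[]>();
            for (int i = 0; i < 50; i++) {
                scratch.add(new byte[64 * 1024]);
            }
            scratch.clear();

            long current = usedHeapBytes();
            System.out.println("cycle " + cycle + ": growth since baseline = "
                + growth(baseline, current) + " bytes");
        }
    }
}
```

If the reported growth keeps climbing from cycle to cycle after the resources
have been closed, that points to retained objects rather than garbage that
simply has not been collected yet.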

Here is a simple test rig that exhibits the apparent memory leak.  On my
machine it produces an OutOfMemoryError in two to three minutes (ActiveMQ
4.1.1/Sun 1.6 JVM/WinXP).  Am I doing something wrong?

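import java.util.ArrayList;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

public class TestActiveMQ {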
  public static void main(String[] args) throws Exception {
    BrokerService brokerService = new BrokerService();
    brokerService.setUseJmx(false);
    brokerService.setPersistent(false);
    brokerService.addConnector(ActiveMQConnection.DEFAULT_BROKER_URL);
    brokerService.start();
    
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        ActiveMQConnection.DEFAULT_USER,
        ActiveMQConnection.DEFAULT_PASSWORD,
        ActiveMQConnection.DEFAULT_BROKER_URL);
    
    Connection producerConnection = factory.createConnection();
    Session producerSession = producerConnection.createSession(false,
        Session.AUTO_ACKNOWLEDGE);
    MessageProducer producer = producerSession.createProducer(null);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    producerConnection.start();
    
    for (int j = 0; j < 1000; j++) {
      ArrayList<MyConsumerInfo> consumers = new ArrayList<MyConsumerInfo>();
      
      // Create 50 consumers
      for (int i = 0; i < 50; i++) {
        System.out.println(">>> Creating Consumer");
        
        Connection connection = factory.createConnection();
        Session consumerSession = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);
        TemporaryQueue consumerQueue =
            consumerSession.createTemporaryQueue();
        MessageConsumer consumer =
            consumerSession.createConsumer(consumerQueue);
        
        MyMessageReceiver receiver = new MyMessageReceiver();
        consumer.setMessageListener(receiver);
        
        MyConsumerInfo consumerInfo = new MyConsumerInfo();
        consumerInfo.receiver = receiver;
        consumerInfo.connection = connection;
        consumerInfo.session = consumerSession;
        consumerInfo.queue = consumerQueue;
        consumerInfo.consumer = consumer;
        
        connection.start();
        
        consumers.add(consumerInfo);
      }
      
      // Send message to each consumer created above
      for (MyConsumerInfo consumerInfo : consumers) {
        System.out.println("--> Sending Message");
        
        TextMessage message = producerSession.createTextMessage("Hello");
        producer.send(consumerInfo.queue, message);
        
        while (!consumerInfo.receiver.done)
          Thread.sleep(500);
      }
      
      Thread.sleep(5000);
      
      // Destroy all consumers
      for (MyConsumerInfo consumerInfo : consumers) {
        System.out.println("<<< Closing Consumer");
        
        consumerInfo.consumer.close();
        consumerInfo.session.close();
        consumerInfo.connection.stop();
        consumerInfo.connection.close();
      }
      
      consumers.clear();
      consumers = null;
      
      System.gc();
      
      Thread.sleep(5000);
    }
    
    producer.close();
    producerSession.close();
    producerConnection.stop();
    producerConnection.close();
    
    Thread.sleep(60 * 60 * 1000);
  }

  private static class MyConsumerInfo {
    public MyMessageReceiver receiver;
    public Session session;
    public Connection connection;
    public Queue queue;
    public MessageConsumer consumer;
  }
  
  private static class MyMessageReceiver implements MessageListener {
    // volatile: written by the listener thread, polled by the main thread
    public volatile boolean done;
    public void onMessage(Message message) {
      done = true;
    }
  }
}
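
The rig waits for each message by polling a flag that the listener thread
sets. As an alternative, a java.util.concurrent.CountDownLatch gives the
waiting thread a blocking call with a timeout and well-defined visibility
between threads. The sketch below is an assumption-laden stand-in: the class
name LatchReceiver is hypothetical, and onMessage takes an Object instead of
javax.jms.Message so that it compiles without the JMS jar. In the rig it
would implement MessageListener instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical receiver that signals arrival through a latch instead of a flag.
final class LatchReceiver {

    // Counted down once, when the first message arrives.
    private final CountDownLatch received = new CountDownLatch(1);

    // Stand-in for MessageListener.onMessage(Message).
    void onMessage(Object message) {
        received.countDown();
    }

    // Blocks until a message has arrived or the timeout expires.
    // Returns true if a message arrived, false on timeout.
    boolean awaitMessage(long timeout, TimeUnit unit) throws InterruptedException {
        return received.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        final LatchReceiver receiver = new LatchReceiver();

        // Simulates the provider's delivery thread calling the listener.
        Thread delivery = new Thread(new Runnable() {
            public void run() {
                receiver.onMessage("Hello");
            }
        });
        delivery.start();

        boolean arrived = receiver.awaitMessage(5, TimeUnit.SECONDS);
        System.out.println("message arrived: " + arrived);
        delivery.join();
    }
}
```

The timeout also keeps the test from hanging forever if a message is never
delivered, which the polling loop would do.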

Things I have tried:
  * One Connection for all Consumers.  This made things worse because
     apparently Temporary Queue resources are tied to their parent
     Connection.
  * One Factory per Connection
  * One Producer per Consumer
  * No Producers or Messages
  * No Message Prefetch
  * Adjusting the Heap size up and down
  * Adjusting the MemoryManager limit
  * Adjusting the Connection inactivity timeouts
  * Persistence with Derby - made things worse
  * Explicitly deleting the Temporary Queues via ActiveMQTempQueue.delete
     and ActiveMQConnection.deleteTempDestination

Any ideas are greatly appreciated,
/Matt

-- 
View this message in context: http://www.nabble.com/Memory-Leak-Using-Temporary-Queues-tf3953839s2354.html#a11218217
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Memory Leak Using Temporary Queues

Posted by James Strachan <ja...@gmail.com>.
Running a profiler should help decipher where the memory leak is. I'm
wondering if it's the advisory issue ...

http://issues.apache.org/activemq/browse/AMQ-1255





-- 
James
-------
http://macstrac.blogspot.com/