You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by cc...@apache.org on 2009/05/13 08:48:43 UTC

svn commit: r774236 - in /servicemix/components/bindings/servicemix-jms/trunk/src: main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java

Author: ccustine
Date: Wed May 13 06:48:43 2009
New Revision: 774236

URL: http://svn.apache.org/viewvc?rev=774236&view=rev
Log:
SMXCOMP-507 - smx-jms in-out provider w/unspecified replyTo queue and Pooled/SingleConnectionFactory leaks one temp replyTo queue per message

Modified:
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
    servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=774236&r1=774235&r2=774236&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Wed May 13 06:48:43 2009
@@ -16,9 +16,6 @@
  */
 package org.apache.servicemix.jms.endpoints;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.Fault;
@@ -41,7 +38,9 @@
 import javax.jms.TopicSession;
 import javax.jms.QueueSession;
 import javax.jms.MessageProducer;
+import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
+import javax.jms.TemporaryQueue;
 import javax.jms.Queue;
 import javax.jms.MessageConsumer;
 import javax.jms.QueueBrowser;
@@ -624,6 +623,8 @@
         // to indicate we will use the listener container
         boolean asynchronous = false;
         boolean useSelector = true;
+        // Indicates whether the replyTo destination is a temporary one created below or an explicitly specified one
+        boolean isReplyDestTemporary = false;
         Destination replyDest = chooseDestination(exchange, in, session, replyDestinationChooser, null);
         if (replyDest == null) {
             useSelector = false;
@@ -637,6 +638,7 @@
                 } else {
                     replyDest = session.createTemporaryQueue();
                 }
+                isReplyDestTemporary = true;
             }
         }
         // Create message and send it
@@ -697,6 +699,15 @@
             } else {
                 send(exchange);
             }
+
+            // Delete the temporary queue/topic immediately so that temporary destinations do not accumulate when the connection is never destroyed
+            if (isReplyDestTemporary) {
+                if (isPubSubDomain()) {
+                    ((TemporaryTopic)replyDest).delete();
+                } else {
+                    ((TemporaryQueue)replyDest).delete();
+                }
+            }
         }
     }
 

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=774236&r1=774235&r2=774236&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java Wed May 13 06:48:43 2009
@@ -32,6 +32,7 @@
 import javax.xml.transform.Source;
 
 import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.components.util.EchoComponent;
@@ -68,19 +69,26 @@
         DataHandler dh = null;
 
         // Test successful return
-        inout = client.createInOutExchange();
-        inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
-        inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
-        dh = new DataHandler(new ByteArrayDataSource("myImage", "application/octet-stream"));
-        inout.getInMessage().addAttachment("myImage", dh);
-        result = client.sendSync(inout);
-        assertTrue(result);
-        NormalizedMessage out = inout.getOutMessage();
-        assertNotNull(out);
-        Source src = out.getContent();
-        assertNotNull(src);
-        dh = out.getAttachment("myImage");
-        assertNotNull(dh);
+        Source src = null;
+        for (int i = 0; i < 2; i++) {
+            inout = client.createInOutExchange();
+            inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+            inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+            dh = new DataHandler(new ByteArrayDataSource("myImage", "application/octet-stream"));
+            inout.getInMessage().addAttachment("myImage", dh);
+            result = client.sendSync(inout);
+            assertTrue(result);
+            NormalizedMessage out = inout.getOutMessage();
+            assertNotNull(out);
+            src = out.getContent();
+            assertNotNull(src);
+            dh = out.getAttachment("myImage");
+            assertNotNull(dh);
+        }
+
+        // Ensure that no temporary replyTo queue is left on the broker after multiple messages were sent
+//
+        assertEquals(0, countBrokerTemporaryQueues());
 
         logger.info(new SourceTransformer().toString(src));
 
@@ -283,4 +291,8 @@
         endpoint.setDestinationName("destination");
         return endpoint;
     }
+
+    private int countBrokerTemporaryQueues() throws Exception {
+        return ((RegionBroker) broker.getRegionBroker()).getTempQueueRegion().getDestinationMap().size();
+    }
 }