You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by cc...@apache.org on 2009/05/13 08:48:43 UTC
svn commit: r774236 - in
/servicemix/components/bindings/servicemix-jms/trunk/src:
main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
Author: ccustine
Date: Wed May 13 06:48:43 2009
New Revision: 774236
URL: http://svn.apache.org/viewvc?rev=774236&view=rev
Log:
SMXCOMP-507 - smx-jms in-out provider w/unspecified replyTo queue and Pooled/SingleConnectionFactory leaks one temp replyTo queue per message
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=774236&r1=774235&r2=774236&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Wed May 13 06:48:43 2009
@@ -16,9 +16,6 @@
*/
package org.apache.servicemix.jms.endpoints;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.Fault;
@@ -41,7 +38,9 @@
import javax.jms.TopicSession;
import javax.jms.QueueSession;
import javax.jms.MessageProducer;
+import javax.jms.TemporaryTopic;
import javax.jms.Topic;
+import javax.jms.TemporaryQueue;
import javax.jms.Queue;
import javax.jms.MessageConsumer;
import javax.jms.QueueBrowser;
@@ -624,6 +623,8 @@
// to indicate we will use the listener container
boolean asynchronous = false;
boolean useSelector = true;
+ // Indicate whether the replyTo destination is temporary or explicitely specified replyTo destination
+ boolean isReplyDestTemporary = false;
Destination replyDest = chooseDestination(exchange, in, session, replyDestinationChooser, null);
if (replyDest == null) {
useSelector = false;
@@ -637,6 +638,7 @@
} else {
replyDest = session.createTemporaryQueue();
}
+ isReplyDestTemporary = true;
}
}
// Create message and send it
@@ -697,6 +699,15 @@
} else {
send(exchange);
}
+
+ // delete temporary queue/topic immediately to avoid accumulation in case that the connection is never destroyed
+ if (isReplyDestTemporary) {
+ if (isPubSubDomain()) {
+ ((TemporaryTopic)replyDest).delete();
+ } else {
+ ((TemporaryQueue)replyDest).delete();
+ }
+ }
}
}
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=774236&r1=774235&r2=774236&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java Wed May 13 06:48:43 2009
@@ -32,6 +32,7 @@
import javax.xml.transform.Source;
import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.broker.region.RegionBroker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.components.util.EchoComponent;
@@ -68,19 +69,26 @@
DataHandler dh = null;
// Test successful return
- inout = client.createInOutExchange();
- inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
- inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
- dh = new DataHandler(new ByteArrayDataSource("myImage", "application/octet-stream"));
- inout.getInMessage().addAttachment("myImage", dh);
- result = client.sendSync(inout);
- assertTrue(result);
- NormalizedMessage out = inout.getOutMessage();
- assertNotNull(out);
- Source src = out.getContent();
- assertNotNull(src);
- dh = out.getAttachment("myImage");
- assertNotNull(dh);
+ Source src = null;
+ for (int i = 0; i < 2; i++) {
+ inout = client.createInOutExchange();
+ inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+ inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ dh = new DataHandler(new ByteArrayDataSource("myImage", "application/octet-stream"));
+ inout.getInMessage().addAttachment("myImage", dh);
+ result = client.sendSync(inout);
+ assertTrue(result);
+ NormalizedMessage out = inout.getOutMessage();
+ assertNotNull(out);
+ src = out.getContent();
+ assertNotNull(src);
+ dh = out.getAttachment("myImage");
+ assertNotNull(dh);
+ }
+
+ // Ensure that only one temporary replyTo queue was created for multiple messages sent
+//
+ assertEquals(0, countBrokerTemporaryQueues());
logger.info(new SourceTransformer().toString(src));
@@ -283,4 +291,8 @@
endpoint.setDestinationName("destination");
return endpoint;
}
+
+ private int countBrokerTemporaryQueues() throws Exception {
+ return ((RegionBroker) broker.getRegionBroker()).getTempQueueRegion().getDestinationMap().size();
+ }
}