You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2011/01/06 12:45:50 UTC

svn commit: r1055837 - /cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java

Author: cschneider
Date: Thu Jan  6 11:45:50 2011
New Revision: 1055837

URL: http://svn.apache.org/viewvc?rev=1055837&view=rev
Log:
CXF-3230 delete jms temp queue after request

Modified:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=1055837&r1=1055836&r2=1055837&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Thu Jan  6 11:45:50 2011
@@ -34,6 +34,7 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.buslifecycle.BusLifeCycleListener;
@@ -155,12 +156,7 @@ public class JMSConduit extends Abstract
             throw new ConfigurationException(msg);
         }
         
-        JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
-            .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-        if (headers == null) {
-            headers = new JMSMessageHeadersType();
-            outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, headers);
-        }
+        JMSMessageHeadersType headers = getOrCreateJmsHeaders(outMessage);
         String replyTo = headers.getJMSReplyTo();
         if (replyTo == null) {
             replyTo = jmsConfig.getReplyDestination();
@@ -168,28 +164,8 @@ public class JMSConduit extends Abstract
         final JmsTemplate jmsTemplate = JMSFactory.createJmsTemplate(jmsConfig, headers);
         
         String userCID = headers.getJMSCorrelationID();
-        boolean messageIdPattern = false;
-        String correlationId = null;
-        if (!exchange.isOneWay()) {
-            if (userCID != null) {
-                correlationId = userCID;
-            } else if (!jmsConfig.isSetConduitSelectorPrefix()
-                       && (exchange.isSynchronous() || exchange.isOneWay())
-                       && (!jmsConfig.isSetUseConduitIdSelector() 
-                           || !jmsConfig.isUseConduitIdSelector())) {
-                messageIdPattern = true;
-            } else { 
-                if (jmsConfig.isUseConduitIdSelector()) {
-                    correlationId = JMSUtils.createCorrelationId(jmsConfig
-                        .getConduitSelectorPrefix()
-                                                                 + conduitId, messageCount
-                        .incrementAndGet());
-                } else {
-                    correlationId = JMSUtils.createCorrelationId(jmsConfig
-                        .getConduitSelectorPrefix(), messageCount.incrementAndGet());
-                }
-            }
-        }
+
+        String correlationId = createCorrelationId(exchange, userCID);
         
         Destination replyToDestination = null;
         if (!exchange.isOneWay() || !jmsConfig.isEnforceSpec() && isSetReplyTo(outMessage)
@@ -248,7 +224,7 @@ public class JMSConduit extends Abstract
         if (!exchange.isOneWay()) {
             synchronized (exchange) {
                 jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator);
-                if (messageIdPattern) {
+                if (correlationId == null) {
                     correlationId = messageCreator.getMessageID();
                 }
                 headers.setJMSMessageID(messageCreator.getMessageID());
@@ -263,6 +239,16 @@ public class JMSConduit extends Abstract
                     } else {
                         doReplyMessage(exchange, replyMessage);
                     }
+                    
+                    // TODO How do we delete the temp queue in case of an async request
+                    // or is async with a temp queue not possible ?
+                    if (replyToDestination instanceof TemporaryQueue) {
+                        try {
+                            ((TemporaryQueue)replyToDestination).delete();
+                        } catch (JMSException e) {
+                            throw new RuntimeException("Unable to remove temporary queue", e);
+                        }
+                    }
                 }
             }
         } else {
@@ -271,6 +257,38 @@ public class JMSConduit extends Abstract
         }
     }
 
+    /** Selects the request's correlation id; a null result lets the caller fall back to the message id. */
+    private String createCorrelationId(final Exchange exchange, String userCID) {
+        if (exchange.isOneWay()) {
+            return null;
+        }
+        if (userCID != null) {
+            // an id taken from the request headers always wins
+            return userCID;
+        }
+        if (!jmsConfig.isSetConduitSelectorPrefix()
+            && (exchange.isSynchronous() || exchange.isOneWay())
+            && !(jmsConfig.isSetUseConduitIdSelector() && jmsConfig.isUseConduitIdSelector())) {
+            // nothing to build an id from: stay null so the message id is used after sending
+            return null;
+        }
+        // with the conduit id selector enabled, the conduit id is appended to the configured prefix
+        String selectorPrefix = jmsConfig.isUseConduitIdSelector()
+            ? jmsConfig.getConduitSelectorPrefix() + conduitId
+            : jmsConfig.getConduitSelectorPrefix();
+        return JMSUtils.createCorrelationId(selectorPrefix, messageCount.incrementAndGet());
+    }
+
+    /** Fetches the client request headers from the message, creating and registering them if absent. */
+    private JMSMessageHeadersType getOrCreateJmsHeaders(final Message outMessage) {
+        Object stored = outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+        if (stored == null) {
+            stored = new JMSMessageHeadersType();
+            outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, stored);
+        }
+        return (JMSMessageHeadersType)stored;
+    }
+
     static class JMSBusLifeCycleListener implements BusLifeCycleListener {
         final WeakReference<JMSConduit> ref;
         BusLifeCycleManager blcm;