You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2011/01/07 21:23:16 UTC
svn commit: r1056490 - in /cxf/branches/2.3.x-fixes: ./
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
Author: dkulp
Date: Fri Jan 7 20:23:16 2011
New Revision: 1056490
URL: http://svn.apache.org/viewvc?rev=1056490&view=rev
Log:
Merged revisions 1055837 via svnmerge from
https://svn.apache.org/repos/asf/cxf/trunk
........
r1055837 | cschneider | 2011-01-06 06:45:50 -0500 (Thu, 06 Jan 2011) | 1 line
CXF-3230 delete jms temp queue after request
........
Modified:
cxf/branches/2.3.x-fixes/ (props changed)
cxf/branches/2.3.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
Propchange: cxf/branches/2.3.x-fixes/
('svn:mergeinfo' removed)
Propchange: cxf/branches/2.3.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: cxf/branches/2.3.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=1056490&r1=1056489&r2=1056490&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/branches/2.3.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Fri Jan 7 20:23:16 2011
@@ -34,6 +34,7 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
+import javax.jms.TemporaryQueue;
import org.apache.cxf.Bus;
import org.apache.cxf.buslifecycle.BusLifeCycleListener;
@@ -155,12 +156,7 @@ public class JMSConduit extends Abstract
throw new ConfigurationException(msg);
}
- JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
- .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
- if (headers == null) {
- headers = new JMSMessageHeadersType();
- outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, headers);
- }
+ JMSMessageHeadersType headers = getOrCreateJmsHeaders(outMessage);
String replyTo = headers.getJMSReplyTo();
if (replyTo == null) {
replyTo = jmsConfig.getReplyDestination();
@@ -168,28 +164,8 @@ public class JMSConduit extends Abstract
final JmsTemplate jmsTemplate = JMSFactory.createJmsTemplate(jmsConfig, headers);
String userCID = headers.getJMSCorrelationID();
- boolean messageIdPattern = false;
- String correlationId = null;
- if (!exchange.isOneWay()) {
- if (userCID != null) {
- correlationId = userCID;
- } else if (!jmsConfig.isSetConduitSelectorPrefix()
- && (exchange.isSynchronous() || exchange.isOneWay())
- && (!jmsConfig.isSetUseConduitIdSelector()
- || !jmsConfig.isUseConduitIdSelector())) {
- messageIdPattern = true;
- } else {
- if (jmsConfig.isUseConduitIdSelector()) {
- correlationId = JMSUtils.createCorrelationId(jmsConfig
- .getConduitSelectorPrefix()
- + conduitId, messageCount
- .incrementAndGet());
- } else {
- correlationId = JMSUtils.createCorrelationId(jmsConfig
- .getConduitSelectorPrefix(), messageCount.incrementAndGet());
- }
- }
- }
+
+ String correlationId = createCorrelationId(exchange, userCID);
Destination replyToDestination = null;
if (!exchange.isOneWay() || !jmsConfig.isEnforceSpec() && isSetReplyTo(outMessage)
@@ -248,7 +224,7 @@ public class JMSConduit extends Abstract
if (!exchange.isOneWay()) {
synchronized (exchange) {
jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator);
- if (messageIdPattern) {
+ if (correlationId == null) {
correlationId = messageCreator.getMessageID();
}
headers.setJMSMessageID(messageCreator.getMessageID());
@@ -263,6 +239,16 @@ public class JMSConduit extends Abstract
} else {
doReplyMessage(exchange, replyMessage);
}
+
+ // TODO How do we delete the temp queue in case of an async request
+ // or is async with a temp queue not possible ?
+ if (replyToDestination instanceof TemporaryQueue) {
+ try {
+ ((TemporaryQueue)replyToDestination).delete();
+ } catch (JMSException e) {
+ throw new RuntimeException("Unable to remove temporary queue", e);
+ }
+ }
}
}
} else {
@@ -271,6 +257,38 @@ public class JMSConduit extends Abstract
}
}
+ private String createCorrelationId(final Exchange exchange, String userCID) {
+ String correlationId = null;
+ if (!exchange.isOneWay()) {
+ if (userCID != null) {
+ correlationId = userCID;
+ } else if (!jmsConfig.isSetConduitSelectorPrefix()
+ && (exchange.isSynchronous() || exchange.isOneWay())
+ && (!jmsConfig.isSetUseConduitIdSelector()
+ || !jmsConfig.isUseConduitIdSelector())) {
+ // in this case the correlation id will be set to
+ // the message id later
+ correlationId = null;
+ } else {
+ String prefix = (jmsConfig.isUseConduitIdSelector())
+ ? jmsConfig.getConduitSelectorPrefix() + conduitId
+ : jmsConfig.getConduitSelectorPrefix();
+ correlationId = JMSUtils.createCorrelationId(prefix, messageCount.incrementAndGet());
+ }
+ }
+ return correlationId;
+ }
+
+ private JMSMessageHeadersType getOrCreateJmsHeaders(final Message outMessage) {
+ JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+ if (headers == null) {
+ headers = new JMSMessageHeadersType();
+ outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, headers);
+ }
+ return headers;
+ }
+
static class JMSBusLifeCycleListener implements BusLifeCycleListener {
final WeakReference<JMSConduit> ref;
BusLifeCycleManager blcm;