You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/01/12 18:53:41 UTC
svn commit: r733850 - in
/servicemix/components/bindings/servicemix-jms/trunk/src:
main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
Author: gnodet
Date: Mon Jan 12 09:53:35 2009
New Revision: 733850
URL: http://svn.apache.org/viewvc?rev=733850&view=rev
Log:
SM-1760: smx-jms provider should not create a temporary replyTo destination unless one is needed
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=733850&r1=733849&r2=733850&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Mon Jan 12 09:53:35 2009
@@ -16,6 +16,9 @@
*/
package org.apache.servicemix.jms.endpoints;
+import java.lang.reflect.Method;
+import java.lang.reflect.InvocationTargetException;
+
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
@@ -32,6 +35,7 @@
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
+import javax.jms.MessageProducer;
import org.apache.servicemix.common.endpoints.ProviderEndpoint;
import org.apache.servicemix.common.JbiConstants;
@@ -40,6 +44,7 @@
import org.apache.servicemix.store.StoreFactory;
import org.apache.servicemix.store.memory.MemoryStoreFactory;
import org.springframework.jms.UncategorizedJmsException;
+import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.JmsTemplate102;
import org.springframework.jms.core.MessageCreator;
@@ -50,6 +55,7 @@
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
+import org.springframework.jms.support.JmsUtils;
/**
* A Spring-based JMS provider endpoint
@@ -607,16 +613,12 @@
boolean asynchronous = replyDest.equals(replyDestination);
if (asynchronous) {
+ createAndStartListener();
store.store(correlationId, exchange);
}
try {
- template.send(dest, new MessageCreator() {
- public Message createMessage(Session session)
- throws JMSException {
- return sendJmsMsg;
- }
- });
+ send(session, dest, sendJmsMsg);
} catch (Exception e) {
if (asynchronous) {
store.load(exchange.getExchangeId());
@@ -626,10 +628,9 @@
if (!asynchronous) {
// Create selector
- String jmsId = sendJmsMsg.getJMSMessageID();
- String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
+ String selector = MSG_SELECTOR_START + sendJmsMsg.getJMSCorrelationID() + MSG_SELECTOR_END;
// Receiving JMS Message, Creating and Returning NormalizedMessage out
- Message receiveJmsMsg = template.receiveSelected(replyDest, selector);
+ Message receiveJmsMsg = receiveSelected(session, replyDest, selector);
if (receiveJmsMsg == null) {
throw new IllegalStateException("Unable to receive response");
}
@@ -665,6 +666,47 @@
}
}
+ /**
+  * Send an already built JMS message to the given destination on the caller's session.
+  *
+  * @param session the JMS session to send on
+  * @param destination the JMS destination to send to
+  * @param message the JMS message to send as is
+  * @throws JmsException the converted JMSException if the underlying send fails
+  */
+ private void send(final Session session, final Destination destination, final Message message) throws JmsException {
+     // Do not call template.send() directly, to avoid the cost of creating a new connection / session.
+     // JmsTemplate.doSend() works on the given session but is not public, hence the reflective call.
+     try {
+         Method method = JmsTemplate.class.getDeclaredMethod("doSend", Session.class, Destination.class, MessageCreator.class);
+         method.setAccessible(true);
+         method.invoke(template, session, destination, new MessageCreator() {
+             public Message createMessage(Session session) throws JMSException {
+                 return message;
+             }
+         });
+     } catch (NoSuchMethodException e) {
+         throw new RuntimeException(e);
+     } catch (IllegalAccessException e) {
+         throw new RuntimeException(e);
+     } catch (InvocationTargetException e) {
+         // Surface the real failure instead of the reflection wrapper
+         Throwable cause = e.getCause();
+         if (cause instanceof JMSException) {
+             throw JmsUtils.convertJmsAccessException((JMSException) cause);
+         }
+         if (cause instanceof RuntimeException) {
+             throw (RuntimeException) cause;
+         }
+         if (cause instanceof Error) {
+             throw (Error) cause;
+         }
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Receive a message matching the selector from the given destination on the caller's session.
+  *
+  * @param session the JMS session to receive on
+  * @param destination the JMS destination to receive from
+  * @param messageSelector the JMS selector the message has to match
+  * @return the value returned by JmsTemplate.doReceive()
+  * @throws JMSException if the underlying receive fails
+  */
+ private Message receiveSelected(final Session session, final Destination destination, final String messageSelector) throws JMSException {
+     // Do not call template.receiveSelected() directly, to avoid the cost of creating a new connection / session.
+     // JmsTemplate.doReceive() works on the given session but is not public, hence the reflective call.
+     try {
+         Method method = JmsTemplate.class.getDeclaredMethod("doReceive", Session.class, Destination.class, String.class);
+         method.setAccessible(true);
+         return (Message) method.invoke(template, session, destination, messageSelector);
+     } catch (NoSuchMethodException e) {
+         throw new RuntimeException(e);
+     } catch (IllegalAccessException e) {
+         throw new RuntimeException(e);
+     } catch (InvocationTargetException e) {
+         // Surface the real failure instead of the reflection wrapper
+         Throwable cause = e.getCause();
+         if (cause instanceof JMSException) {
+             throw (JMSException) cause;
+         }
+         if (cause instanceof RuntimeException) {
+             throw (RuntimeException) cause;
+         }
+         if (cause instanceof Error) {
+             throw (Error) cause;
+         }
+         throw new RuntimeException(e);
+     }
+ }
+
/**
* Process a JMS response message.
* This method delegates to the marshaler for the JBI out message creation
@@ -734,11 +776,17 @@
* @throws JMSException
*/
protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
- return chooseDestination(exchange, message, session, destinationChooser, destination);
+ Destination dest = chooseDestination(exchange, message, session, destinationChooser, destination != null ? destination : destinationName); // default: the Destination object if set, otherwise the destination name
+ if (dest == null) { // nothing could be chosen: a request cannot be sent without a destination
+ throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
+ }
+ return dest;
}
/**
- * Choose the JMS destination for the reply message
+ * Choose the JMS destination for the reply message.
+ * If no default destination is specified or can be extracted from the JBI exchange,
+ * a temporary destination will be created.
*
* @param exchange
* @param message
@@ -747,7 +795,16 @@
* @throws JMSException
*/
protected Destination getReplyDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
- return chooseDestination(exchange, message, session, replyDestinationChooser, replyDestination);
+     Destination dest = chooseDestination(exchange, message, session, replyDestinationChooser, replyDestination != null ? replyDestination : replyDestinationName);
+     if (dest == null) {
+         // No reply destination configured or chosen: fall back to a temporary one
+         // matching the endpoint's domain (pubSubDomain true = topics, false = queues)
+         if (isPubSubDomain()) {
+             return session.createTemporaryTopic();
+         } else {
+             return session.createTemporaryQueue();
+         }
+     } else {
+         return dest;
+     }
}
/**
@@ -764,7 +821,7 @@
Object message,
Session session,
DestinationChooser chooser,
- Destination defaultDestination) throws JMSException {
+ Object defaultDestination) throws JMSException {
Object dest = null;
// Give the chooser a chance to choose the destination
if (chooser != null) {
@@ -782,7 +839,7 @@
(String) dest,
isPubSubDomain());
}
- throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
+ return null;
}
/**
@@ -799,25 +856,14 @@
store = storeFactory.open(getService().toString() + getEndpoint());
}
template = createTemplate();
- // Obtain the default destination
- if (destination == null && destinationName != null) {
- destination = (Destination) template.execute(new SessionCallback() {
- public Object doInJms(Session session) throws JMSException {
- return destinationResolver.resolveDestinationName(session, destinationName, isPubSubDomain());
- }
- });
- }
- // Obtain the default reply destination
- if (replyDestination == null && replyDestinationName != null) {
- replyDestination = (Destination) template.execute(new SessionCallback() {
- public Object doInJms(Session session) throws JMSException {
- return destinationResolver.resolveDestinationName(session, replyDestinationName, isPubSubDomain());
- }
- });
+ }
+
+ protected synchronized void createAndStartListener() throws Exception {
+ if (listenerContainer == null) { // create and start the container only when none exists yet
+ // create the listener container
+ listenerContainer = createListenerContainer();
+ listenerContainer.start();
}
- // create the listener container
- listenerContainer = createListenerContainer();
- listenerContainer.start();
}
/**
@@ -829,6 +875,7 @@
if (listenerContainer != null) {
listenerContainer.stop();
listenerContainer.shutdown();
+ listenerContainer = null;
}
if (store != null) {
if (storeFactory != null) {
@@ -904,12 +951,12 @@
cont = new DefaultMessageListenerContainer();
}
cont.setConnectionFactory(getConnectionFactory());
- Destination replyDest = getReplyDestination();
- if (replyDest == null) {
- replyDest = resolveOrCreateDestination(template, replyDestinationName, isPubSubDomain());
- setReplyDestination(replyDest);
+ if (replyDestination != null) {
+ cont.setDestination(replyDestination);
+ }
+ if (replyDestinationName != null) {
+ cont.setDestinationName(replyDestinationName);
}
- cont.setDestination(replyDest);
cont.setPubSubDomain(isPubSubDomain());
cont.setPubSubNoLocal(isPubSubNoLocal());
cont.setMessageListener(new MessageListener() {
@@ -922,31 +969,4 @@
return cont;
}
- /**
- * If the destinationName given is null then a temporary destination is created else the destination name
- * is resolved using the resolver from the jmsConfig
- *
- * @param jmsTemplate template to use for session and resolver
- * @param replyToDestinationName null for temporary destination or a destination name
- * @param pubSubDomain true=pubSub, false=Queues
- * @return resolved destination
- */
- private Destination resolveOrCreateDestination(final JmsTemplate jmsTemplate,
- final String replyToDestinationName,
- final boolean pubSubDomain) {
- return (Destination)jmsTemplate.execute(new SessionCallback() {
- public Object doInJms(Session session) throws JMSException {
- if (replyToDestinationName == null) {
- if (destination instanceof Queue) {
- return session.createTemporaryQueue();
- } else {
- return session.createTemporaryTopic();
- }
- }
- DestinationResolver resolv = jmsTemplate.getDestinationResolver();
- return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain);
- }
- });
- }
-
}
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java?rev=733850&r1=733849&r2=733850&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java Mon Jan 12 09:53:35 2009
@@ -95,6 +95,79 @@
assertNotNull(msg);
}
+ public void testProviderInOnlyWithoutReplyDest() throws Exception {
+     // Provider endpoint configured with a destination name only: no reply destination is set
+     QName helloService = new QName("uri:HelloWorld", "HelloService");
+     JmsComponent jms = new JmsComponent();
+
+     JmsProviderEndpoint provider = new JmsProviderEndpoint();
+     provider.setService(helloService);
+     provider.setEndpoint("HelloPort");
+     provider.setDestinationName("destination");
+     provider.setConnectionFactory(connectionFactory);
+     jms.setEndpoints(new JmsProviderEndpoint[] {provider});
+     container.activateComponent(jms, "servicemix-jms");
+
+     // Build the one-way request from the classpath resource
+     ByteArrayOutputStream payload = new ByteArrayOutputStream();
+     FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Input-OneWay.xml").getInputStream(), payload);
+     InOnly exchange = client.createInOnlyExchange();
+     exchange.getInMessage().setContent(new StringSource(payload.toString()));
+     exchange.setOperation(new QName("uri:HelloWorld", "OneWay"));
+     exchange.setService(helloService);
+     client.sendSync(exchange);
+     assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+
+     // The request must have reached the JMS destination
+     Message delivered = jmsTemplate.receive("destination");
+     assertNotNull(delivered);
+     TextMessage text = (TextMessage) delivered;
+     System.err.println(text.getText());
+ }
+
+ public void testProviderInOutWithoutReplyDest() throws Exception {
+ JmsComponent component = new JmsComponent();
+
+ JmsProviderEndpoint endpoint = new JmsProviderEndpoint();
+ endpoint.setService(new QName("uri:HelloWorld", "HelloService"));
+ endpoint.setEndpoint("HelloPort");
+ endpoint.setDestinationName("destination"); // only the request destination is set; no reply destination is configured
+ endpoint.setConnectionFactory(connectionFactory);
+ component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
+ container.activateComponent(component, "servicemix-jms");
+
+ Thread th = new Thread() { // plays the remote JMS service: consumes the request and answers on its JMSReplyTo
+ public void run() {
+ try {
+ final Message msg = jmsTemplate.receive("destination");
+ assertNotNull(msg);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Output.xml")
+ .getInputStream(), baos);
+ jmsTemplate.send(msg.getJMSReplyTo(), new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ TextMessage rep = session.createTextMessage(baos.toString());
+ rep.setJMSCorrelationID(msg.getJMSCorrelationID() != null ? msg.getJMSCorrelationID() : msg.getJMSMessageID()); // reuse the request's correlation ID, falling back to its message ID
+ return rep;
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace(); // NOTE(review): a failure in this thread is only printed, never reported to the test thread
+ }
+ }
+ };
+ th.start(); // must be running before sendSync() below, which waits for the reply
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Input-OneWay.xml").getInputStream(), baos);
+ InOut me = client.createInOutExchange();
+ me.getInMessage().setContent(new StringSource(baos.toString()));
+ me.setOperation(new QName("uri:HelloWorld", "OneWay"));
+ me.setService(new QName("uri:HelloWorld", "HelloService"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus()); // still ACTIVE here; the exchange is completed by client.done(me) below
+ assertNotNull(me.getOutMessage());
+ assertNotNull(me.getOutMessage().getContent());
+ System.err.println(new SourceTransformer().contentToString(me.getOutMessage()));
+ client.done(me);
+ }
+
public void testSoapProviderInOnly() throws Exception {
JmsComponent component = new JmsComponent();
@@ -116,10 +189,6 @@
me.setService(new QName("uri:HelloWorld", "HelloService"));
client.sendSync(me);
assertEquals(ExchangeStatus.DONE, me.getStatus());
-
- Message msg = jmsTemplate.receive("destination");
- assertNotNull(msg);
- System.err.println(((TextMessage) msg).getText());
}
public void testSoapProviderInOut() throws Exception {
@@ -169,7 +238,6 @@
assertNotNull(me.getOutMessage().getContent());
System.err.println(new SourceTransformer().contentToString(me.getOutMessage()));
client.done(me);
-
}
public void testSoapProviderInOutWithoutReplyDest() throws Exception {