You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2009/01/14 07:02:01 UTC
svn commit: r734341 - in
/servicemix/components/bindings/servicemix-jms/trunk/src:
main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
Author: ffang
Date: Tue Jan 13 22:02:00 2009
New Revision: 734341
URL: http://svn.apache.org/viewvc?rev=734341&view=rev
Log:
[SM-1764]JmsProviderEndpointTest hang
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=734341&r1=734340&r2=734341&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Tue Jan 13 22:02:00 2009
@@ -16,39 +16,38 @@
*/
package org.apache.servicemix.jms.endpoints;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.RobustInOnly;
import javax.jbi.messaging.InOut;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.Fault;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.jms.JmsEndpointType;
import org.apache.servicemix.store.Store;
import org.apache.servicemix.store.StoreFactory;
import org.apache.servicemix.store.memory.MemoryStoreFactory;
-import org.springframework.jms.JmsException;
import org.springframework.jms.UncategorizedJmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.JmsTemplate102;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.AbstractPollingMessageListenerContainer;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer102;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
@@ -473,23 +472,22 @@
return;
// Exchange is active
} else {
+ NormalizedMessage in;
// Fault message
if (exchange.getFault() != null) {
done(exchange);
// In message
- } else {
- NormalizedMessage in = exchange.getMessage("in");
- if (in != null) {
- if (exchange instanceof InOnly) {
- processInOnly(exchange, in);
- done(exchange);
- } else {
- processInOut(exchange, in);
- }
- // This is not compliant with the default MEPs
- } else {
- throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
+ } else if ((in = exchange.getMessage("in")) != null) {
+ if (exchange instanceof InOnly) {
+ processInOnly(exchange, in);
+ done(exchange);
+ }
+ else {
+ processInOut(exchange, in);
}
+ // This is not compliant with the default MEPs
+ } else {
+ throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
}
}
// Unsupported role: this should never happen, as we never create exchanges
@@ -511,18 +509,18 @@
protected void processInOnly(final MessageExchange exchange,
final NormalizedMessage in) throws Exception {
SessionCallback callback = new SessionCallback() {
- public Object doInJms(Session session) throws JMSException {
- try {
- processInOnlyInSession(exchange, in, session);
- return null;
- } catch (JMSException e) {
- throw e;
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new UncategorizedJmsException(e);
- }
- }
+ public Object doInJms(Session session) throws JMSException {
+ try {
+ processInOnlyInSession(exchange, in, session);
+ return null;
+ } catch (JMSException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new UncategorizedJmsException(e);
+ }
+ }
};
template.execute(callback, true);
}
@@ -563,6 +561,19 @@
*/
protected void processInOut(final MessageExchange exchange,
final NormalizedMessage in) throws Exception {
+ if (listenerContainer == null) {
+ // Obtain the default reply destination
+ if (replyDestination == null && replyDestinationName != null) {
+ replyDestination = (Destination) template.execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ return destinationResolver.resolveDestinationName(session, replyDestinationName, isPubSubDomain());
+ }
+ });
+ }
+ //create the listener container
+ listenerContainer = createListenerContainer();
+ listenerContainer.start();
+ }
SessionCallback callback = new SessionCallback() {
public Object doInJms(Session session) throws JMSException {
try {
@@ -609,12 +620,16 @@
boolean asynchronous = replyDest.equals(replyDestination);
if (asynchronous) {
- createAndStartListener();
store.store(correlationId, exchange);
}
try {
- send(session, dest, sendJmsMsg);
+ template.send(dest, new MessageCreator() {
+ public Message createMessage(Session session)
+ throws JMSException {
+ return sendJmsMsg;
+ }
+ });
} catch (Exception e) {
if (asynchronous) {
store.load(exchange.getExchangeId());
@@ -624,9 +639,10 @@
if (!asynchronous) {
// Create selector
- String selector = MSG_SELECTOR_START + sendJmsMsg.getJMSCorrelationID() + MSG_SELECTOR_END;
+ String jmsId = sendJmsMsg.getJMSMessageID();
+ String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
// Receiving JMS Message, Creating and Returning NormalizedMessage out
- Message receiveJmsMsg = receiveSelected(session, replyDest, selector);
+ Message receiveJmsMsg = template.receiveSelected(replyDest, selector);
if (receiveJmsMsg == null) {
throw new IllegalStateException("Unable to receive response");
}
@@ -662,48 +678,6 @@
}
}
- private void send(final Session session, final Destination dest, final Message message) throws JmsException {
- // Do not call directly the template to avoid the cost of creating a new connection / session
-// template.send(dest, new MessageCreator() {
-// public Message createMessage(Session session) throws JMSException {
-// return message;
-// }
-// });
- try {
- Method method = JmsTemplate.class.getDeclaredMethod("doSend", Session.class, Destination.class, MessageCreator.class);
- method.setAccessible(true);
- method.invoke(template, session, dest, new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return message;
- }
- });
- } catch (NoSuchMethodException e) {
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- private Message receiveSelected(final Session session,
- final Destination dest,
- final String messageSelector) throws JMSException {
- // Do not call directly the template to avoid the cost of creating a new connection / session
-// return template.doReceive(session, dest, messageSelector);
- try {
- Method method = JmsTemplate.class.getDeclaredMethod("doReceive", Session.class, Destination.class, String.class);
- method.setAccessible(true);
- return (Message) method.invoke(template, session, dest, messageSelector);
- } catch (NoSuchMethodException e) {
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
/**
* Process a JMS response message.
* This method delegates to the marshaler for the JBI out message creation
@@ -773,18 +747,11 @@
* @throws JMSException
*/
protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
- Destination dest = chooseDestination(exchange, message, session, destinationChooser,
- destination != null ? destination : destinationName);
- if (dest == null) {
- throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
- }
- return dest;
+ return chooseDestination(exchange, message, session, destinationChooser, destination);
}
/**
- * Choose the JMS destination for the reply message.
- * If no default destination is specified or can be extracted from the JBI exchange,
- * a temporary destination will be created.
+ * Choose the JMS destination for the reply message
*
* @param exchange
* @param message
@@ -793,17 +760,7 @@
* @throws JMSException
*/
protected Destination getReplyDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
- Destination dest = chooseDestination(exchange, message, session, replyDestinationChooser,
- replyDestination != null ? replyDestination : replyDestinationName);
- if (dest == null) {
- if (isPubSubDomain()) {
- return session.createTemporaryQueue();
- } else {
- return session.createTemporaryTopic();
- }
- } else {
- return dest;
- }
+ return chooseDestination(exchange, message, session, replyDestinationChooser, replyDestination);
}
/**
@@ -820,7 +777,7 @@
Object message,
Session session,
DestinationChooser chooser,
- Object defaultDestination) throws JMSException {
+ Destination defaultDestination) throws JMSException {
Object dest = null;
// Give the destination chooser a chance to choose the destination
if (chooser != null) {
@@ -838,7 +795,7 @@
(String) dest,
isPubSubDomain());
}
- return null;
+ throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
}
/**
@@ -855,14 +812,15 @@
store = storeFactory.open(getService().toString() + getEndpoint());
}
template = createTemplate();
- }
-
- protected synchronized void createAndStartListener() throws Exception {
- if (listenerContainer == null) {
- // create the listener container
- listenerContainer = createListenerContainer();
- listenerContainer.start();
+ // Obtain the default destination
+ if (destination == null && destinationName != null) {
+ destination = (Destination) template.execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ return destinationResolver.resolveDestinationName(session, destinationName, isPubSubDomain());
+ }
+ });
}
+
}
/**
@@ -874,7 +832,6 @@
if (listenerContainer != null) {
listenerContainer.stop();
listenerContainer.shutdown();
- listenerContainer = null;
}
if (store != null) {
if (storeFactory != null) {
@@ -950,12 +907,12 @@
cont = new DefaultMessageListenerContainer();
}
cont.setConnectionFactory(getConnectionFactory());
- if (replyDestination != null) {
- cont.setDestination(replyDestination);
- }
- if (replyDestinationName != null) {
- cont.setDestinationName(replyDestinationName);
+ Destination replyDest = getReplyDestination();
+ if (replyDest == null) {
+ replyDest = resolveOrCreateDestination(template, replyDestinationName, isPubSubDomain());
+ setReplyDestination(replyDest);
}
+ cont.setDestination(replyDest);
cont.setPubSubDomain(isPubSubDomain());
cont.setPubSubNoLocal(isPubSubNoLocal());
cont.setMessageListener(new MessageListener() {
@@ -968,4 +925,31 @@
return cont;
}
+ /**
+ * If the given destination name is null, a temporary destination is created (a queue when the endpoint's destination is a Queue, otherwise a topic);
+ * otherwise the name is resolved using the destination resolver of the given JMS template
+ *
+ * @param jmsTemplate template to use for session and resolver
+ * @param replyToDestinationName null for temporary destination or a destination name
+ * @param pubSubDomain true=pubSub, false=Queues
+ * @return resolved destination
+ */
+ private Destination resolveOrCreateDestination(final JmsTemplate jmsTemplate,
+ final String replyToDestinationName,
+ final boolean pubSubDomain) {
+ return (Destination)jmsTemplate.execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ if (replyToDestinationName == null) {
+ if (destination instanceof Queue) {
+ return session.createTemporaryQueue();
+ } else {
+ return session.createTemporaryTopic();
+ }
+ }
+ DestinationResolver resolv = jmsTemplate.getDestinationResolver();
+ return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain);
+ }
+ });
+ }
+
}
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java?rev=734341&r1=734340&r2=734341&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java Tue Jan 13 22:02:00 2009
@@ -128,7 +128,7 @@
endpoint.setService(new QName("uri:HelloWorld", "HelloService"));
endpoint.setEndpoint("HelloPort");
endpoint.setDestinationName("destination");
- endpoint.setConnectionFactory(connectionFactory);
+ endpoint.setConnectionFactory(new PooledConnectionFactory(connectionFactory));
component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
container.activateComponent(component, "servicemix-jms");