You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2008/09/29 22:11:34 UTC
svn commit: r700236 [1/2] - in /cxf/trunk/rt/transports/jms/src:
main/java/org/apache/cxf/transport/jms/
main/java/org/apache/cxf/transport/jms/spring/
main/java/org/apache/cxf/transport/jms/wsdl11/
main/resources/schemas/configuration/ test/java/org/a...
Author: cschneider
Date: Mon Sep 29 13:11:34 2008
New Revision: 700236
URL: http://svn.apache.org/viewvc?rev=700236&view=rev
Log:
CXF-1832
Added:
cxf/trunk/rt/transports/jms/src/test/resources/wsdl/
cxf/trunk/rt/transports/jms/src/test/resources/wsdl/jms_test.wsdl
Removed:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSListenerThread.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java
Modified:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSDestinationBeanDefinitionParser.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/wsdl11/JmsTransportPlugin.java
cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java
cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Mon Sep 29 13:11:34 2008
@@ -23,70 +23,58 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.MessageListener;
import javax.jms.QueueSession;
import javax.jms.Session;
-import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.configuration.Configurable;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
-import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractConduit;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.springframework.beans.factory.InitializingBean;
import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.JmsTemplate102;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.destination.DestinationResolver;
/**
* JMSConduit is instantiated by the JMSTransportfactory which is selected by a client if the transport
* protocol starts with jms:// JMSConduit converts CXF Messages to JMS Messages and sends the request by using
- * JMS topics or queues. If the Exchange is not oneway it then recevies the response and converts it to a CXF
+ * a JMS destination. If the Exchange is not oneway it then recevies the response and converts it to a CXF
* Message. This is then provided in the Exchange and also sent to the incomingObserver
*/
-public class JMSConduit extends AbstractConduit implements Configurable, JMSExchangeSender {
-
- protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base";
-
+public class JMSConduit extends AbstractConduit implements JMSExchangeSender, MessageListener,
+ InitializingBean {
static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
- protected Bus bus;
- protected EndpointInfo endpointInfo;
- protected JMSConfiguration jmsConfig;
- protected String beanNameSuffix;
+ private JMSConfiguration jmsConfig;
+ private Map<String, Message> correlationMap;
- public JMSConduit(Bus b, EndpointInfo endpointInfo) {
- this(b, endpointInfo, null);
- }
+ private DefaultMessageListenerContainer jmsListener;
+ private JmsTemplate jmsTemplate;
- public JMSConduit(Bus b, EndpointInfo endpointInfo, EndpointReferenceType target) {
+ public JMSConduit(EndpointReferenceType target, JMSConfiguration jmsConfig) {
super(target);
- this.bus = b;
- this.endpointInfo = endpointInfo;
- this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX;
- }
-
- // prepare the message for send out , not actually send out the message
- public void prepare(Message message) throws IOException {
- message.setContent(OutputStream.class, new JMSOutputStream(this, message.getExchange(),
- isTextPayload()));
- // After this step flow will continue in JMSOutputStream.doClose()
+ this.jmsConfig = jmsConfig;
+ correlationMap = new ConcurrentHashMap<String, Message>();
}
-
- public Destination determineReplyToDestination(final JmsTemplate jmsTemplate,
+
+ private Destination determineReplyToDestination(final JmsTemplate jmsTemplate2,
final String replyToDestinationName,
- final boolean pubSubDomain, boolean isOneWay) {
- if (isOneWay) {
- return null;
- }
- return (Destination)jmsTemplate.execute(new SessionCallback() {
+ final boolean pubSubDomain) {
+ return (Destination)jmsTemplate2.execute(new SessionCallback() {
public Object doInJms(Session session) throws JMSException {
if (replyToDestinationName == null) {
if (session instanceof QueueSession) {
@@ -97,13 +85,56 @@
return session.createTemporaryQueue();
}
}
- DestinationResolver resolv = jmsTemplate.getDestinationResolver();
+ DestinationResolver resolv = jmsTemplate2.getDestinationResolver();
return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain);
}
});
}
/**
+ * Initialize jmsTemplate and jmsListener from jms configuration data in jmsConfig {@inheritDoc}
+ */
+ public void afterPropertiesSet() {
+ jmsTemplate = jmsConfig.isUseJms11() ? new JmsTemplate() : new JmsTemplate102();
+ jmsTemplate.setDefaultDestinationName(jmsConfig.getTargetDestination());
+ jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory());
+ jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain());
+ jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout());
+ jmsTemplate.setTimeToLive(jmsConfig.getTimeToLive());
+ jmsTemplate.setPriority(jmsConfig.getPriority());
+ jmsTemplate.setDeliveryMode(jmsConfig.getDeliveryMode());
+ jmsTemplate.setExplicitQosEnabled(jmsConfig.isExplicitQosEnabled());
+ jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted());
+
+ jmsListener = new DefaultMessageListenerContainer();
+ jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
+ jmsListener.setAutoStartup(false);
+ jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory());
+ jmsListener.setMessageSelector(jmsConfig.getMessageSelector());
+ jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
+ jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted());
+ jmsListener.setTransactionManager(jmsConfig.getTransactionManager());
+
+ jmsListener.setMessageListener(this);
+
+ if (jmsConfig.getDestinationResolver() != null) {
+ jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver());
+ jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
+ }
+ }
+
+ /**
+ * Prepare the message for send out. The message will be sent after the caller has written the payload to
+ * the OutputStream of the message and calls the close method of the stream. In the JMS case the
+ * JMSOutputStream will then call back the sendExchange method of this class. {@inheritDoc}
+ */
+ public void prepare(Message message) throws IOException {
+ boolean isTextPayload = JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType());
+ JMSOutputStream out = new JMSOutputStream(this, message.getExchange(), isTextPayload);
+ message.setContent(OutputStream.class, out);
+ }
+
+ /**
* Send the JMS Request out and if not oneWay receive the response
*
* @param outMessage
@@ -112,58 +143,93 @@
*/
public void sendExchange(final Exchange exchange, final Object request) {
LOG.log(Level.FINE, "JMSConduit send message");
- final JmsTemplate jmsTemplate = jmsConfig.getJmsTemplate();
- final Destination replyTo = determineReplyToDestination(jmsTemplate,
- jmsConfig.getReplyDestination(), jmsConfig
- .isPubSubDomain(), exchange.isOneWay());
final Message outMessage = exchange.getOutMessage();
if (outMessage == null) {
throw new RuntimeException("Exchange to be sent has no outMessage");
}
+
+ if (!exchange.isOneWay() && !jmsListener.isRunning()) {
+ Destination replyTo = determineReplyToDestination(jmsTemplate,
+ jmsConfig.getReplyDestination(),
+ jmsConfig.isPubSubDomain());
+ jmsListener.setDestination(replyTo);
+ jmsListener.start();
+ jmsListener.initialize();
+ }
JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
final String correlationId = (headers != null && headers.isSetJMSCorrelationID()) ? headers
- .getJMSCorrelationID() : JMSUtils.generateUniqueSelector();
- String selector = "JMSCorrelationID = '" + correlationId + "'";
-
- // TODO This is not thread safe
- jmsTemplate.setPriority(JMSUtils.getJMSPriority(headers));
- jmsTemplate.send(jmsConfig.getTargetDestination(), new MessageCreator() {
+ .getJMSCorrelationID() : JMSUtils.generateCorrelationId();
+ // String selector = "JMSCorrelationID = '" + correlationId + "'";
+
+ jmsTemplate.send(new MessageCreator() {
public javax.jms.Message createMessage(Session session) throws JMSException {
String messageType = jmsConfig.getMessageType();
final javax.jms.Message jmsMessage;
jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(outMessage, request, messageType,
- session, replyTo, correlationId);
+ session, jmsListener.getDestination(),
+ correlationId);
LOG.log(Level.FINE, "client sending request: ", jmsMessage);
return jmsMessage;
}
});
+ /**
+ * If the message is not oneWay we will expect to receive a reply on the listener.
+ * To receive this reply we add the correlationId and an empty CXF Message to the
+ * correlationMap. The listener will fill to Message and notify this thread
+ */
if (!exchange.isOneWay()) {
- javax.jms.Message jmsMessage = jmsTemplate.receiveSelected(replyTo, selector);
- if (jmsMessage == null) {
- throw new RuntimeException("JMS receive timed out");
- }
Message inMessage = new MessageImpl();
- LOG.log(Level.FINE, "client received reply: ", jmsMessage);
- JMSUtils
- .populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
- byte[] response = JMSUtils.retrievePayload(jmsMessage);
- LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]");
- inMessage.setContent(InputStream.class, new ByteArrayInputStream(response));
- exchange.setInMessage(inMessage);
+ synchronized (inMessage) {
+ correlationMap.put(correlationId, inMessage);
+ try {
+ inMessage.wait(jmsTemplate.getReceiveTimeout());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ correlationMap.remove(correlationId);
+ }
if (incomingObserver != null) {
incomingObserver.onMessage(inMessage);
}
+ exchange.setInMessage(inMessage);
}
}
- private boolean isTextPayload() {
- return JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType());
+ /**
+ * When a message is received on the reply destination the correlation map is searched
+ * for the correlationId. If it is found the message is converted to a CXF message and the
+ * thread sending the request is notified
+ *
+ * {@inheritDoc}
+ */
+ public void onMessage(javax.jms.Message jmsMessage) {
+ String correlationId;
+ try {
+ correlationId = jmsMessage.getJMSCorrelationID();
+ } catch (JMSException e) {
+ throw JmsUtils.convertJmsAccessException(e);
+ }
+ Message inMessage = correlationMap.get(correlationId);
+ if (inMessage == null) {
+ LOG.log(Level.WARNING, "Could not correlate message with correlationId " + correlationId);
+ }
+ LOG.log(Level.FINE, "client received reply: ", jmsMessage);
+ JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
+ byte[] response = JMSUtils.retrievePayload(jmsMessage);
+ LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]");
+ inMessage.setContent(InputStream.class, new ByteArrayInputStream(response));
+
+ synchronized (inMessage) {
+ inMessage.notifyAll();
+ }
+
}
public void close() {
+ jmsListener.shutdown();
LOG.log(Level.FINE, "JMSConduit closed ");
}
@@ -171,10 +237,6 @@
return LOG;
}
- public String getBeanName() {
- return endpointInfo.getName().toString() + ".jms-conduit";
- }
-
public JMSConfiguration getJmsConfig() {
return jmsConfig;
}
@@ -183,4 +245,14 @@
this.jmsConfig = jmsConfig;
}
+ @Override
+ protected void finalize() throws Throwable {
+ if (jmsListener.isRunning()) {
+ jmsListener.shutdown();
+ }
+ super.finalize();
+ }
+
+
+
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Mon Sep 29 13:11:34 2008
@@ -19,52 +19,139 @@
package org.apache.cxf.transport.jms;
import javax.jms.ConnectionFactory;
+import javax.jms.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.listener.AbstractJmsListeningContainer;
+import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.transaction.PlatformTransactionManager;
public class JMSConfiguration implements InitializingBean {
private ConnectionFactory connectionFactory;
- private JmsTemplate jmsTemplate;
- private AbstractJmsListeningContainer jmsListener;
+ private DestinationResolver destinationResolver;
+ private PlatformTransactionManager transactionManager;
+ private boolean useJms11 = true;
+ private boolean useJndi;
+ private boolean messageIdEnabled = true;
+ private boolean messageTimestampEnabled = true;
+ private boolean pubSubNoLocal;
+ private long receiveTimeout = JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT;
+ private boolean explicitQosEnabled;
+ private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
+ private int priority = Message.DEFAULT_PRIORITY;
+ private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+ private boolean sessionTransacted;
+
+ private volatile String messageSelector;
+ private boolean subscriptionDurable;
+ private String durableSubscriptionName;
+
private String targetDestination;
private String replyDestination;
- private String messageType;
+ private String messageType = JMSConstants.TEXT_MESSAGE_TYPE;
private boolean pubSubDomain;
- public JMSConfiguration() {
- targetDestination = null;
- replyDestination = null;
- messageType = JMSConstants.TEXT_MESSAGE_TYPE;
- pubSubDomain = false;
+ public boolean isUseJndi() {
+ return useJndi;
}
- public void afterPropertiesSet() throws Exception {
- /*
- * if (connectionFactory == null) { throw new RuntimeException("Required property connectionfactory
- * was not set"); } jmsTemplate.setConnectionFactory(connectionFactory);
- * jmsListener.setConnectionFactory(connectionFactory);
- */
+ public void setUseJndi(boolean useJndi) {
+ this.useJndi = useJndi;
}
- public JmsTemplate getJmsTemplate() {
- return jmsTemplate;
+ public boolean isMessageIdEnabled() {
+ return messageIdEnabled;
}
- @Required
- public void setJmsTemplate(JmsTemplate jmsTemplate) {
- this.jmsTemplate = jmsTemplate;
+ public void setMessageIdEnabled(boolean messageIdEnabled) {
+ this.messageIdEnabled = messageIdEnabled;
}
- public AbstractJmsListeningContainer getJmsListener() {
- return jmsListener;
+ public boolean isMessageTimestampEnabled() {
+ return messageTimestampEnabled;
}
- @Required
- public void setJmsListener(AbstractJmsListeningContainer jmsListener) {
- this.jmsListener = jmsListener;
+ public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
+ this.messageTimestampEnabled = messageTimestampEnabled;
+ }
+
+ public boolean isPubSubNoLocal() {
+ return pubSubNoLocal;
+ }
+
+ public void setPubSubNoLocal(boolean pubSubNoLocal) {
+ this.pubSubNoLocal = pubSubNoLocal;
+ }
+
+ public long getReceiveTimeout() {
+ return receiveTimeout;
+ }
+
+ public void setReceiveTimeout(long receiveTimeout) {
+ this.receiveTimeout = receiveTimeout;
+ }
+
+ public boolean isExplicitQosEnabled() {
+ return explicitQosEnabled;
+ }
+
+ public void setExplicitQosEnabled(boolean explicitQosEnabled) {
+ this.explicitQosEnabled = explicitQosEnabled;
+ }
+
+ public int getDeliveryMode() {
+ return deliveryMode;
+ }
+
+ public void setDeliveryMode(int deliveryMode) {
+ this.deliveryMode = deliveryMode;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ public long getTimeToLive() {
+ return timeToLive;
+ }
+
+ public void setTimeToLive(long timeToLive) {
+ this.timeToLive = timeToLive;
+ }
+
+ public String getMessageSelector() {
+ return messageSelector;
+ }
+
+ public void setMessageSelector(String messageSelector) {
+ this.messageSelector = messageSelector;
+ }
+
+ public boolean isSubscriptionDurable() {
+ return subscriptionDurable;
+ }
+
+ public void setSubscriptionDurable(boolean subscriptionDurable) {
+ this.subscriptionDurable = subscriptionDurable;
+ }
+
+ public String getDurableSubscriptionName() {
+ return durableSubscriptionName;
+ }
+
+ public void setDurableSubscriptionName(String durableSubscriptionName) {
+ this.durableSubscriptionName = durableSubscriptionName;
+ }
+
+ public void afterPropertiesSet() throws Exception {
+ if (connectionFactory == null) {
+ throw new RuntimeException("Required property connectionfactory was not set");
+ }
}
public ConnectionFactory getConnectionFactory() {
@@ -108,4 +195,36 @@
this.pubSubDomain = pubSubDomain;
}
+ public boolean isUseJms11() {
+ return useJms11;
+ }
+
+ public void setUseJms11(boolean useJms11) {
+ this.useJms11 = useJms11;
+ }
+
+ public DestinationResolver getDestinationResolver() {
+ return destinationResolver;
+ }
+
+ public void setDestinationResolver(DestinationResolver destinationResolver) {
+ this.destinationResolver = destinationResolver;
+ }
+
+ public boolean isSessionTransacted() {
+ return sessionTransacted;
+ }
+
+ public void setSessionTransacted(boolean sessionTransacted) {
+ this.sessionTransacted = sessionTransacted;
+ }
+
+ public PlatformTransactionManager getTransactionManager() {
+ return transactionManager;
+ }
+
+ public void setTransactionManager(PlatformTransactionManager transactionManager) {
+ this.transactionManager = transactionManager;
+ }
+
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Mon Sep 29 13:11:34 2008
@@ -29,27 +29,19 @@
import java.util.Map;
import java.util.SimpleTimeZone;
import java.util.TimeZone;
-import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.BytesMessage;
-import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
-import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.xml.namespace.QName;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.configuration.Configurable;
-import org.apache.cxf.configuration.Configurer;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
@@ -58,56 +50,31 @@
import org.apache.cxf.transport.AbstractConduit;
import org.apache.cxf.transport.AbstractMultiplexDestination;
import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.ConduitInitiator;
import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.workqueue.SynchronousExecutor;
-import org.apache.cxf.workqueue.WorkQueueManager;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.JmsTemplate102;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.support.JmsUtils;
+import org.springframework.jms.support.destination.DestinationResolver;
-public class JMSDestination extends AbstractMultiplexDestination implements Configurable, MessageListener,
+public class JMSDestination extends AbstractMultiplexDestination implements MessageListener,
JMSExchangeSender {
- protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-destination-base";
-
private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
- protected ServerConfig serverConfig;
- protected ServerBehaviorPolicyType runtimePolicy;
- protected AddressType address;
- protected SessionPoolType sessionPool;
- protected Destination targetDestination;
- protected Destination replyToDestination;
- protected JMSSessionFactory sessionFactory;
- protected Bus bus;
- protected EndpointInfo endpointInfo;
- protected String beanNameSuffix;
-
- final ConduitInitiator conduitInitiator;
- Session listenerSession;
- JMSListenerThread listenerThread;
+ private JMSConfiguration jmsConfig;
+ private Bus bus;
+ private DefaultMessageListenerContainer jmsListener;
+ private JmsTemplate jmsTemplate;
- public JMSDestination(Bus b, ConduitInitiator ci, EndpointInfo info) throws IOException {
+ public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
super(b, getTargetReference(info, b), info);
-
this.bus = b;
- this.endpointInfo = info;
- this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX;
- conduitInitiator = ci;
-
- initConfig();
- }
-
- private void initConfig() {
- this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
- ServerBehaviorPolicyType.class);
- this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
- this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
- this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
- Configurer configurer = bus.getExtension(Configurer.class);
- if (null != configurer) {
- configurer.configureBean(this);
- }
+ this.jmsConfig = jmsConfig;
}
/**
@@ -119,64 +86,46 @@
return new BackChannelConduit(this, anon, inMessage);
}
- private Executor getExecutor(WorkQueueManager wqm, QName name) {
- // Fallback if no Workqueuemanager
- Executor executor = SynchronousExecutor.getInstance();
- if (wqm != null) {
- if (name != null) {
- executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}" + name.getLocalPart());
- }
- if (executor == null) {
- executor = wqm.getNamedWorkQueue("jms");
- }
- if (executor == null) {
- executor = wqm.getAutomaticWorkQueue();
- }
- }
- return executor;
- }
-
/**
- * Initialize Sessionfactory, Initialize and start ListenerThread {@inheritDoc}
+ * Initialize jmsTemplate and jmsListener from jms configuration data in jmsConfig {@inheritDoc}
*/
public void activate() {
getLogger().log(Level.INFO, "JMSDestination activate().... ");
- if (this.address == null || this.address.getJndiConnectionFactoryName() == null) {
- throw new RuntimeException("Insufficient configuration for Destination. "
- + "Did you configure a <jms:destination name=\"" + getBeanName()
- + "\"> and set the jndiConnectionFactoryName ?");
+ jmsTemplate = jmsConfig.isUseJms11() ? new JmsTemplate() : new JmsTemplate102();
+ jmsTemplate.setDefaultDestinationName(jmsConfig.getReplyDestination());
+ jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory());
+ jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain());
+ jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout());
+ jmsTemplate.setTimeToLive(jmsConfig.getTimeToLive());
+ jmsTemplate.setPriority(jmsConfig.getPriority());
+ jmsTemplate.setDeliveryMode(jmsConfig.getDeliveryMode());
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted());
+
+ jmsListener = new DefaultMessageListenerContainer();
+ jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
+ jmsListener.setAutoStartup(true);
+ jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory());
+ jmsListener.setMessageSelector(jmsConfig.getMessageSelector());
+ jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
+ jmsListener.setDestinationName(jmsConfig.getTargetDestination());
+ jmsListener.setMessageListener(this);
+ jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted());
+ jmsListener.setTransactionManager(jmsConfig.getTransactionManager());
+
+ if (jmsConfig.getDestinationResolver() != null) {
+ jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver());
+ jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
}
- try {
- getLogger().log(Level.FINE, "establishing JMS connection");
- sessionFactory = JMSSessionFactory.connect(getJMSAddress(), getSessionPool(), serverConfig);
- Connection connection = sessionFactory.getConnection();
- Context context = sessionFactory.getInitialContext();
- this.targetDestination = JMSUtils.resolveRequestDestination(context, connection, address);
- this.replyToDestination = JMSUtils.resolveRequestDestination(context, connection, address);
- WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class);
- QName name = null;
- if (endpointInfo != null) {
- name = endpointInfo.getName();
- }
- Executor executor = getExecutor(wqm, name);
- String messageSelector = runtimePolicy.getMessageSelector();
- String durableName = runtimePolicy.getDurableSubscriberName();
- listenerThread = new JMSListenerThread(executor, this);
- listenerThread.start(connection, targetDestination, messageSelector, durableName);
- } catch (JMSException ex) {
- getLogger().log(Level.SEVERE, "JMS connect failed with JMSException : ", ex);
- } catch (NamingException nex) {
- getLogger().log(Level.SEVERE, "JMS connect failed with NamingException : ", nex);
+ if (!jmsListener.isRunning()) {
+ jmsListener.initialize();
}
}
public void deactivate() {
- if (listenerThread != null) {
- listenerThread.close();
- }
- sessionFactory.shutdown();
+ jmsListener.shutdown();
}
public void shutdown() {
@@ -184,24 +133,31 @@
this.deactivate();
}
- public Queue getReplyToDestination(Message inMessage) throws JMSException, NamingException {
+ private Destination resolveDestinationName(final String name) {
+ return (Destination)jmsTemplate.execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ DestinationResolver resolv = jmsTemplate.getDestinationResolver();
+ return resolv.resolveDestinationName(session, name, jmsConfig.isPubSubDomain());
+ }
+ });
+ }
+
+ public Destination getReplyToDestination(Message inMessage) throws JMSException {
javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
// If WS-Addressing had set the replyTo header.
- String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO);
+ final String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO);
if (replyToName != null) {
- Context context = sessionFactory.getInitialContext();
- return (Queue)context.lookup(replyToName);
+ return resolveDestinationName(replyToName);
} else if (message.getJMSReplyTo() != null) {
- return (Queue)message.getJMSReplyTo();
+ return message.getJMSReplyTo();
} else {
- return (Queue)replyToDestination;
+ throw new RuntimeException("No replyTo destination set on request message or cxf message");
}
}
/**
* Decides what correlationId to use for the reply by looking at the request headers. If the request has a
- * correlationId set this is taken. Else if the useMessageIDAsCorrelationID is true then the messageId
- * from the request message is used as correlation Id
+ * correlationId set this is taken. Else the messageId from the request message is used as correlation Id
*
* @param request
* @return
@@ -209,10 +165,7 @@
*/
public String determineCorrelationID(javax.jms.Message request) throws JMSException {
String correlationID = request.getJMSCorrelationID();
- if ("".equals(correlationID)) {
- correlationID = null;
- }
- if (correlationID == null && getRuntimePolicy().isUseMessageIDAsCorrelationID()) {
+ if (correlationID == null || "".equals(correlationID)) {
correlationID = request.getJMSMessageID();
}
return correlationID;
@@ -250,10 +203,10 @@
}
}
- public void sendExchange(Exchange exchange, Object replyObj) {
+ public void sendExchange(Exchange exchange, final Object replyObj) {
Message inMessage = exchange.getInMessage();
- Message outMessage = exchange.getOutMessage();
- if (!JMSUtils.isDestinationStyleQueue(address)) {
+ final Message outMessage = exchange.getOutMessage();
+ if (jmsConfig.isPubSubDomain()) {
// we will never receive a non-oneway invocation in pub-sub
// domain from CXF client - however a mis-behaving pure JMS
// client could conceivably make suce an invocation, in which
@@ -262,12 +215,11 @@
"with 'topic' destinationStyle");
return;
}
- PooledSession replySession = null;
try {
// setup the reply message
- replySession = sessionFactory.get();
- javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
- String msgType = null;
+ final javax.jms.Message request = (javax.jms.Message)inMessage
+ .get(JMSConstants.JMS_REQUEST_MESSAGE);
+ final String msgType;
if (request instanceof TextMessage) {
msgType = JMSConstants.TEXT_MESSAGE_TYPE;
} else if (request instanceof BytesMessage) {
@@ -275,57 +227,50 @@
} else {
msgType = JMSConstants.BINARY_MESSAGE_TYPE;
}
- javax.jms.Message reply = JMSUtils
- .createAndSetPayload(replyObj, replySession.session(), msgType);
- reply.setJMSCorrelationID(determineCorrelationID(request));
- JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
- .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
- JMSUtils.setMessageProperties(headers, reply);
- // ensure that the contentType is set to the out jms message header
- JMSUtils.addContentTypeToProtocolHeader(outMessage);
- Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
- .get(Message.PROTOCOL_HEADERS));
- JMSUtils.addProtocolHeaders(reply, protHeaders);
Destination replyTo = getReplyToDestination(inMessage);
-
+ final JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)inMessage
.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
- long timeToLive = 0;
if (request.getJMSExpiration() > 0) {
TimeZone tz = new SimpleTimeZone(0, "GMT");
Calendar cal = new GregorianCalendar(tz);
- timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
- }
-
- if (timeToLive < 0) {
- getLogger().log(Level.INFO, "Message time to live is already expired skipping response.");
- return;
+ long timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
+ if (timeToLive < 0) {
+ getLogger()
+ .log(Level.INFO, "Message time to live is already expired skipping response.");
+ return;
+ }
}
int deliveryMode = JMSUtils.getJMSDeliveryMode(inHeaders);
int priority = JMSUtils.getJMSPriority(inHeaders);
- long ttl = JMSUtils.getTimeToLive(headers);
- if (ttl <= 0) {
- ttl = getServerConfig().getMessageTimeToLive();
- }
- if (ttl <= 0) {
- ttl = timeToLive;
- }
+
+ jmsTemplate.setDeliveryMode(deliveryMode);
+ jmsTemplate.setPriority(priority);
getLogger().log(Level.FINE, "send out the message!");
- replySession.producer().send(replyTo, reply, deliveryMode, priority, ttl);
+ jmsTemplate.send(replyTo, new MessageCreator() {
+ public javax.jms.Message createMessage(Session session) throws JMSException {
+ javax.jms.Message reply = JMSUtils.createAndSetPayload(replyObj, session, msgType);
+
+ reply.setJMSCorrelationID(determineCorrelationID(request));
+
+ JMSUtils.setMessageProperties(headers, reply);
+ // ensure that the contentType is set to the out jms message header
+ JMSUtils.addContentTypeToProtocolHeader(outMessage);
+ Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
+ .get(Message.PROTOCOL_HEADERS));
+ JMSUtils.addProtocolHeaders(reply, protHeaders);
+
+ LOG.log(Level.FINE, "server sending reply: ", reply);
+ return reply;
+ }
+ });
- getLogger().log(Level.FINE, "just server sending reply: ", reply);
- // Check the reply time limit Stream close will call for this
} catch (JMSException ex) {
- getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex);
- throw new RuntimeException(ex.getMessage());
- } catch (NamingException nex) {
- getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex);
- throw new RuntimeException(nex.getMessage());
- } finally {
- sessionFactory.recycle(replySession);
+ JmsUtils.convertJmsAccessException(ex);
}
}
@@ -333,43 +278,9 @@
return LOG;
}
- public String getBeanName() {
- return endpointInfo.getName().toString() + ".jms-destination";
- }
-
- public AddressType getJMSAddress() {
- return address;
- }
-
- public void setJMSAddress(AddressType a) {
- this.address = a;
- }
-
- public ServerBehaviorPolicyType getRuntimePolicy() {
- return runtimePolicy;
- }
-
- public void setRuntimePolicy(ServerBehaviorPolicyType runtimePolicy) {
- this.runtimePolicy = runtimePolicy;
- }
-
- public ServerConfig getServerConfig() {
- return serverConfig;
- }
-
- public void setServerConfig(ServerConfig serverConfig) {
- this.serverConfig = serverConfig;
- }
-
- public SessionPoolType getSessionPool() {
- return sessionPool;
- }
-
- public void setSessionPool(SessionPoolType sessionPool) {
- this.sessionPool = sessionPool;
- }
-
- // this should deal with the cxf message
+ /**
+ * Conduit for sending the reply back to the client
+ */
protected class BackChannelConduit extends AbstractConduit {
protected Message inMessage;
@@ -419,4 +330,12 @@
}
}
+ public JMSConfiguration getJmsConfig() {
+ return jmsConfig;
+ }
+
+ public void setJmsConfig(JMSConfiguration jmsConfig) {
+ this.jmsConfig = jmsConfig;
+ }
+
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java Mon Sep 29 13:11:34 2008
@@ -20,6 +20,19 @@
import org.apache.cxf.message.Exchange;
+/**
+ * Callback interface for JMSOutputStream
+ */
interface JMSExchangeSender {
+
+ /**
+ * Is called from JMSOutputStream.doClose() when the stream is fully
+ * written. Sends the outMessage of the given exchange with the given payload
+ * from the JMSOutputStream. If the exchange is not oneway a reply should be recieved
+ * and set as inMessage
+ *
+ * @param exchange
+ * @param payload
+ */
void sendExchange(Exchange exchange, Object payload);
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java Mon Sep 29 13:11:34 2008
@@ -19,7 +19,6 @@
package org.apache.cxf.transport.jms;
import javax.jms.ConnectionFactory;
-import javax.jms.Message;
import javax.naming.NamingException;
import org.apache.cxf.Bus;
@@ -27,15 +26,18 @@
import org.apache.cxf.service.model.EndpointInfo;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
-import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.destination.JndiDestinationResolver;
import org.springframework.jndi.JndiTemplate;
public class JMSOldConfigHolder {
- protected ClientConfig clientConfig;
- protected ClientBehaviorPolicyType runtimePolicy;
- protected AddressType address;
- protected SessionPoolType sessionPool;
+ private ClientConfig clientConfig;
+ private ClientBehaviorPolicyType runtimePolicy;
+
+ private AddressType address;
+ private SessionPoolType sessionPool;
+ private JMSConfiguration jmsConfig;
+ private ServerConfig serverConfig;
+ private ServerBehaviorPolicyType serverBehavior;
private ConnectionFactory getConnectionFactoryFromJndi(String connectionFactoryName, String userName,
String password, JndiTemplate jt) {
@@ -43,7 +45,6 @@
return null;
}
try {
-
ConnectionFactory connectionFactory = (ConnectionFactory)jt.lookup(connectionFactoryName);
UserCredentialsConnectionFactoryAdapter uccf = new UserCredentialsConnectionFactoryAdapter();
uccf.setUsername(userName);
@@ -58,20 +59,26 @@
}
}
- public JMSConfiguration createJMSConfigurationFromEndpointInfo(Bus bus, EndpointInfo endpointInfo) {
- JMSConfiguration jmsConf = new JMSConfiguration();
+ public JMSConfiguration createJMSConfigurationFromEndpointInfo(Bus bus, EndpointInfo endpointInfo,
+ boolean isConduit) {
+ jmsConfig = new JMSConfiguration();
- // Retrieve configuration information that was extracted from the wsdl
+ // Retrieve configuration information that was extracted from the WSDL
address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class);
runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
ClientBehaviorPolicyType.class);
+ serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
+ sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
+ serverBehavior = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
+ ServerBehaviorPolicyType.class);
+ String name = endpointInfo.getName().toString() + (isConduit ? ".jms-conduit" : ".jms-destination");
// Try to retrieve configuration information from the spring
- // config. Search for a tag <jms:conduit> with name=endpoint name + ".jms-conduit"
+ // config. Search for a conduit or destination with name=endpoint name + ".jms-conduit"
+ // or ".jms-destination"
Configurer configurer = bus.getExtension(Configurer.class);
if (null != configurer) {
- String name = endpointInfo.getName().toString() + ".jms-conduit";
configurer.configureBean(name, this);
}
@@ -80,38 +87,57 @@
ConnectionFactory cf = getConnectionFactoryFromJndi(address.getJndiConnectionFactoryName(), address
.getConnectionUserName(), address.getConnectionPassword(), jt);
- // TODO Use JmsTemplate102 in case JMS 1.1 is not available
- JmsTemplate jmsTemplate = new JmsTemplate();
- jmsTemplate.setConnectionFactory(cf);
boolean pubSubDomain = false;
if (address.isSetDestinationStyle()) {
pubSubDomain = DestinationStyleType.TOPIC == address.getDestinationStyle();
}
- jmsTemplate.setPubSubDomain(pubSubDomain);
- jmsTemplate.setReceiveTimeout(clientConfig.getClientReceiveTimeout());
- jmsTemplate.setTimeToLive(clientConfig.getMessageTimeToLive());
- jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);
- jmsTemplate.setDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
- jmsTemplate.setExplicitQosEnabled(true);
+ jmsConfig.setConnectionFactory(cf);
+ jmsConfig.setDurableSubscriptionName(serverBehavior.getDurableSubscriberName());
+ jmsConfig.setExplicitQosEnabled(true);
+ // jmsConfig.setMessageIdEnabled(messageIdEnabled);
+ jmsConfig.setMessageSelector(serverBehavior.getMessageSelector());
+ // jmsConfig.setMessageTimestampEnabled(messageTimestampEnabled);
+ if (runtimePolicy.isSetMessageType()) {
+ jmsConfig.setMessageType(runtimePolicy.getMessageType().value());
+ }
+ // jmsConfig.setOneWay(oneWay);
+ // jmsConfig.setPriority(priority);
+ jmsConfig.setPubSubDomain(pubSubDomain);
+ jmsConfig.setPubSubNoLocal(true);
+ jmsConfig.setReceiveTimeout(clientConfig.getClientReceiveTimeout());
+ jmsConfig.setSubscriptionDurable(serverBehavior.isSetDurableSubscriberName());
+ long timeToLive = isConduit ? clientConfig.getMessageTimeToLive() : serverConfig
+ .getMessageTimeToLive();
+ jmsConfig.setTimeToLive(timeToLive);
+ jmsConfig.setUseJms11(true);
+ boolean useJndi = address.isSetJndiDestinationName();
+ jmsConfig.setUseJndi(useJndi);
+ jmsConfig.setSessionTransacted(serverBehavior.isSetTransactional());
- if (address.isSetJndiDestinationName()) {
+ if (useJndi) {
// Setup Destination jndi destination resolver
final JndiDestinationResolver jndiDestinationResolver = new JndiDestinationResolver();
jndiDestinationResolver.setJndiTemplate(jt);
- jmsTemplate.setDestinationResolver(jndiDestinationResolver);
- jmsConf.setTargetDestination(address.getJndiDestinationName());
- jmsConf.setReplyDestination(address.getJndiReplyDestinationName());
+ jmsConfig.setDestinationResolver(jndiDestinationResolver);
+ jmsConfig.setTargetDestination(address.getJndiDestinationName());
+ jmsConfig.setReplyDestination(address.getJndiReplyDestinationName());
} else {
// Use the default dynamic destination resolver
- jmsConf.setTargetDestination(address.getJmsDestinationName());
- jmsConf.setReplyDestination(address.getJmsReplyDestinationName());
+ jmsConfig.setTargetDestination(address.getJmsDestinationName());
+ jmsConfig.setReplyDestination(address.getJmsReplyDestinationName());
}
- if (runtimePolicy.isSetMessageType()) {
- jmsConf.setMessageType(runtimePolicy.getMessageType().value());
+
+ jmsConfig.setConnectionFactory(cf);
+
+ if (jmsConfig.getTargetDestination() == null || jmsConfig.getConnectionFactory() == null) {
+ throw new RuntimeException("Insufficient configuration for "
+ + (isConduit ? "Conduit" : "Destination") + ". "
+ + "Did you configure a <jms:"
+ + (isConduit ? "conduit" : "destination") + " name=\"" + name
+ + "\"> and set the jndiConnectionFactoryName ?");
}
- jmsConf.setJmsTemplate(jmsTemplate);
- return jmsConf;
+ return jmsConfig;
}
public ClientConfig getClientConfig() {
@@ -145,4 +171,28 @@
public void setSessionPool(SessionPoolType sessionPool) {
this.sessionPool = sessionPool;
}
+
+ public JMSConfiguration getJmsConfig() {
+ return jmsConfig;
+ }
+
+ public void setJmsConfig(JMSConfiguration jmsConfig) {
+ this.jmsConfig = jmsConfig;
+ }
+
+ public ServerConfig getServerConfig() {
+ return serverConfig;
+ }
+
+ public void setServerConfig(ServerConfig serverConfig) {
+ this.serverConfig = serverConfig;
+ }
+
+ public ServerBehaviorPolicyType getServerBehavior() {
+ return serverBehavior;
+ }
+
+ public void setServerBehavior(ServerBehaviorPolicyType serverBehavior) {
+ this.serverBehavior = serverBehavior;
+ }
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java Mon Sep 29 13:11:34 2008
@@ -25,7 +25,7 @@
import org.apache.cxf.message.Exchange;
/**
- *
+ * Outputstream that sends a message when the exchange is closed
*/
class JMSOutputStream extends CachedOutputStream {
private final JMSExchangeSender sender;
@@ -44,6 +44,9 @@
}
@Override
+ /**
+ * Close the stream and send the message out
+ */
protected void doClose() throws IOException {
Object payload = retrieveRequestFromStream(isTextPayload);
this.sender.sendExchange(exchange, payload);
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java Mon Sep 29 13:11:34 2008
@@ -26,7 +26,6 @@
import javax.annotation.Resource;
import org.apache.cxf.Bus;
-import org.apache.cxf.configuration.Configurer;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractTransportFactory;
import org.apache.cxf.transport.Conduit;
@@ -44,7 +43,6 @@
}
private Bus bus;
- private JMSConfiguration jmsConfig;
@Resource(name = "cxf")
public void setBus(Bus b) {
@@ -55,37 +53,32 @@
return bus;
}
- public Conduit getConduit(EndpointInfo targetInfo) throws IOException {
- return getConduit(targetInfo, targetInfo.getTarget());
+ public Conduit getConduit(EndpointInfo endpointInfo) throws IOException {
+ return getConduit(endpointInfo, endpointInfo.getTarget());
}
+ /**
+ * {@inheritDoc}
+ */
public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType target) throws IOException {
- JMSConduit conduit = target == null
- ? new JMSConduit(bus, endpointInfo) : new JMSConduit(bus, endpointInfo, target);
JMSOldConfigHolder old = new JMSOldConfigHolder();
- JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo);
- conduit.setJmsConfig(jmsConf);
- return conduit;
+ JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo, true);
+ JMSConduit jmsConduit = new JMSConduit(target, jmsConf);
+ jmsConduit.afterPropertiesSet();
+ return jmsConduit;
}
+ /**
+ * {@inheritDoc}
+ */
public Destination getDestination(EndpointInfo endpointInfo) throws IOException {
- JMSDestination destination = new JMSDestination(bus, this, endpointInfo);
- Configurer configurer = bus.getExtension(Configurer.class);
- if (null != configurer) {
- configurer.configureBean(destination);
- }
- return destination;
+ JMSOldConfigHolder old = new JMSOldConfigHolder();
+ JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo, false);
+ return new JMSDestination(bus, endpointInfo, jmsConf);
}
public Set<String> getUriPrefixes() {
return URI_PREFIXES;
}
- public JMSConfiguration getJmsConfig() {
- return jmsConfig;
- }
-
- public void setJmsConfig(JMSConfiguration jmsConfig) {
- this.jmsConfig = jmsConfig;
- }
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java Mon Sep 29 13:11:34 2008
@@ -33,19 +33,11 @@
import java.util.logging.Logger;
import javax.jms.BytesMessage;
-import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueSender;
import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.naming.Context;
-import javax.naming.NamingException;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.helpers.CastUtils;
@@ -63,37 +55,26 @@
public static Properties getInitialContextEnv(AddressType addrType) {
Properties env = new Properties();
- populateContextEnvironment(addrType, env);
-
+ java.util.ListIterator listIter = addrType.getJMSNamingProperty().listIterator();
+ while (listIter.hasNext()) {
+ JMSNamingPropertyType propertyPair = (JMSNamingPropertyType)listIter.next();
+ if (null != propertyPair.getValue()) {
+ env.setProperty(propertyPair.getName(), propertyPair.getValue());
+ }
+ }
if (LOG.isLoggable(Level.FINE)) {
Enumeration props = env.propertyNames();
-
while (props.hasMoreElements()) {
String name = (String)props.nextElement();
String value = env.getProperty(name);
LOG.log(Level.FINE, "Context property: " + name + " | " + value);
}
}
-
return env;
}
- protected static void populateContextEnvironment(AddressType addrType, Properties env) {
-
- java.util.ListIterator listIter = addrType.getJMSNamingProperty().listIterator();
-
- while (listIter.hasNext()) {
- JMSNamingPropertyType propertyPair = (JMSNamingPropertyType)listIter.next();
-
- if (null != propertyPair.getValue()) {
- env.setProperty(propertyPair.getName(), propertyPair.getValue());
- }
- }
- }
-
public static int getJMSDeliveryMode(JMSMessageHeadersType headers) {
int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
-
if (headers != null && headers.isSetJMSDeliveryMode()) {
deliveryMode = headers.getJMSDeliveryMode();
}
@@ -101,11 +82,8 @@
}
public static int getJMSPriority(JMSMessageHeadersType headers) {
- int priority = Message.DEFAULT_PRIORITY;
- if (headers != null && headers.isSetJMSPriority()) {
- priority = headers.getJMSPriority();
- }
- return priority;
+ return (headers != null && headers.isSetJMSPriority())
+ ? headers.getJMSPriority() : Message.DEFAULT_PRIORITY;
}
public static long getTimeToLive(JMSMessageHeadersType headers) {
@@ -118,7 +96,6 @@
public static void setMessageProperties(JMSMessageHeadersType headers, Message message)
throws JMSException {
-
if (headers != null && headers.isSetProperty()) {
List<JMSPropertyType> props = headers.getProperty();
for (int x = 0; x < props.size(); x++) {
@@ -139,7 +116,6 @@
public static Message createAndSetPayload(Object payload, Session session, String messageType)
throws JMSException {
Message message = null;
-
if (JMSConstants.TEXT_MESSAGE_TYPE.equals(messageType)) {
message = session.createTextMessage((String)payload);
} else if (JMSConstants.BYTE_MESSAGE_TYPE.equals(messageType)) {
@@ -149,7 +125,6 @@
message = session.createObjectMessage();
((ObjectMessage)message).setObject((byte[])payload);
}
-
return message;
}
@@ -177,9 +152,8 @@
}
}
- public static JMSMessageHeadersType populateIncomingContext(javax.jms.Message message,
- org.apache.cxf.message.Message inMessage,
- String headerType) {
+ public static void populateIncomingContext(javax.jms.Message message,
+ org.apache.cxf.message.Message inMessage, String headerType) {
try {
JMSMessageHeadersType headers = null;
headers = (JMSMessageHeadersType)inMessage.get(headerType);
@@ -220,7 +194,6 @@
}
}
inMessage.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, protHeaders);
- return headers;
} catch (JMSException ex) {
throw JmsUtils.convertJmsAccessException(ex);
}
@@ -242,7 +215,7 @@
value.append(s);
first = false;
}
- // Incase if the Content-Type header key is Content-Type replace with JMS_Content_Type
+ // If the Content-Type header key is Content-Type replace with JMS_Content_Type
if (entry.getKey().equals(org.apache.cxf.message.Message.CONTENT_TYPE)) {
message.setStringProperty(JMSConstants.JMS_CONTENT_TYPE, value.toString());
} else {
@@ -252,20 +225,18 @@
}
}
- public static Map<String, List<String>> getSetProtocolHeaders(org.apache.cxf.message.Message message) {
+ public static void addContentTypeToProtocolHeader(org.apache.cxf.message.Message message) {
+ String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE);
+
+ // Retrieve or create protocol headers
Map<String, List<String>> headers = CastUtils.cast((Map<?, ?>)message
.get(org.apache.cxf.message.Message.PROTOCOL_HEADERS));
if (null == headers) {
headers = new HashMap<String, List<String>>();
message.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, headers);
}
- return headers;
- }
-
- public static void addContentTypeToProtocolHeader(org.apache.cxf.message.Message message) {
- String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE);
-
- Map<String, List<String>> headers = JMSUtils.getSetProtocolHeaders(message);
+
+ // Add content type to the protocol headers
List<String> ct;
if (headers.get(JMSConstants.JMS_CONTENT_TYPE) != null) {
ct = headers.get(JMSConstants.JMS_CONTENT_TYPE);
@@ -275,14 +246,9 @@
ct = new ArrayList<String>();
headers.put(JMSConstants.JMS_CONTENT_TYPE, ct);
}
-
ct.add(contentType);
}
- public static boolean isDestinationStyleQueue(AddressType address) {
- return JMSConstants.JMS_QUEUE.equals(address.getDestinationStyle().value());
- }
-
public static Message buildJMSMessageFromCXFMessage(org.apache.cxf.message.Message outMessage,
Object payload, String messageType, Session session,
Destination replyTo, String correlationId)
@@ -312,76 +278,19 @@
return jmsMessage;
}
- public static void sendMessage(MessageProducer producer, Destination destination, Message jmsMessage,
- long timeToLive, int deliveryMode, int priority) throws JMSException {
- /*
- * Can this be changed to producer.send(destination, jmsMessage, deliveryMode, priority, timeToLive);
- */
-
- if (destination instanceof Queue) {
- QueueSender sender = (QueueSender)producer;
- sender.setTimeToLive(timeToLive);
- sender.send((Queue)destination, jmsMessage, deliveryMode, priority, timeToLive);
- } else {
- TopicPublisher publisher = (TopicPublisher)producer;
- publisher.setTimeToLive(timeToLive);
- publisher.publish((Topic)destination, jmsMessage, deliveryMode, priority, timeToLive);
- }
- }
-
- public static Destination resolveRequestDestination(Context context, Connection connection,
- AddressType addrDetails) throws JMSException,
- NamingException {
- Destination requestDestination = null;
- // see if jndiDestination is set
- if (addrDetails.getJndiDestinationName() != null) {
- requestDestination = (Destination)context.lookup(addrDetails.getJndiDestinationName());
- }
-
- // if no jndiDestination or it fails see if jmsDestination is set
- // and try to create it.
- if (requestDestination == null && addrDetails.getJmsDestinationName() != null) {
- if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
- requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
- .createQueue(addrDetails.getJmsDestinationName());
- } else {
- requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
- .createTopic(addrDetails.getJmsDestinationName());
- }
- }
- return requestDestination;
- }
-
- public static Queue resolveReplyDestination(Context context, Connection connection,
- AddressType addrDetails) throws NamingException,
- JMSException {
- Queue replyDestination = null;
-
- // Reply Destination is used (if present) only if the session is
- // point-to-point session
- if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
- if (addrDetails.getJndiReplyDestinationName() != null) {
- replyDestination = (Queue)context.lookup(addrDetails.getJndiReplyDestinationName());
- }
- if (replyDestination == null && addrDetails.getJmsReplyDestinationName() != null) {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- replyDestination = session.createQueue(addrDetails.getJmsReplyDestinationName());
- session.close();
- }
- }
- return replyDestination;
- }
-
- public static String generateUniqueSelector() {
+ /**
+ * Create a unique correlation Id from
+ * <host>_<user.name>_<currentThread><time>
+ * @return correlationId
+ */
+ public static String generateCorrelationId() {
String host = "localhost";
-
try {
InetAddress addr = InetAddress.getLocalHost();
host = addr.getHostName();
} catch (UnknownHostException ukex) {
- // Default to localhost.
+ // Default to localhost
}
-
long time = Calendar.getInstance().getTimeInMillis();
return host + "_" + System.getProperty("user.name") + "_" + Thread.currentThread() + time;
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java Mon Sep 29 13:11:34 2008
@@ -20,7 +20,10 @@
import javax.xml.namespace.QName;
+
import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
import org.apache.cxf.configuration.spring.AbstractBeanDefinitionParser;
import org.apache.cxf.transport.jms.AddressType;
@@ -40,12 +43,17 @@
bean.setAbstract(true);
mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "clientConfig"), "clientConfig",
ClientConfig.class);
- mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "runtimePolicy"), "runtimePolicy",
+ mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "runtimePolicy"), "runtimePolicy",
ClientBehaviorPolicyType.class);
- mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "address"), "address",
- AddressType.class);
- mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "sessionPool"), "sessionPool",
+ mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "address"), "address", AddressType.class);
+ mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "sessionPool"), "sessionPool",
SessionPoolType.class);
+ NodeList el = element.getElementsByTagNameNS(JMS_NS, "jmsConfig-ref");
+
+ if (el.getLength() == 1) {
+ Node el1 = el.item(0);
+ bean.addPropertyReference("jmsConfig", el1.getTextContent());
+ }
}
@Override
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSDestinationBeanDefinitionParser.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSDestinationBeanDefinitionParser.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSDestinationBeanDefinitionParser.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSDestinationBeanDefinitionParser.java Mon Sep 29 13:11:34 2008
@@ -38,13 +38,13 @@
@Override
protected void doParse(Element element, ParserContext ctx, BeanDefinitionBuilder bean) {
bean.setAbstract(true);
- mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "serverConfig"), "serverConfig",
+ mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "serverConfig"), "serverConfig",
ServerConfig.class);
- mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "runtimePolicy"), "runtimePolicy",
+ mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "runtimePolicy"), "serverBehavior",
ServerBehaviorPolicyType.class);
- mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "address"), "JMSAddress",
+ mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "address"), "address",
AddressType.class);
- mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "sessionPool"), "sessionPool",
+ mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "sessionPool"), "sessionPool",
SessionPoolType.class);
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/wsdl11/JmsTransportPlugin.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/wsdl11/JmsTransportPlugin.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/wsdl11/JmsTransportPlugin.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/wsdl11/JmsTransportPlugin.java Mon Sep 29 13:11:34 2008
@@ -20,6 +20,7 @@
package org.apache.cxf.transport.jms.wsdl11;
import java.util.Map;
+
import javax.wsdl.Port;
import javax.wsdl.WSDLException;
import javax.wsdl.extensions.ExtensibilityElement;
Modified: cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd (original)
+++ cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd Mon Sep 29 13:11:34 2008
@@ -51,10 +51,17 @@
<xs:complexContent>
<xs:extension base="beans:identifiedType">
<xs:sequence>
- <xs:element name="clientConfig" type="jms:ClientConfig" minOccurs="0"/>
- <xs:element name="runtimePolicy" type="jms:ClientBehaviorPolicyType" minOccurs="0"/>
- <xs:element name="sessionPool" type="jms:SessionPoolType" minOccurs="0"/>
- <xs:element name="address" type="jms:AddressType" minOccurs="0"/>
+ <xs:element name="clientConfig"
+ type="jms:ClientConfig" minOccurs="0" />
+ <xs:element name="runtimePolicy"
+ type="jms:ClientBehaviorPolicyType" minOccurs="0" />
+ <xs:element name="sessionPool"
+ type="jms:SessionPoolType" minOccurs="0" />
+ <xs:element name="address"
+ type="jms:AddressType" minOccurs="0" />
+ <xs:element name="jmsConfig-ref"
+ type="xs:string">
+ </xs:element>
</xs:sequence>
<xs:attributeGroup ref="cxf-beans:beanAttributes"/>
</xs:extension>
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java Mon Sep 29 13:11:34 2008
@@ -111,11 +111,14 @@
} else {
target = EasyMock.createMock(EndpointReferenceType.class);
}
-
- JMSConduit jmsConduit = new JMSConduit(bus, endpointInfo, target);
+
JMSConfiguration jmsConfig = new JMSOldConfigHolder()
- .createJMSConfigurationFromEndpointInfo(bus, endpointInfo);
- jmsConduit.setJmsConfig(jmsConfig);
+ .createJMSConfigurationFromEndpointInfo(bus, endpointInfo, true);
+ jmsConfig.setDeliveryMode(3);
+ jmsConfig.setPriority(1);
+ jmsConfig.setTimeToLive(1000);
+ JMSConduit jmsConduit = new JMSConduit(target, jmsConfig);
+ jmsConduit.afterPropertiesSet();
if (send) {
// setMessageObserver
observer = new MessageObserver() {
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Mon Sep 29 13:11:34 2008
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.charset.Charset;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
@@ -56,7 +57,7 @@
"HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
JMSConduit conduit = setupJMSConduit(false, false);
assertEquals("Can't get the right ClientReceiveTimeout", 500L, conduit.getJmsConfig()
- .getJmsTemplate().getReceiveTimeout());
+ .getReceiveTimeout());
bus.shutdown(false);
BusFactory.setDefaultBus(null);
@@ -121,21 +122,16 @@
JMSConduit conduit = setupJMSConduit(true, false);
Message msg = new MessageImpl();
conduit.prepare(msg);
- final byte[] b = testMsg.getBytes(); // TODO encoding
- JmsTemplate jmsTemplate = conduit.getJmsConfig().getJmsTemplate();
+ final byte[] testBytes = testMsg.getBytes(Charset.defaultCharset()); // TODO encoding
+ JMSConfiguration jmsConfig = conduit.getJmsConfig();
+ JmsTemplate jmsTemplate = new JmsTemplate();
+ jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory());
javax.jms.Message message = (javax.jms.Message)jmsTemplate.execute(new SessionCallback() {
-
public Object doInJms(Session session) throws JMSException {
- return JMSUtils.createAndSetPayload(b, session, JMSConstants.BYTE_MESSAGE_TYPE);
+ return JMSUtils.createAndSetPayload(testBytes, session, JMSConstants.BYTE_MESSAGE_TYPE);
}
-
});
-
assertTrue("Message should have been of type BytesMessage ", message instanceof BytesMessage);
- // byte[] returnBytes = new byte[(int)((BytesMessage) message).getBodyLength()];
- // ((BytesMessage) message).readBytes(returnBytes);
- // assertTrue("Message marshalled was incorrect",
- // testMsg.equals(new String(returnBytes)));
}
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Mon Sep 29 13:11:34 2008
@@ -31,10 +31,8 @@
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.ConduitInitiator;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.MultiplexDestination;
-import org.easymock.classextension.EasyMock;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -63,20 +61,23 @@
private void waitForReceiveDestMessage() {
int waitTime = 0;
- while (destMessage == null && waitTime < 3000) {
+ while (destMessage == null && waitTime < MAX_RECEIVE_TIME) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// do nothing here
}
- waitTime = waitTime + 1000;
+ waitTime++;
}
- assertTrue("Can't receive the Destination message in 3 seconds", destMessage != null);
+ assertTrue("Can't receive the Destination message in " + MAX_RECEIVE_TIME
+ + " seconds", destMessage != null);
}
- public JMSDestination setupJMSDestination(boolean send) throws IOException {
- ConduitInitiator conduitInitiator = EasyMock.createMock(ConduitInitiator.class);
- JMSDestination jmsDestination = new JMSDestination(bus, conduitInitiator, endpointInfo);
+ public JMSDestination setupJMSDestination(boolean send) {
+ JMSConfiguration jmsConfig = new JMSOldConfigHolder()
+ .createJMSConfigurationFromEndpointInfo(bus, endpointInfo, false);
+ JMSDestination jmsDestination = new JMSDestination(bus, endpointInfo, jmsConfig);
+
if (send) {
// setMessageObserver
observer = new MessageObserver() {
@@ -101,24 +102,25 @@
setupServiceInfo("http://cxf.apache.org/jms_conf_test", "/wsdl/others/jms_test_no_addr.wsdl",
"HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
JMSDestination destination = setupJMSDestination(false);
- assertEquals("Can't get the right ServerConfig's MessageTimeToLive ", 500L, destination
- .getServerConfig().getMessageTimeToLive());
- assertEquals("Can't get the right Server's MessageSelector", "cxf_message_selector", destination
- .getRuntimePolicy().getMessageSelector());
- assertEquals("Can't get the right SessionPoolConfig's LowWaterMark", 10, destination
- .getSessionPool().getLowWaterMark());
- assertEquals("Can't get the right AddressPolicy's ConnectionPassword", "testPassword", destination
- .getJMSAddress().getConnectionPassword());
- assertEquals("Can't get the right DurableSubscriberName", "cxf_subscriber", destination
- .getRuntimePolicy().getDurableSubscriberName());
- assertEquals("Can't get the right MessageSelectorName", "cxf_message_selector", destination
- .getRuntimePolicy().getMessageSelector());
+ JMSConfiguration jmsConfig = destination.getJmsConfig();
+ //JmsTemplate jmsTemplate = destination.getJmsTemplate();
+ //AbstractMessageListenerContainer jmsListener = destination.getJmsListener();
+ assertEquals("Can't get the right ServerConfig's MessageTimeToLive ", 500L, jmsConfig
+ .getTimeToLive());
+ assertEquals("Can't get the right Server's MessageSelector", "cxf_message_selector", jmsConfig
+ .getMessageSelector());
+ // assertEquals("Can't get the right SessionPoolConfig's LowWaterMark", 10,
+ // jmsListener.getLowWaterMark());
+ // assertEquals("Can't get the right AddressPolicy's ConnectionPassword", "testPassword",
+ // .getConnectionPassword());
+ assertEquals("Can't get the right DurableSubscriberName", "cxf_subscriber", jmsConfig
+ .getDurableSubscriptionName());
BusFactory.setDefaultBus(null);
}
@Test
- public void testGetConfigurationFormWSDL() throws Exception {
+ public void testGetConfigurationFromWSDL() throws Exception {
SpringBusFactory bf = new SpringBusFactory();
BusFactory.setDefaultBus(null);
bus = bf.createBus();
@@ -129,11 +131,11 @@
JMSDestination destination = setupJMSDestination(false);
assertEquals("Can't get the right DurableSubscriberName", "CXF_subscriber", destination
- .getRuntimePolicy().getDurableSubscriberName());
+ .getJmsConfig().getDurableSubscriptionName());
- assertEquals("Can't get the right AddressPolicy's ConnectionPassword",
- "dynamicQueues/test.jmstransport.binary", destination.getJMSAddress()
- .getJndiDestinationName());
+ assertEquals("Can't get the right AddressPolicy's Destination",
+ "dynamicQueues/test.jmstransport.binary", destination.getJmsConfig()
+ .getTargetDestination());
BusFactory.setDefaultBus(null);
@@ -153,7 +155,7 @@
Message outMessage = new MessageImpl();
setupMessageHeader(outMessage);
JMSDestination destination = setupJMSDestination(true);
- // destination.activate();
+ destination.activate();
sendoutMessage(conduit, outMessage, true);
// wait for the message to be get from the destination
waitForReceiveDestMessage();
@@ -161,6 +163,7 @@
assertTrue("The destiantion should have got the message ", destMessage != null);
verifyReceivedMessage(destMessage);
verifyHeaders(destMessage, outMessage);
+ conduit.close();
destination.shutdown();
}
@@ -173,14 +176,8 @@
JMSConduit conduit = setupJMSConduit(true, false);
Message outMessage = new MessageImpl();
setupMessageHeader(outMessage);
- JMSDestination destination = null;
- try {
- destination = setupJMSDestination(true);
- destination.activate();
- } catch (IOException e) {
- assertFalse("The JMSDestination activate should not throw exception ", false);
- e.printStackTrace();
- }
+ JMSDestination destination = setupJMSDestination(true);
+ destination.activate();
sendoutMessage(conduit, outMessage, true);
// wait for the message to be get from the destination
waitForReceiveDestMessage();
@@ -188,6 +185,7 @@
assertTrue("The destiantion should have got the message ", destMessage != null);
verifyReceivedMessage(destMessage);
verifyHeaders(destMessage, outMessage);
+ conduit.close();
destination.shutdown();
}
@@ -248,8 +246,8 @@
public void testRoundTripDestination() throws Exception {
inMessage = null;
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl", "HelloWorldService",
- "HelloWorldPort");
+ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ "HelloWorldService", "HelloWorldPort");
// set up the conduit send to be true
JMSConduit conduit = setupJMSConduit(true, false);
final Message outMessage = new MessageImpl();
@@ -293,14 +291,14 @@
verifyReceivedMessage(inMessage);
Thread.sleep(1000);
+ conduit.close();
destination.shutdown();
}
@Test
public void testPropertyExclusion() throws Exception {
- final String customPropertyName =
- "THIS_PROPERTY_WILL_NOT_BE_AUTO_COPIED";
+ final String customPropertyName = "THIS_PROPERTY_WILL_NOT_BE_AUTO_COPIED";
inMessage = null;
setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
@@ -360,6 +358,7 @@
inHeader.getProperty().get(0).getName().equals(JMSConstants.JMS_CONTENT_TYPE));
// wait for a while for the jms session recycling
Thread.sleep(1000);
+ conduit.close();
destination.shutdown();
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java Mon Sep 29 13:11:34 2008
@@ -28,9 +28,6 @@
public class JMSUtilsTest extends Assert {
-
- // This is just a place holder for now it will be chaning in next task
- // when the new JMS address policies and configurations are introdced.
@Test
public void testpopulateIncomingContextNonNull() throws Exception {
AddressType addrType = new AddressType();
@@ -40,14 +37,12 @@
prop.setValue("testValue");
addrType.getJMSNamingProperty().add(prop);
- prop.setName(Context.BATCHSIZE);
- prop.setValue("12");
- addrType.getJMSNamingProperty().add(prop);
+ JMSNamingPropertyType prop2 = new JMSNamingPropertyType();
+ prop2.setName(Context.BATCHSIZE);
+ prop2.setValue("12");
+ addrType.getJMSNamingProperty().add(prop2);
-
- Properties env = new Properties();
- assertTrue(env.size() <= 0);
- JMSUtils.populateContextEnvironment(addrType, env);
+ Properties env = JMSUtils.getInitialContextEnv(addrType);
assertTrue("Environment should not be empty", env.size() > 0);
assertTrue("Environemnt should contain NamingBatchSize property", env.get(Context.BATCHSIZE) != null);
}