You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2008/09/29 22:11:34 UTC

svn commit: r700236 [1/2] - in /cxf/trunk/rt/transports/jms/src: main/java/org/apache/cxf/transport/jms/ main/java/org/apache/cxf/transport/jms/spring/ main/java/org/apache/cxf/transport/jms/wsdl11/ main/resources/schemas/configuration/ test/java/org/a...

Author: cschneider
Date: Mon Sep 29 13:11:34 2008
New Revision: 700236

URL: http://svn.apache.org/viewvc?rev=700236&view=rev
Log:
CXF-1832

Added:
    cxf/trunk/rt/transports/jms/src/test/resources/wsdl/
    cxf/trunk/rt/transports/jms/src/test/resources/wsdl/jms_test.wsdl
Removed:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSListenerThread.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java
Modified:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSDestinationBeanDefinitionParser.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/wsdl11/JmsTransportPlugin.java
    cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java
    cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Mon Sep 29 13:11:34 2008
@@ -23,70 +23,58 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.MessageListener;
 import javax.jms.QueueSession;
 import javax.jms.Session;
 
-import org.apache.cxf.Bus;
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.configuration.Configurable;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
-import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractConduit;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.JmsTemplate102;
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.support.JmsUtils;
 import org.springframework.jms.support.destination.DestinationResolver;
 
 /**
  * JMSConduit is instantiated by the JMSTransportfactory which is selected by a client if the transport
  * protocol starts with jms:// JMSConduit converts CXF Messages to JMS Messages and sends the request by using
- * JMS topics or queues. If the Exchange is not oneway it then recevies the response and converts it to a CXF
+ * a JMS destination. If the Exchange is not oneway it then receives the response and converts it to a CXF
  * Message. This is then provided in the Exchange and also sent to the incomingObserver
  */
-public class JMSConduit extends AbstractConduit implements Configurable, JMSExchangeSender {
-
-    protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base";
-
+public class JMSConduit extends AbstractConduit implements JMSExchangeSender, MessageListener,
+    InitializingBean {
     static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
 
-    protected Bus bus;
-    protected EndpointInfo endpointInfo;
-    protected JMSConfiguration jmsConfig;
-    protected String beanNameSuffix;
+    private JMSConfiguration jmsConfig;
+    private Map<String, Message> correlationMap;
 
-    public JMSConduit(Bus b, EndpointInfo endpointInfo) {
-        this(b, endpointInfo, null);
-    }
+    private DefaultMessageListenerContainer jmsListener;
+    private JmsTemplate jmsTemplate;
 
-    public JMSConduit(Bus b, EndpointInfo endpointInfo, EndpointReferenceType target) {
+    public JMSConduit(EndpointReferenceType target, JMSConfiguration jmsConfig) {
         super(target);
-        this.bus = b;
-        this.endpointInfo = endpointInfo;
-        this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX;
-    }
-
-    // prepare the message for send out , not actually send out the message
-    public void prepare(Message message) throws IOException {
-        message.setContent(OutputStream.class, new JMSOutputStream(this, message.getExchange(),
-                                                                   isTextPayload()));
-        // After this step flow will continue in JMSOutputStream.doClose()
+        this.jmsConfig = jmsConfig;
+        correlationMap = new ConcurrentHashMap<String, Message>();
     }
-
-    public Destination determineReplyToDestination(final JmsTemplate jmsTemplate,
+    
+    private Destination determineReplyToDestination(final JmsTemplate jmsTemplate2,
                                                    final String replyToDestinationName,
-                                                   final boolean pubSubDomain, boolean isOneWay) {
-        if (isOneWay) {
-            return null;
-        }
-        return (Destination)jmsTemplate.execute(new SessionCallback() {
+                                                   final boolean pubSubDomain) {
+        return (Destination)jmsTemplate2.execute(new SessionCallback() {
             public Object doInJms(Session session) throws JMSException {
                 if (replyToDestinationName == null) {
                     if (session instanceof QueueSession) {
@@ -97,13 +85,56 @@
                         return session.createTemporaryQueue();
                     }
                 }
-                DestinationResolver resolv = jmsTemplate.getDestinationResolver();
+                DestinationResolver resolv = jmsTemplate2.getDestinationResolver();
                 return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain);
             }
         });
     }
 
     /**
+     * Initialize jmsTemplate and jmsListener from jms configuration data in jmsConfig {@inheritDoc}
+     */
+    public void afterPropertiesSet() {
+        jmsTemplate = jmsConfig.isUseJms11() ? new JmsTemplate() : new JmsTemplate102();
+        jmsTemplate.setDefaultDestinationName(jmsConfig.getTargetDestination());
+        jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory());
+        jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain());
+        jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout());
+        jmsTemplate.setTimeToLive(jmsConfig.getTimeToLive());
+        jmsTemplate.setPriority(jmsConfig.getPriority());
+        jmsTemplate.setDeliveryMode(jmsConfig.getDeliveryMode());
+        jmsTemplate.setExplicitQosEnabled(jmsConfig.isExplicitQosEnabled());
+        jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted());
+
+        jmsListener = new DefaultMessageListenerContainer();
+        jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
+        jmsListener.setAutoStartup(false);
+        jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory());
+        jmsListener.setMessageSelector(jmsConfig.getMessageSelector());
+        jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
+        jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted());
+        jmsListener.setTransactionManager(jmsConfig.getTransactionManager());
+        
+        jmsListener.setMessageListener(this);
+
+        if (jmsConfig.getDestinationResolver() != null) {
+            jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver());
+            jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
+        }
+    }
+
+    /**
+     * Prepare the message for send out. The message will be sent after the caller has written the payload to
+     * the OutputStream of the message and calls the close method of the stream. In the JMS case the
+     * JMSOutputStream will then call back the sendExchange method of this class. {@inheritDoc}
+     */
+    public void prepare(Message message) throws IOException {
+        boolean isTextPayload = JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType());
+        JMSOutputStream out = new JMSOutputStream(this, message.getExchange(), isTextPayload);
+        message.setContent(OutputStream.class, out);
+    }
+
+    /**
      * Send the JMS Request out and if not oneWay receive the response
      * 
      * @param outMessage
@@ -112,58 +143,93 @@
      */
     public void sendExchange(final Exchange exchange, final Object request) {
         LOG.log(Level.FINE, "JMSConduit send message");
-        final JmsTemplate jmsTemplate = jmsConfig.getJmsTemplate();
-        final Destination replyTo = determineReplyToDestination(jmsTemplate,
-                                                                jmsConfig.getReplyDestination(), jmsConfig
-                                                                    .isPubSubDomain(), exchange.isOneWay());
         final Message outMessage = exchange.getOutMessage();
         if (outMessage == null) {
             throw new RuntimeException("Exchange to be sent has no outMessage");
         }
+        
+        if (!exchange.isOneWay() && !jmsListener.isRunning()) {
+            Destination replyTo = determineReplyToDestination(jmsTemplate, 
+                                                              jmsConfig.getReplyDestination(), 
+                                                              jmsConfig.isPubSubDomain());
+            jmsListener.setDestination(replyTo);
+            jmsListener.start();
+            jmsListener.initialize();
+        }
 
         JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
             .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
         final String correlationId = (headers != null && headers.isSetJMSCorrelationID()) ? headers
-            .getJMSCorrelationID() : JMSUtils.generateUniqueSelector();
-        String selector = "JMSCorrelationID = '" + correlationId + "'";
-        
-        // TODO This is not thread safe
-        jmsTemplate.setPriority(JMSUtils.getJMSPriority(headers));
-        jmsTemplate.send(jmsConfig.getTargetDestination(), new MessageCreator() {
+            .getJMSCorrelationID() : JMSUtils.generateCorrelationId();
+        // String selector = "JMSCorrelationID = '" + correlationId + "'";
+
+        jmsTemplate.send(new MessageCreator() {
             public javax.jms.Message createMessage(Session session) throws JMSException {
                 String messageType = jmsConfig.getMessageType();
                 final javax.jms.Message jmsMessage;
                 jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(outMessage, request, messageType,
-                                                                    session, replyTo, correlationId);
+                                                                    session, jmsListener.getDestination(),
+                                                                    correlationId);
                 LOG.log(Level.FINE, "client sending request: ", jmsMessage);
                 return jmsMessage;
             }
         });
 
+        /**
+         * If the message is not oneWay we will expect to receive a reply on the listener.
+         * To receive this reply we add the correlationId and an empty CXF Message to the
+         * correlationMap. The listener will fill the Message and notify this thread
+         */
         if (!exchange.isOneWay()) {
-            javax.jms.Message jmsMessage = jmsTemplate.receiveSelected(replyTo, selector);
-            if (jmsMessage == null) {
-                throw new RuntimeException("JMS receive timed out");
-            }
             Message inMessage = new MessageImpl();
-            LOG.log(Level.FINE, "client received reply: ", jmsMessage);
-            JMSUtils
-                .populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
-            byte[] response = JMSUtils.retrievePayload(jmsMessage);
-            LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]");
-            inMessage.setContent(InputStream.class, new ByteArrayInputStream(response));
-            exchange.setInMessage(inMessage);
+            synchronized (inMessage) {
+                correlationMap.put(correlationId, inMessage);
+                try {
+                    inMessage.wait(jmsTemplate.getReceiveTimeout());
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                correlationMap.remove(correlationId);
+            }
             if (incomingObserver != null) {
                 incomingObserver.onMessage(inMessage);
             }
+            exchange.setInMessage(inMessage);
         }
     }
 
-    private boolean isTextPayload() {
-        return JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType());
+    /**
+     * When a message is received on the reply destination the correlation map is searched
+     * for the correlationId. If it is found the message is converted to a CXF message and the
+     * thread sending the request is notified
+     * 
+     * {@inheritDoc}
+     */
+    public void onMessage(javax.jms.Message jmsMessage) {
+        String correlationId;
+        try {
+            correlationId = jmsMessage.getJMSCorrelationID();
+        } catch (JMSException e) {
+            throw JmsUtils.convertJmsAccessException(e);
+        }
+        Message inMessage = correlationMap.get(correlationId);
+        if (inMessage == null) {
+            LOG.log(Level.WARNING, "Could not correlate message with correlationId " + correlationId);
+        }
+        LOG.log(Level.FINE, "client received reply: ", jmsMessage);
+        JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
+        byte[] response = JMSUtils.retrievePayload(jmsMessage);
+        LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]");
+        inMessage.setContent(InputStream.class, new ByteArrayInputStream(response));
+        
+        synchronized (inMessage) {
+            inMessage.notifyAll();
+        }
+
     }
 
     public void close() {
+        jmsListener.shutdown();
         LOG.log(Level.FINE, "JMSConduit closed ");
     }
 
@@ -171,10 +237,6 @@
         return LOG;
     }
 
-    public String getBeanName() {
-        return endpointInfo.getName().toString() + ".jms-conduit";
-    }
-
     public JMSConfiguration getJmsConfig() {
         return jmsConfig;
     }
@@ -183,4 +245,14 @@
         this.jmsConfig = jmsConfig;
     }
 
+    @Override
+    protected void finalize() throws Throwable {
+        if (jmsListener.isRunning()) {
+            jmsListener.shutdown();
+        }
+        super.finalize();
+    }
+    
+    
+
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Mon Sep 29 13:11:34 2008
@@ -19,52 +19,139 @@
 package org.apache.cxf.transport.jms;
 
 import javax.jms.ConnectionFactory;
+import javax.jms.Message;
 
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Required;
 import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.listener.AbstractJmsListeningContainer;
+import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.transaction.PlatformTransactionManager;
 
 public class JMSConfiguration implements InitializingBean {
     private ConnectionFactory connectionFactory;
-    private JmsTemplate jmsTemplate;
-    private AbstractJmsListeningContainer jmsListener;
+    private DestinationResolver destinationResolver;
+    private PlatformTransactionManager transactionManager;
+    private boolean useJms11 = true;
+    private boolean useJndi;
+    private boolean messageIdEnabled = true;
+    private boolean messageTimestampEnabled = true;
+    private boolean pubSubNoLocal;
+    private long receiveTimeout = JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT;
+    private boolean explicitQosEnabled;
+    private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
+    private int priority = Message.DEFAULT_PRIORITY;
+    private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+    private boolean sessionTransacted;
+
+    private volatile String messageSelector;
+    private boolean subscriptionDurable;
+    private String durableSubscriptionName;
+
     private String targetDestination;
     private String replyDestination;
-    private String messageType;
+    private String messageType = JMSConstants.TEXT_MESSAGE_TYPE;
     private boolean pubSubDomain;
 
-    public JMSConfiguration() {
-        targetDestination = null;
-        replyDestination = null;
-        messageType = JMSConstants.TEXT_MESSAGE_TYPE;
-        pubSubDomain = false;
+    public boolean isUseJndi() {
+        return useJndi;
     }
 
-    public void afterPropertiesSet() throws Exception {
-        /*
-         * if (connectionFactory == null) { throw new RuntimeException("Required property connectionfactory
-         * was not set"); } jmsTemplate.setConnectionFactory(connectionFactory);
-         * jmsListener.setConnectionFactory(connectionFactory);
-         */
+    public void setUseJndi(boolean useJndi) {
+        this.useJndi = useJndi;
     }
 
-    public JmsTemplate getJmsTemplate() {
-        return jmsTemplate;
+    public boolean isMessageIdEnabled() {
+        return messageIdEnabled;
     }
 
-    @Required
-    public void setJmsTemplate(JmsTemplate jmsTemplate) {
-        this.jmsTemplate = jmsTemplate;
+    public void setMessageIdEnabled(boolean messageIdEnabled) {
+        this.messageIdEnabled = messageIdEnabled;
     }
 
-    public AbstractJmsListeningContainer getJmsListener() {
-        return jmsListener;
+    public boolean isMessageTimestampEnabled() {
+        return messageTimestampEnabled;
     }
 
-    @Required
-    public void setJmsListener(AbstractJmsListeningContainer jmsListener) {
-        this.jmsListener = jmsListener;
+    public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
+        this.messageTimestampEnabled = messageTimestampEnabled;
+    }
+
+    public boolean isPubSubNoLocal() {
+        return pubSubNoLocal;
+    }
+
+    public void setPubSubNoLocal(boolean pubSubNoLocal) {
+        this.pubSubNoLocal = pubSubNoLocal;
+    }
+
+    public long getReceiveTimeout() {
+        return receiveTimeout;
+    }
+
+    public void setReceiveTimeout(long receiveTimeout) {
+        this.receiveTimeout = receiveTimeout;
+    }
+
+    public boolean isExplicitQosEnabled() {
+        return explicitQosEnabled;
+    }
+
+    public void setExplicitQosEnabled(boolean explicitQosEnabled) {
+        this.explicitQosEnabled = explicitQosEnabled;
+    }
+
+    public int getDeliveryMode() {
+        return deliveryMode;
+    }
+
+    public void setDeliveryMode(int deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public void setPriority(int priority) {
+        this.priority = priority;
+    }
+
+    public long getTimeToLive() {
+        return timeToLive;
+    }
+
+    public void setTimeToLive(long timeToLive) {
+        this.timeToLive = timeToLive;
+    }
+
+    public String getMessageSelector() {
+        return messageSelector;
+    }
+
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    public boolean isSubscriptionDurable() {
+        return subscriptionDurable;
+    }
+
+    public void setSubscriptionDurable(boolean subscriptionDurable) {
+        this.subscriptionDurable = subscriptionDurable;
+    }
+
+    public String getDurableSubscriptionName() {
+        return durableSubscriptionName;
+    }
+
+    public void setDurableSubscriptionName(String durableSubscriptionName) {
+        this.durableSubscriptionName = durableSubscriptionName;
+    }
+
+    public void afterPropertiesSet() throws Exception {
+        if (connectionFactory == null) {
+            throw new RuntimeException("Required property connectionfactory was not set");
+        }
     }
 
     public ConnectionFactory getConnectionFactory() {
@@ -108,4 +195,36 @@
         this.pubSubDomain = pubSubDomain;
     }
 
+    public boolean isUseJms11() {
+        return useJms11;
+    }
+
+    public void setUseJms11(boolean useJms11) {
+        this.useJms11 = useJms11;
+    }
+
+    public DestinationResolver getDestinationResolver() {
+        return destinationResolver;
+    }
+
+    public void setDestinationResolver(DestinationResolver destinationResolver) {
+        this.destinationResolver = destinationResolver;
+    }
+
+    public boolean isSessionTransacted() {
+        return sessionTransacted;
+    }
+
+    public void setSessionTransacted(boolean sessionTransacted) {
+        this.sessionTransacted = sessionTransacted;
+    }
+
+    public PlatformTransactionManager getTransactionManager() {
+        return transactionManager;
+    }
+
+    public void setTransactionManager(PlatformTransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Mon Sep 29 13:11:34 2008
@@ -29,27 +29,19 @@
 import java.util.Map;
 import java.util.SimpleTimeZone;
 import java.util.TimeZone;
-import java.util.concurrent.Executor;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.jms.BytesMessage;
-import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
-import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.configuration.Configurable;
-import org.apache.cxf.configuration.Configurer;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
@@ -58,56 +50,31 @@
 import org.apache.cxf.transport.AbstractConduit;
 import org.apache.cxf.transport.AbstractMultiplexDestination;
 import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.ConduitInitiator;
 import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.workqueue.SynchronousExecutor;
-import org.apache.cxf.workqueue.WorkQueueManager;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.JmsTemplate102;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.support.JmsUtils;
+import org.springframework.jms.support.destination.DestinationResolver;
 
-public class JMSDestination extends AbstractMultiplexDestination implements Configurable, MessageListener,
+public class JMSDestination extends AbstractMultiplexDestination implements MessageListener,
     JMSExchangeSender {
 
-    protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-destination-base";
-
     private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
 
-    protected ServerConfig serverConfig;
-    protected ServerBehaviorPolicyType runtimePolicy;
-    protected AddressType address;
-    protected SessionPoolType sessionPool;
-    protected Destination targetDestination;
-    protected Destination replyToDestination;
-    protected JMSSessionFactory sessionFactory;
-    protected Bus bus;
-    protected EndpointInfo endpointInfo;
-    protected String beanNameSuffix;
-
-    final ConduitInitiator conduitInitiator;
-    Session listenerSession;
-    JMSListenerThread listenerThread;
+    private JMSConfiguration jmsConfig;
+    private Bus bus;
+    private DefaultMessageListenerContainer jmsListener;
+    private JmsTemplate jmsTemplate;
 
-    public JMSDestination(Bus b, ConduitInitiator ci, EndpointInfo info) throws IOException {
+    public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
         super(b, getTargetReference(info, b), info);
-
         this.bus = b;
-        this.endpointInfo = info;
-        this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX;
-        conduitInitiator = ci;
-
-        initConfig();
-    }
-
-    private void initConfig() {
-        this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
-                                                               ServerBehaviorPolicyType.class);
-        this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
-        this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
-        this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
-        Configurer configurer = bus.getExtension(Configurer.class);
-        if (null != configurer) {
-            configurer.configureBean(this);
-        }
+        this.jmsConfig = jmsConfig;
     }
 
     /**
@@ -119,64 +86,46 @@
         return new BackChannelConduit(this, anon, inMessage);
     }
 
-    private Executor getExecutor(WorkQueueManager wqm, QName name) {
-        // Fallback if no Workqueuemanager
-        Executor executor = SynchronousExecutor.getInstance();
-        if (wqm != null) {
-            if (name != null) {
-                executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}" + name.getLocalPart());
-            }
-            if (executor == null) {
-                executor = wqm.getNamedWorkQueue("jms");
-            }
-            if (executor == null) {
-                executor = wqm.getAutomaticWorkQueue();
-            }
-        }
-        return executor;
-    }
-
     /**
-     * Initialize Sessionfactory, Initialize and start ListenerThread {@inheritDoc}
+     * Initialize jmsTemplate and jmsListener from jms configuration data in jmsConfig {@inheritDoc}
      */
     public void activate() {
         getLogger().log(Level.INFO, "JMSDestination activate().... ");
 
-        if (this.address == null || this.address.getJndiConnectionFactoryName() == null) {
-            throw new RuntimeException("Insufficient configuration for Destination. "
-                                       + "Did you configure a <jms:destination name=\"" + getBeanName()
-                                       + "\"> and set the jndiConnectionFactoryName ?");
+        jmsTemplate = jmsConfig.isUseJms11() ? new JmsTemplate() : new JmsTemplate102();
+        jmsTemplate.setDefaultDestinationName(jmsConfig.getReplyDestination());
+        jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory());
+        jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain());
+        jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout());
+        jmsTemplate.setTimeToLive(jmsConfig.getTimeToLive());
+        jmsTemplate.setPriority(jmsConfig.getPriority());
+        jmsTemplate.setDeliveryMode(jmsConfig.getDeliveryMode());
+        jmsTemplate.setExplicitQosEnabled(true);
+        jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted());
+
+        jmsListener = new DefaultMessageListenerContainer();
+        jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
+        jmsListener.setAutoStartup(true);
+        jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory());
+        jmsListener.setMessageSelector(jmsConfig.getMessageSelector());
+        jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
+        jmsListener.setDestinationName(jmsConfig.getTargetDestination());
+        jmsListener.setMessageListener(this);
+        jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted());
+        jmsListener.setTransactionManager(jmsConfig.getTransactionManager());
+
+        if (jmsConfig.getDestinationResolver() != null) {
+            jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver());
+            jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
         }
 
-        try {
-            getLogger().log(Level.FINE, "establishing JMS connection");
-            sessionFactory = JMSSessionFactory.connect(getJMSAddress(), getSessionPool(), serverConfig);
-            Connection connection = sessionFactory.getConnection();
-            Context context = sessionFactory.getInitialContext();
-            this.targetDestination = JMSUtils.resolveRequestDestination(context, connection, address);
-            this.replyToDestination = JMSUtils.resolveRequestDestination(context, connection, address);
-            WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class);
-            QName name = null;
-            if (endpointInfo != null) {
-                name = endpointInfo.getName();
-            }
-            Executor executor = getExecutor(wqm, name);
-            String messageSelector = runtimePolicy.getMessageSelector();
-            String durableName = runtimePolicy.getDurableSubscriberName();
-            listenerThread = new JMSListenerThread(executor, this);
-            listenerThread.start(connection, targetDestination, messageSelector, durableName);
-        } catch (JMSException ex) {
-            getLogger().log(Level.SEVERE, "JMS connect failed with JMSException : ", ex);
-        } catch (NamingException nex) {
-            getLogger().log(Level.SEVERE, "JMS connect failed with NamingException : ", nex);
+        if (!jmsListener.isRunning()) {
+            jmsListener.initialize();
         }
     }
 
     public void deactivate() {
-        if (listenerThread != null) {
-            listenerThread.close();
-        }
-        sessionFactory.shutdown();
+        jmsListener.shutdown();
     }
 
     public void shutdown() {
@@ -184,24 +133,31 @@
         this.deactivate();
     }
 
-    public Queue getReplyToDestination(Message inMessage) throws JMSException, NamingException {
+    private Destination resolveDestinationName(final String name) {
+        return (Destination)jmsTemplate.execute(new SessionCallback() {
+            public Object doInJms(Session session) throws JMSException {
+                DestinationResolver resolv = jmsTemplate.getDestinationResolver();
+                return resolv.resolveDestinationName(session, name, jmsConfig.isPubSubDomain());
+            }
+        });
+    }
+
+    public Destination getReplyToDestination(Message inMessage) throws JMSException {
         javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
         // If WS-Addressing had set the replyTo header.
-        String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO);
+        final String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO);
         if (replyToName != null) {
-            Context context = sessionFactory.getInitialContext();
-            return (Queue)context.lookup(replyToName);
+            return resolveDestinationName(replyToName);
         } else if (message.getJMSReplyTo() != null) {
-            return (Queue)message.getJMSReplyTo();
+            return message.getJMSReplyTo();
         } else {
-            return (Queue)replyToDestination;
+            throw new RuntimeException("No replyTo destination set on request message or cxf message");
         }
     }
 
     /**
      * Decides what correlationId to use for the reply by looking at the request headers. If the request has a
-     * correlationId set this is taken. Else if the useMessageIDAsCorrelationID is true then the messageId
-     * from the request message is used as correlation Id
+     * correlationId set this is taken. Else the messageId from the request message is used as correlation Id
      * 
      * @param request
      * @return
@@ -209,10 +165,7 @@
      */
     public String determineCorrelationID(javax.jms.Message request) throws JMSException {
         String correlationID = request.getJMSCorrelationID();
-        if ("".equals(correlationID)) {
-            correlationID = null;
-        }
-        if (correlationID == null && getRuntimePolicy().isUseMessageIDAsCorrelationID()) {
+        if (correlationID == null || "".equals(correlationID)) {
             correlationID = request.getJMSMessageID();
         }
         return correlationID;
@@ -250,10 +203,10 @@
         }
     }
 
-    public void sendExchange(Exchange exchange, Object replyObj) {
+    public void sendExchange(Exchange exchange, final Object replyObj) {
         Message inMessage = exchange.getInMessage();
-        Message outMessage = exchange.getOutMessage();
-        if (!JMSUtils.isDestinationStyleQueue(address)) {
+        final Message outMessage = exchange.getOutMessage();
+        if (jmsConfig.isPubSubDomain()) {
             // we will never receive a non-oneway invocation in pub-sub
             // domain from CXF client - however a mis-behaving pure JMS
             // client could conceivably make suce an invocation, in which
@@ -262,12 +215,11 @@
                             "with 'topic' destinationStyle");
             return;
         }
-        PooledSession replySession = null;
         try {
             // setup the reply message
-            replySession = sessionFactory.get();
-            javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
-            String msgType = null;
+            final javax.jms.Message request = (javax.jms.Message)inMessage
+                .get(JMSConstants.JMS_REQUEST_MESSAGE);
+            final String msgType;
             if (request instanceof TextMessage) {
                 msgType = JMSConstants.TEXT_MESSAGE_TYPE;
             } else if (request instanceof BytesMessage) {
@@ -275,57 +227,50 @@
             } else {
                 msgType = JMSConstants.BINARY_MESSAGE_TYPE;
             }
-            javax.jms.Message reply = JMSUtils
-                .createAndSetPayload(replyObj, replySession.session(), msgType);
 
-            reply.setJMSCorrelationID(determineCorrelationID(request));
-            JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
-                .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
-            JMSUtils.setMessageProperties(headers, reply);
-            // ensure that the contentType is set to the out jms message header
-            JMSUtils.addContentTypeToProtocolHeader(outMessage);
-            Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
-                .get(Message.PROTOCOL_HEADERS));
-            JMSUtils.addProtocolHeaders(reply, protHeaders);
             Destination replyTo = getReplyToDestination(inMessage);
-
+            final JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+                .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
             JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)inMessage
                 .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
 
-            long timeToLive = 0;
             if (request.getJMSExpiration() > 0) {
                 TimeZone tz = new SimpleTimeZone(0, "GMT");
                 Calendar cal = new GregorianCalendar(tz);
-                timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
-            }
-
-            if (timeToLive < 0) {
-                getLogger().log(Level.INFO, "Message time to live is already expired skipping response.");
-                return;
+                long timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
+                if (timeToLive < 0) {
+                    getLogger()
+                        .log(Level.INFO, "Message time to live is already expired skipping response.");
+                    return;
+                }
             }
 
             int deliveryMode = JMSUtils.getJMSDeliveryMode(inHeaders);
             int priority = JMSUtils.getJMSPriority(inHeaders);
-            long ttl = JMSUtils.getTimeToLive(headers);
-            if (ttl <= 0) {
-                ttl = getServerConfig().getMessageTimeToLive();
-            }
-            if (ttl <= 0) {
-                ttl = timeToLive;
-            }
+
+            jmsTemplate.setDeliveryMode(deliveryMode);
+            jmsTemplate.setPriority(priority);
             getLogger().log(Level.FINE, "send out the message!");
-            replySession.producer().send(replyTo, reply, deliveryMode, priority, ttl);
+            jmsTemplate.send(replyTo, new MessageCreator() {
+                public javax.jms.Message createMessage(Session session) throws JMSException {
+                    javax.jms.Message reply = JMSUtils.createAndSetPayload(replyObj, session, msgType);
+
+                    reply.setJMSCorrelationID(determineCorrelationID(request));
+
+                    JMSUtils.setMessageProperties(headers, reply);
+                    // ensure that the contentType is set to the out jms message header
+                    JMSUtils.addContentTypeToProtocolHeader(outMessage);
+                    Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
+                        .get(Message.PROTOCOL_HEADERS));
+                    JMSUtils.addProtocolHeaders(reply, protHeaders);
+
+                    LOG.log(Level.FINE, "server sending reply: ", reply);
+                    return reply;
+                }
+            });
 
-            getLogger().log(Level.FINE, "just server sending reply: ", reply);
-            // Check the reply time limit Stream close will call for this
         } catch (JMSException ex) {
-            getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex);
-            throw new RuntimeException(ex.getMessage());
-        } catch (NamingException nex) {
-            getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex);
-            throw new RuntimeException(nex.getMessage());
-        } finally {
-            sessionFactory.recycle(replySession);
+            JmsUtils.convertJmsAccessException(ex);
         }
     }
 
@@ -333,43 +278,9 @@
         return LOG;
     }
 
-    public String getBeanName() {
-        return endpointInfo.getName().toString() + ".jms-destination";
-    }
-
-    public AddressType getJMSAddress() {
-        return address;
-    }
-
-    public void setJMSAddress(AddressType a) {
-        this.address = a;
-    }
-
-    public ServerBehaviorPolicyType getRuntimePolicy() {
-        return runtimePolicy;
-    }
-
-    public void setRuntimePolicy(ServerBehaviorPolicyType runtimePolicy) {
-        this.runtimePolicy = runtimePolicy;
-    }
-
-    public ServerConfig getServerConfig() {
-        return serverConfig;
-    }
-
-    public void setServerConfig(ServerConfig serverConfig) {
-        this.serverConfig = serverConfig;
-    }
-
-    public SessionPoolType getSessionPool() {
-        return sessionPool;
-    }
-
-    public void setSessionPool(SessionPoolType sessionPool) {
-        this.sessionPool = sessionPool;
-    }
-
-    // this should deal with the cxf message
+    /**
+     * Conduit for sending the reply back to the client
+     */
     protected class BackChannelConduit extends AbstractConduit {
 
         protected Message inMessage;
@@ -419,4 +330,12 @@
         }
     }
 
+    public JMSConfiguration getJmsConfig() {
+        return jmsConfig;
+    }
+
+    public void setJmsConfig(JMSConfiguration jmsConfig) {
+        this.jmsConfig = jmsConfig;
+    }
+
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java Mon Sep 29 13:11:34 2008
@@ -20,6 +20,19 @@
 
 import org.apache.cxf.message.Exchange;
 
+/**
+ * Callback interface for JMSOutputStream
+ */
 interface JMSExchangeSender {
+    
+    /**
+     * Is called from JMSOutputStream.doClose() when the stream is fully
+     * written. Sends the outMessage of the given exchange with the given payload
+     * from the JMSOutputStream. If the exchange is not oneway a reply should be received
+     * and set as inMessage
+     * 
+     * @param exchange
+     * @param payload
+     */
     void sendExchange(Exchange exchange, Object payload);
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java Mon Sep 29 13:11:34 2008
@@ -19,7 +19,6 @@
 package org.apache.cxf.transport.jms;
 
 import javax.jms.ConnectionFactory;
-import javax.jms.Message;
 import javax.naming.NamingException;
 
 import org.apache.cxf.Bus;
@@ -27,15 +26,18 @@
 import org.apache.cxf.service.model.EndpointInfo;
 import org.springframework.jms.connection.SingleConnectionFactory;
 import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
-import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.destination.JndiDestinationResolver;
 import org.springframework.jndi.JndiTemplate;
 
 public class JMSOldConfigHolder {
-    protected ClientConfig clientConfig;
-    protected ClientBehaviorPolicyType runtimePolicy;
-    protected AddressType address;
-    protected SessionPoolType sessionPool;
+    private ClientConfig clientConfig;
+    private ClientBehaviorPolicyType runtimePolicy;
+
+    private AddressType address;
+    private SessionPoolType sessionPool;
+    private JMSConfiguration jmsConfig;
+    private ServerConfig serverConfig;
+    private ServerBehaviorPolicyType serverBehavior;
 
     private ConnectionFactory getConnectionFactoryFromJndi(String connectionFactoryName, String userName,
                                                            String password, JndiTemplate jt) {
@@ -43,7 +45,6 @@
             return null;
         }
         try {
-
             ConnectionFactory connectionFactory = (ConnectionFactory)jt.lookup(connectionFactoryName);
             UserCredentialsConnectionFactoryAdapter uccf = new UserCredentialsConnectionFactoryAdapter();
             uccf.setUsername(userName);
@@ -58,20 +59,26 @@
         }
     }
 
-    public JMSConfiguration createJMSConfigurationFromEndpointInfo(Bus bus, EndpointInfo endpointInfo) {
-        JMSConfiguration jmsConf = new JMSConfiguration();
+    public JMSConfiguration createJMSConfigurationFromEndpointInfo(Bus bus, EndpointInfo endpointInfo,
+                                                                   boolean isConduit) {
+        jmsConfig = new JMSConfiguration();
 
-        // Retrieve configuration information that was extracted from the wsdl
+        // Retrieve configuration information that was extracted from the WSDL
         address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
         clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class);
         runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
                                                           ClientBehaviorPolicyType.class);
+        serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
+        sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
+        serverBehavior = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
+                                                           ServerBehaviorPolicyType.class);
+        String name = endpointInfo.getName().toString() + (isConduit ? ".jms-conduit" : ".jms-destination");
 
         // Try to retrieve configuration information from the spring
-        // config. Search for a tag <jms:conduit> with name=endpoint name + ".jms-conduit"
+        // config. Search for a conduit or destination with name=endpoint name + ".jms-conduit"
+        // or ".jms-destination"
         Configurer configurer = bus.getExtension(Configurer.class);
         if (null != configurer) {
-            String name = endpointInfo.getName().toString() + ".jms-conduit";
             configurer.configureBean(name, this);
         }
 
@@ -80,38 +87,57 @@
         ConnectionFactory cf = getConnectionFactoryFromJndi(address.getJndiConnectionFactoryName(), address
             .getConnectionUserName(), address.getConnectionPassword(), jt);
 
-        // TODO Use JmsTemplate102 in case JMS 1.1 is not available
-        JmsTemplate jmsTemplate = new JmsTemplate();
-        jmsTemplate.setConnectionFactory(cf);
         boolean pubSubDomain = false;
         if (address.isSetDestinationStyle()) {
             pubSubDomain = DestinationStyleType.TOPIC == address.getDestinationStyle();
         }
-        jmsTemplate.setPubSubDomain(pubSubDomain);
-        jmsTemplate.setReceiveTimeout(clientConfig.getClientReceiveTimeout());
-        jmsTemplate.setTimeToLive(clientConfig.getMessageTimeToLive());
-        jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);
-        jmsTemplate.setDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
-        jmsTemplate.setExplicitQosEnabled(true);
+        jmsConfig.setConnectionFactory(cf);
+        jmsConfig.setDurableSubscriptionName(serverBehavior.getDurableSubscriberName());
+        jmsConfig.setExplicitQosEnabled(true);
+        // jmsConfig.setMessageIdEnabled(messageIdEnabled);
+        jmsConfig.setMessageSelector(serverBehavior.getMessageSelector());
+        // jmsConfig.setMessageTimestampEnabled(messageTimestampEnabled);
+        if (runtimePolicy.isSetMessageType()) {
+            jmsConfig.setMessageType(runtimePolicy.getMessageType().value());
+        }
+        // jmsConfig.setOneWay(oneWay);
+        // jmsConfig.setPriority(priority);
+        jmsConfig.setPubSubDomain(pubSubDomain);
+        jmsConfig.setPubSubNoLocal(true);
+        jmsConfig.setReceiveTimeout(clientConfig.getClientReceiveTimeout());
+        jmsConfig.setSubscriptionDurable(serverBehavior.isSetDurableSubscriberName());
+        long timeToLive = isConduit ? clientConfig.getMessageTimeToLive() : serverConfig
+            .getMessageTimeToLive();
+        jmsConfig.setTimeToLive(timeToLive);
+        jmsConfig.setUseJms11(true);
+        boolean useJndi = address.isSetJndiDestinationName();
+        jmsConfig.setUseJndi(useJndi);
+        jmsConfig.setSessionTransacted(serverBehavior.isSetTransactional());
 
-        if (address.isSetJndiDestinationName()) {
+        if (useJndi) {
             // Setup Destination jndi destination resolver
             final JndiDestinationResolver jndiDestinationResolver = new JndiDestinationResolver();
             jndiDestinationResolver.setJndiTemplate(jt);
-            jmsTemplate.setDestinationResolver(jndiDestinationResolver);
-            jmsConf.setTargetDestination(address.getJndiDestinationName());
-            jmsConf.setReplyDestination(address.getJndiReplyDestinationName());
+            jmsConfig.setDestinationResolver(jndiDestinationResolver);
+            jmsConfig.setTargetDestination(address.getJndiDestinationName());
+            jmsConfig.setReplyDestination(address.getJndiReplyDestinationName());
         } else {
             // Use the default dynamic destination resolver
-            jmsConf.setTargetDestination(address.getJmsDestinationName());
-            jmsConf.setReplyDestination(address.getJmsReplyDestinationName());
+            jmsConfig.setTargetDestination(address.getJmsDestinationName());
+            jmsConfig.setReplyDestination(address.getJmsReplyDestinationName());
         }
-        if (runtimePolicy.isSetMessageType()) {
-            jmsConf.setMessageType(runtimePolicy.getMessageType().value());
+
+        jmsConfig.setConnectionFactory(cf);
+
+        if (jmsConfig.getTargetDestination() == null || jmsConfig.getConnectionFactory() == null) {
+            throw new RuntimeException("Insufficient configuration for "
+                                       + (isConduit ? "Conduit" : "Destination") + ". "
+                                       + "Did you configure a <jms:"
+                                       + (isConduit ? "conduit" : "destination") + " name=\"" + name
+                                       + "\"> and set the jndiConnectionFactoryName ?");
         }
 
-        jmsConf.setJmsTemplate(jmsTemplate);
-        return jmsConf;
+        return jmsConfig;
     }
 
     public ClientConfig getClientConfig() {
@@ -145,4 +171,28 @@
     public void setSessionPool(SessionPoolType sessionPool) {
         this.sessionPool = sessionPool;
     }
+
+    public JMSConfiguration getJmsConfig() {
+        return jmsConfig;
+    }
+
+    public void setJmsConfig(JMSConfiguration jmsConfig) {
+        this.jmsConfig = jmsConfig;
+    }
+
+    public ServerConfig getServerConfig() {
+        return serverConfig;
+    }
+
+    public void setServerConfig(ServerConfig serverConfig) {
+        this.serverConfig = serverConfig;
+    }
+
+    public ServerBehaviorPolicyType getServerBehavior() {
+        return serverBehavior;
+    }
+
+    public void setServerBehavior(ServerBehaviorPolicyType serverBehavior) {
+        this.serverBehavior = serverBehavior;
+    }
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java Mon Sep 29 13:11:34 2008
@@ -25,7 +25,7 @@
 import org.apache.cxf.message.Exchange;
 
 /**
- *
+ * Output stream that sends a message when the stream is closed
  */
 class JMSOutputStream extends CachedOutputStream {
     private final JMSExchangeSender sender;
@@ -44,6 +44,9 @@
     }
 
     @Override
+    /**
+     * Close the stream and send the message out
+     */
     protected void doClose() throws IOException {
         Object payload = retrieveRequestFromStream(isTextPayload);
         this.sender.sendExchange(exchange, payload);

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java Mon Sep 29 13:11:34 2008
@@ -26,7 +26,6 @@
 import javax.annotation.Resource;
 
 import org.apache.cxf.Bus;
-import org.apache.cxf.configuration.Configurer;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractTransportFactory;
 import org.apache.cxf.transport.Conduit;
@@ -44,7 +43,6 @@
     }
 
     private Bus bus;
-    private JMSConfiguration jmsConfig;
 
     @Resource(name = "cxf")
     public void setBus(Bus b) {
@@ -55,37 +53,32 @@
         return bus;
     }
 
-    public Conduit getConduit(EndpointInfo targetInfo) throws IOException {
-        return getConduit(targetInfo, targetInfo.getTarget());
+    public Conduit getConduit(EndpointInfo endpointInfo) throws IOException {
+        return getConduit(endpointInfo, endpointInfo.getTarget());
     }
 
+    /**
+     * {@inheritDoc}
+     */
     public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType target) throws IOException {
-        JMSConduit conduit = target == null
-            ? new JMSConduit(bus, endpointInfo) : new JMSConduit(bus, endpointInfo, target);
         JMSOldConfigHolder old = new JMSOldConfigHolder();
-        JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo);
-        conduit.setJmsConfig(jmsConf);
-        return conduit;
+        JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo, true);
+        JMSConduit jmsConduit = new JMSConduit(target, jmsConf);
+        jmsConduit.afterPropertiesSet();
+        return jmsConduit;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     public Destination getDestination(EndpointInfo endpointInfo) throws IOException {
-        JMSDestination destination = new JMSDestination(bus, this, endpointInfo);
-        Configurer configurer = bus.getExtension(Configurer.class);
-        if (null != configurer) {
-            configurer.configureBean(destination);
-        }
-        return destination;
+        JMSOldConfigHolder old = new JMSOldConfigHolder();
+        JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo, false);
+        return new JMSDestination(bus, endpointInfo, jmsConf);
     }
     
     public Set<String> getUriPrefixes() {
         return URI_PREFIXES;
     }
 
-    public JMSConfiguration getJmsConfig() {
-        return jmsConfig;
-    }
-
-    public void setJmsConfig(JMSConfiguration jmsConfig) {
-        this.jmsConfig = jmsConfig;
-    }
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java Mon Sep 29 13:11:34 2008
@@ -33,19 +33,11 @@
 import java.util.logging.Logger;
 
 import javax.jms.BytesMessage;
-import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueSender;
 import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.naming.Context;
-import javax.naming.NamingException;
 
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.helpers.CastUtils;
@@ -63,37 +55,26 @@
 
     public static Properties getInitialContextEnv(AddressType addrType) {
         Properties env = new Properties();
-        populateContextEnvironment(addrType, env);
-
+        java.util.ListIterator listIter = addrType.getJMSNamingProperty().listIterator();
+        while (listIter.hasNext()) {
+            JMSNamingPropertyType propertyPair = (JMSNamingPropertyType)listIter.next();
+            if (null != propertyPair.getValue()) {
+                env.setProperty(propertyPair.getName(), propertyPair.getValue());
+            }
+        }
         if (LOG.isLoggable(Level.FINE)) {
             Enumeration props = env.propertyNames();
-
             while (props.hasMoreElements()) {
                 String name = (String)props.nextElement();
                 String value = env.getProperty(name);
                 LOG.log(Level.FINE, "Context property: " + name + " | " + value);
             }
         }
-
         return env;
     }
 
-    protected static void populateContextEnvironment(AddressType addrType, Properties env) {
-
-        java.util.ListIterator listIter = addrType.getJMSNamingProperty().listIterator();
-
-        while (listIter.hasNext()) {
-            JMSNamingPropertyType propertyPair = (JMSNamingPropertyType)listIter.next();
-
-            if (null != propertyPair.getValue()) {
-                env.setProperty(propertyPair.getName(), propertyPair.getValue());
-            }
-        }
-    }
-
     public static int getJMSDeliveryMode(JMSMessageHeadersType headers) {
         int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
-
         if (headers != null && headers.isSetJMSDeliveryMode()) {
             deliveryMode = headers.getJMSDeliveryMode();
         }
@@ -101,11 +82,8 @@
     }
 
     public static int getJMSPriority(JMSMessageHeadersType headers) {
-        int priority = Message.DEFAULT_PRIORITY;
-        if (headers != null && headers.isSetJMSPriority()) {
-            priority = headers.getJMSPriority();
-        }
-        return priority;
+        return (headers != null && headers.isSetJMSPriority())
+            ? headers.getJMSPriority() : Message.DEFAULT_PRIORITY;
     }
 
     public static long getTimeToLive(JMSMessageHeadersType headers) {
@@ -118,7 +96,6 @@
 
     public static void setMessageProperties(JMSMessageHeadersType headers, Message message)
         throws JMSException {
-
         if (headers != null && headers.isSetProperty()) {
             List<JMSPropertyType> props = headers.getProperty();
             for (int x = 0; x < props.size(); x++) {
@@ -139,7 +116,6 @@
     public static Message createAndSetPayload(Object payload, Session session, String messageType)
         throws JMSException {
         Message message = null;
-
         if (JMSConstants.TEXT_MESSAGE_TYPE.equals(messageType)) {
             message = session.createTextMessage((String)payload);
         } else if (JMSConstants.BYTE_MESSAGE_TYPE.equals(messageType)) {
@@ -149,7 +125,6 @@
             message = session.createObjectMessage();
             ((ObjectMessage)message).setObject((byte[])payload);
         }
-
         return message;
     }
 
@@ -177,9 +152,8 @@
         }
     }
 
-    public static JMSMessageHeadersType populateIncomingContext(javax.jms.Message message,
-                                                                org.apache.cxf.message.Message inMessage,
-                                                                String headerType) {
+    public static void populateIncomingContext(javax.jms.Message message,
+                                               org.apache.cxf.message.Message inMessage, String headerType) {
         try {
             JMSMessageHeadersType headers = null;
             headers = (JMSMessageHeadersType)inMessage.get(headerType);
@@ -220,7 +194,6 @@
                 }
             }
             inMessage.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, protHeaders);
-            return headers;
         } catch (JMSException ex) {
             throw JmsUtils.convertJmsAccessException(ex);
         }
@@ -242,7 +215,7 @@
                 value.append(s);
                 first = false;
             }
-            // Incase if the Content-Type header key is Content-Type replace with JMS_Content_Type
+            // If the Content-Type header key is Content-Type replace with JMS_Content_Type
             if (entry.getKey().equals(org.apache.cxf.message.Message.CONTENT_TYPE)) {
                 message.setStringProperty(JMSConstants.JMS_CONTENT_TYPE, value.toString());
             } else {
@@ -252,20 +225,18 @@
         }
     }
 
-    public static Map<String, List<String>> getSetProtocolHeaders(org.apache.cxf.message.Message message) {
+    public static void addContentTypeToProtocolHeader(org.apache.cxf.message.Message message) {
+        String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE);
+
+        // Retrieve or create protocol headers
         Map<String, List<String>> headers = CastUtils.cast((Map<?, ?>)message
             .get(org.apache.cxf.message.Message.PROTOCOL_HEADERS));
         if (null == headers) {
             headers = new HashMap<String, List<String>>();
             message.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, headers);
         }
-        return headers;
-    }
-
-    public static void addContentTypeToProtocolHeader(org.apache.cxf.message.Message message) {
-        String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE);
-
-        Map<String, List<String>> headers = JMSUtils.getSetProtocolHeaders(message);
+        
+        // Add content type to the protocol headers
         List<String> ct;
         if (headers.get(JMSConstants.JMS_CONTENT_TYPE) != null) {
             ct = headers.get(JMSConstants.JMS_CONTENT_TYPE);
@@ -275,14 +246,9 @@
             ct = new ArrayList<String>();
             headers.put(JMSConstants.JMS_CONTENT_TYPE, ct);
         }
-
         ct.add(contentType);
     }
 
-    public static boolean isDestinationStyleQueue(AddressType address) {
-        return JMSConstants.JMS_QUEUE.equals(address.getDestinationStyle().value());
-    }
-
     public static Message buildJMSMessageFromCXFMessage(org.apache.cxf.message.Message outMessage,
                                                         Object payload, String messageType, Session session,
                                                         Destination replyTo, String correlationId)
@@ -312,76 +278,19 @@
         return jmsMessage;
     }
 
-    public static void sendMessage(MessageProducer producer, Destination destination, Message jmsMessage,
-                                   long timeToLive, int deliveryMode, int priority) throws JMSException {
-        /*
-         * Can this be changed to producer.send(destination, jmsMessage, deliveryMode, priority, timeToLive);
-         */
-
-        if (destination instanceof Queue) {
-            QueueSender sender = (QueueSender)producer;
-            sender.setTimeToLive(timeToLive);
-            sender.send((Queue)destination, jmsMessage, deliveryMode, priority, timeToLive);
-        } else {
-            TopicPublisher publisher = (TopicPublisher)producer;
-            publisher.setTimeToLive(timeToLive);
-            publisher.publish((Topic)destination, jmsMessage, deliveryMode, priority, timeToLive);
-        }
-    }
-
-    public static Destination resolveRequestDestination(Context context, Connection connection,
-                                                        AddressType addrDetails) throws JMSException,
-        NamingException {
-        Destination requestDestination = null;
-        // see if jndiDestination is set
-        if (addrDetails.getJndiDestinationName() != null) {
-            requestDestination = (Destination)context.lookup(addrDetails.getJndiDestinationName());
-        }
-
-        // if no jndiDestination or it fails see if jmsDestination is set
-        // and try to create it.
-        if (requestDestination == null && addrDetails.getJmsDestinationName() != null) {
-            if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
-                requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
-                    .createQueue(addrDetails.getJmsDestinationName());
-            } else {
-                requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
-                    .createTopic(addrDetails.getJmsDestinationName());
-            }
-        }
-        return requestDestination;
-    }
-
-    public static Queue resolveReplyDestination(Context context, Connection connection,
-                                                AddressType addrDetails) throws NamingException,
-        JMSException {
-        Queue replyDestination = null;
-
-        // Reply Destination is used (if present) only if the session is
-        // point-to-point session
-        if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
-            if (addrDetails.getJndiReplyDestinationName() != null) {
-                replyDestination = (Queue)context.lookup(addrDetails.getJndiReplyDestinationName());
-            }
-            if (replyDestination == null && addrDetails.getJmsReplyDestinationName() != null) {
-                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                replyDestination = session.createQueue(addrDetails.getJmsReplyDestinationName());
-                session.close();
-            }
-        }
-        return replyDestination;
-    }
-
-    public static String generateUniqueSelector() {
+    /**
+     * Create a unique correlation Id of the form
+     * {@code <host>_<user.name>_<currentThread><time>}
+     * @return correlationId
+     */
+    public static String generateCorrelationId() {
         String host = "localhost";
-
         try {
             InetAddress addr = InetAddress.getLocalHost();
             host = addr.getHostName();
         } catch (UnknownHostException ukex) {
-            // Default to localhost.
+            // Default to localhost
         }
-
         long time = Calendar.getInstance().getTimeInMillis();
         return host + "_" + System.getProperty("user.name") + "_" + Thread.currentThread() + time;
     }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java Mon Sep 29 13:11:34 2008
@@ -20,7 +20,10 @@
 
 import javax.xml.namespace.QName;
 
+
 import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
 
 import org.apache.cxf.configuration.spring.AbstractBeanDefinitionParser;
 import org.apache.cxf.transport.jms.AddressType;
@@ -40,12 +43,17 @@
         bean.setAbstract(true);
         mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "clientConfig"), "clientConfig",
                                  ClientConfig.class);
-        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "runtimePolicy"), "runtimePolicy", 
+        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "runtimePolicy"), "runtimePolicy",
                                  ClientBehaviorPolicyType.class);
-        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "address"), "address", 
-                                 AddressType.class);
-        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "sessionPool"), "sessionPool", 
+        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "address"), "address", AddressType.class);
+        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "sessionPool"), "sessionPool",
                                  SessionPoolType.class);
+        NodeList el = element.getElementsByTagNameNS(JMS_NS, "jmsConfig-ref");
+
+        if (el.getLength() == 1) {
+            Node el1 = el.item(0);
+            bean.addPropertyReference("jmsConfig", el1.getTextContent());
+        }
     }
 
     @Override

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSDestinationBeanDefinitionParser.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSDestinationBeanDefinitionParser.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSDestinationBeanDefinitionParser.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSDestinationBeanDefinitionParser.java Mon Sep 29 13:11:34 2008
@@ -38,13 +38,13 @@
     @Override
     protected void doParse(Element element, ParserContext ctx, BeanDefinitionBuilder bean) {
         bean.setAbstract(true);
-        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "serverConfig"), "serverConfig", 
+        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "serverConfig"), "serverConfig",
                                  ServerConfig.class);
-        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "runtimePolicy"), "runtimePolicy", 
+        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "runtimePolicy"), "serverBehavior",
                                  ServerBehaviorPolicyType.class);
-        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "address"), "JMSAddress",
+        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "address"), "address",
                                  AddressType.class);
-        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "sessionPool"), "sessionPool", 
+        mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "sessionPool"), "sessionPool",
                                  SessionPoolType.class);
     }
 

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/wsdl11/JmsTransportPlugin.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/wsdl11/JmsTransportPlugin.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/wsdl11/JmsTransportPlugin.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/wsdl11/JmsTransportPlugin.java Mon Sep 29 13:11:34 2008
@@ -20,6 +20,7 @@
 package org.apache.cxf.transport.jms.wsdl11;
 
 import java.util.Map;
+
 import javax.wsdl.Port;
 import javax.wsdl.WSDLException;
 import javax.wsdl.extensions.ExtensibilityElement;

Modified: cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd (original)
+++ cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd Mon Sep 29 13:11:34 2008
@@ -51,10 +51,17 @@
             <xs:complexContent>
                 <xs:extension base="beans:identifiedType">
                     <xs:sequence>
-                        <xs:element name="clientConfig" type="jms:ClientConfig" minOccurs="0"/>
-                        <xs:element name="runtimePolicy" type="jms:ClientBehaviorPolicyType" minOccurs="0"/>
-                        <xs:element name="sessionPool" type="jms:SessionPoolType" minOccurs="0"/>
-                        <xs:element name="address" type="jms:AddressType" minOccurs="0"/>
+                    	<xs:element name="clientConfig"
+                    		type="jms:ClientConfig" minOccurs="0" />
+                    	<xs:element name="runtimePolicy"
+                    		type="jms:ClientBehaviorPolicyType" minOccurs="0" />
+                    	<xs:element name="sessionPool"
+                    		type="jms:SessionPoolType" minOccurs="0" />
+                    	<xs:element name="address"
+                    		type="jms:AddressType" minOccurs="0" />
+                    	<xs:element name="jmsConfig-ref"
+                    		type="xs:string" minOccurs="0">
+                    	</xs:element><!-- optional; its text is used as the bean reference for the "jmsConfig" property -->
                     </xs:sequence>
                     <xs:attributeGroup ref="cxf-beans:beanAttributes"/>
                 </xs:extension>

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java Mon Sep 29 13:11:34 2008
@@ -111,11 +111,14 @@
         } else {
             target = EasyMock.createMock(EndpointReferenceType.class);
         }
-
-        JMSConduit jmsConduit = new JMSConduit(bus, endpointInfo, target);
+        
         JMSConfiguration jmsConfig = new JMSOldConfigHolder()
-            .createJMSConfigurationFromEndpointInfo(bus, endpointInfo);
-        jmsConduit.setJmsConfig(jmsConfig);
+            .createJMSConfigurationFromEndpointInfo(bus, endpointInfo, true);
+        jmsConfig.setDeliveryMode(3);
+        jmsConfig.setPriority(1);
+        jmsConfig.setTimeToLive(1000);
+        JMSConduit jmsConduit = new JMSConduit(target, jmsConfig);
+        jmsConduit.afterPropertiesSet();
         if (send) {
             // setMessageObserver
             observer = new MessageObserver() {

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Mon Sep 29 13:11:34 2008
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
@@ -56,7 +57,7 @@
                          "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
         JMSConduit conduit = setupJMSConduit(false, false);
         assertEquals("Can't get the right ClientReceiveTimeout", 500L, conduit.getJmsConfig()
-            .getJmsTemplate().getReceiveTimeout());
+            .getReceiveTimeout());
         bus.shutdown(false);
         BusFactory.setDefaultBus(null);
 
@@ -121,21 +122,16 @@
         JMSConduit conduit = setupJMSConduit(true, false);
         Message msg = new MessageImpl();
         conduit.prepare(msg);
-        final byte[] b = testMsg.getBytes(); // TODO encoding
-        JmsTemplate jmsTemplate = conduit.getJmsConfig().getJmsTemplate();
+        final byte[] testBytes = testMsg.getBytes(Charset.defaultCharset()); // TODO encoding
+        JMSConfiguration jmsConfig = conduit.getJmsConfig();
+        JmsTemplate jmsTemplate = new JmsTemplate();
+        jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory());
         javax.jms.Message message = (javax.jms.Message)jmsTemplate.execute(new SessionCallback() {
-
             public Object doInJms(Session session) throws JMSException {
-                return JMSUtils.createAndSetPayload(b, session, JMSConstants.BYTE_MESSAGE_TYPE);
+                return JMSUtils.createAndSetPayload(testBytes, session, JMSConstants.BYTE_MESSAGE_TYPE);
             }
-
         });
-
         assertTrue("Message should have been of type BytesMessage ", message instanceof BytesMessage);
-        // byte[] returnBytes = new byte[(int)((BytesMessage) message).getBodyLength()];
-        // ((BytesMessage) message).readBytes(returnBytes);
-        // assertTrue("Message marshalled was incorrect",
-        // testMsg.equals(new String(returnBytes)));
     }
 
 }

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Mon Sep 29 13:11:34 2008
@@ -31,10 +31,8 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.ConduitInitiator;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.MultiplexDestination;
-import org.easymock.classextension.EasyMock;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -63,20 +61,23 @@
 
     private void waitForReceiveDestMessage() {
         int waitTime = 0;
-        while (destMessage == null && waitTime < 3000) {
+        while (destMessage == null && waitTime < MAX_RECEIVE_TIME) {
             try {
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
                 // do nothing here
             }
-            waitTime = waitTime + 1000;
+            waitTime++;
         }
-        assertTrue("Can't receive the Destination message in 3 seconds", destMessage != null);
+        assertTrue("Can't receive the Destination message in " + MAX_RECEIVE_TIME 
+                   + " seconds", destMessage != null);
     }
 
-    public JMSDestination setupJMSDestination(boolean send) throws IOException {
-        ConduitInitiator conduitInitiator = EasyMock.createMock(ConduitInitiator.class);
-        JMSDestination jmsDestination = new JMSDestination(bus, conduitInitiator, endpointInfo);
+    public JMSDestination setupJMSDestination(boolean send) {
+        JMSConfiguration jmsConfig = new JMSOldConfigHolder()
+            .createJMSConfigurationFromEndpointInfo(bus, endpointInfo, false);
+        JMSDestination jmsDestination = new JMSDestination(bus, endpointInfo, jmsConfig);
+
         if (send) {
             // setMessageObserver
             observer = new MessageObserver() {
@@ -101,24 +102,25 @@
         setupServiceInfo("http://cxf.apache.org/jms_conf_test", "/wsdl/others/jms_test_no_addr.wsdl",
                          "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
         JMSDestination destination = setupJMSDestination(false);
-        assertEquals("Can't get the right ServerConfig's MessageTimeToLive ", 500L, destination
-            .getServerConfig().getMessageTimeToLive());
-        assertEquals("Can't get the right Server's MessageSelector", "cxf_message_selector", destination
-            .getRuntimePolicy().getMessageSelector());
-        assertEquals("Can't get the right SessionPoolConfig's LowWaterMark", 10, destination
-            .getSessionPool().getLowWaterMark());
-        assertEquals("Can't get the right AddressPolicy's ConnectionPassword", "testPassword", destination
-            .getJMSAddress().getConnectionPassword());
-        assertEquals("Can't get the right DurableSubscriberName", "cxf_subscriber", destination
-            .getRuntimePolicy().getDurableSubscriberName());
-        assertEquals("Can't get the right MessageSelectorName", "cxf_message_selector", destination
-            .getRuntimePolicy().getMessageSelector());
+        JMSConfiguration jmsConfig = destination.getJmsConfig();
+        //JmsTemplate jmsTemplate = destination.getJmsTemplate();
+        //AbstractMessageListenerContainer jmsListener = destination.getJmsListener();
+        assertEquals("Can't get the right ServerConfig's MessageTimeToLive ", 500L, jmsConfig
+            .getTimeToLive());
+        assertEquals("Can't get the right Server's MessageSelector", "cxf_message_selector", jmsConfig
+            .getMessageSelector());
+        // assertEquals("Can't get the right SessionPoolConfig's LowWaterMark", 10,
+        // jmsListener.getLowWaterMark());
+        // assertEquals("Can't get the right AddressPolicy's ConnectionPassword", "testPassword",
+        // .getConnectionPassword());
+        assertEquals("Can't get the right DurableSubscriberName", "cxf_subscriber", jmsConfig
+            .getDurableSubscriptionName());
         BusFactory.setDefaultBus(null);
 
     }
 
     @Test
-    public void testGetConfigurationFormWSDL() throws Exception {
+    public void testGetConfigurationFromWSDL() throws Exception {
         SpringBusFactory bf = new SpringBusFactory();
         BusFactory.setDefaultBus(null);
         bus = bf.createBus();
@@ -129,11 +131,11 @@
         JMSDestination destination = setupJMSDestination(false);
 
         assertEquals("Can't get the right DurableSubscriberName", "CXF_subscriber", destination
-            .getRuntimePolicy().getDurableSubscriberName());
+            .getJmsConfig().getDurableSubscriptionName());
 
-        assertEquals("Can't get the right AddressPolicy's ConnectionPassword",
-                     "dynamicQueues/test.jmstransport.binary", destination.getJMSAddress()
-                         .getJndiDestinationName());
+        assertEquals("Can't get the right AddressPolicy's Destination",
+                     "dynamicQueues/test.jmstransport.binary", destination.getJmsConfig()
+                         .getTargetDestination());
 
         BusFactory.setDefaultBus(null);
 
@@ -153,7 +155,7 @@
         Message outMessage = new MessageImpl();
         setupMessageHeader(outMessage);
         JMSDestination destination = setupJMSDestination(true);
-        // destination.activate();
+        destination.activate();
         sendoutMessage(conduit, outMessage, true);
         // wait for the message to be get from the destination
         waitForReceiveDestMessage();
@@ -161,6 +163,7 @@
         assertTrue("The destiantion should have got the message ", destMessage != null);
         verifyReceivedMessage(destMessage);
         verifyHeaders(destMessage, outMessage);
+        conduit.close();
         destination.shutdown();
     }
 
@@ -173,14 +176,8 @@
         JMSConduit conduit = setupJMSConduit(true, false);
         Message outMessage = new MessageImpl();
         setupMessageHeader(outMessage);
-        JMSDestination destination = null;
-        try {
-            destination = setupJMSDestination(true);
-            destination.activate();
-        } catch (IOException e) {
-            assertFalse("The JMSDestination activate should not throw exception ", false);
-            e.printStackTrace();
-        }
+        JMSDestination destination = setupJMSDestination(true);
+        destination.activate();
         sendoutMessage(conduit, outMessage, true);
         // wait for the message to be get from the destination
         waitForReceiveDestMessage();
@@ -188,6 +185,7 @@
         assertTrue("The destiantion should have got the message ", destMessage != null);
         verifyReceivedMessage(destMessage);
         verifyHeaders(destMessage, outMessage);
+        conduit.close();
         destination.shutdown();
     }
 
@@ -248,8 +246,8 @@
     public void testRoundTripDestination() throws Exception {
 
         inMessage = null;
-        setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl", "HelloWorldService",
-                         "HelloWorldPort");
+        setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+                         "HelloWorldService", "HelloWorldPort");
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduit(true, false);
         final Message outMessage = new MessageImpl();
@@ -293,14 +291,14 @@
         verifyReceivedMessage(inMessage);
 
         Thread.sleep(1000);
+        conduit.close();
         destination.shutdown();
     }
 
     @Test
     public void testPropertyExclusion() throws Exception {
 
-        final String customPropertyName =
-            "THIS_PROPERTY_WILL_NOT_BE_AUTO_COPIED";
+        final String customPropertyName = "THIS_PROPERTY_WILL_NOT_BE_AUTO_COPIED";
 
         inMessage = null;
         setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
@@ -360,6 +358,7 @@
                    inHeader.getProperty().get(0).getName().equals(JMSConstants.JMS_CONTENT_TYPE));
         // wait for a while for the jms session recycling
         Thread.sleep(1000);
+        conduit.close();
         destination.shutdown();
     }
 

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java?rev=700236&r1=700235&r2=700236&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java Mon Sep 29 13:11:34 2008
@@ -28,9 +28,6 @@
 
 public class JMSUtilsTest extends Assert {
 
-    
-    // This is just a place holder for now it will be chaning in next task 
-    // when the new JMS address policies and configurations are introdced.
     @Test
     public void testpopulateIncomingContextNonNull() throws Exception {
         AddressType addrType =  new AddressType();
@@ -40,14 +37,12 @@
         prop.setValue("testValue");
         addrType.getJMSNamingProperty().add(prop);      
         
-        prop.setName(Context.BATCHSIZE);
-        prop.setValue("12");
-        addrType.getJMSNamingProperty().add(prop);
+        JMSNamingPropertyType prop2 = new JMSNamingPropertyType();
+        prop2.setName(Context.BATCHSIZE);
+        prop2.setValue("12");
+        addrType.getJMSNamingProperty().add(prop2);
         
-        
-        Properties env = new Properties();
-        assertTrue(env.size() <= 0);
-        JMSUtils.populateContextEnvironment(addrType, env);
+        Properties env = JMSUtils.getInitialContextEnv(addrType);
         assertTrue("Environment should not be empty", env.size() > 0);
         assertTrue("Environemnt should contain NamingBatchSize property", env.get(Context.BATCHSIZE) != null);
     }