You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ni...@apache.org on 2008/08/31 06:49:48 UTC

svn commit: r690638 [1/2] - in /cxf/trunk/rt/transports/jms/src: main/java/org/apache/cxf/transport/jms/ test/java/org/apache/cxf/transport/jms/

Author: ningjiang
Date: Sat Aug 30 21:49:48 2008
New Revision: 690638

URL: http://svn.apache.org/viewvc?rev=690638&view=rev
Log:
CXF-1773 applied patch with thanks to Christian

Added:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java
      - copied, changed from r690626, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransport.java
Removed:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransport.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportBase.java
Modified:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=690638&r1=690637&r2=690638&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Sat Aug 30 21:49:48 2008
@@ -19,7 +19,6 @@
 
 package org.apache.cxf.transport.jms;
 
-
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -29,6 +28,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.QueueSender;
@@ -48,104 +48,106 @@
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractConduit;
 import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
-public class JMSConduit extends AbstractConduit implements Configurable, JMSTransport {
+public class JMSConduit extends AbstractConduit implements Configurable, JMSOnConnectCallback {
 
     protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base";
 
     private static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
-    
-    protected final JMSTransportBase base;
+
+    protected Destination targetDestination;
+    protected Destination replyDestination;
+    protected JMSSessionFactory sessionFactory;
+    protected Bus bus;
+    protected EndpointInfo endpointInfo;
+    protected String beanNameSuffix;
+
     protected ClientConfig clientConfig;
     protected ClientBehaviorPolicyType runtimePolicy;
     protected AddressType address;
     protected SessionPoolType sessionPool;
-       
+
     public JMSConduit(Bus b, EndpointInfo endpointInfo) {
         this(b, endpointInfo, null);
     }
-    
-    public JMSConduit(Bus b,
-                      EndpointInfo endpointInfo,
-                      EndpointReferenceType target) {           
-        super(target);        
 
-        base = new JMSTransportBase(b, endpointInfo, false, BASE_BEAN_NAME_SUFFIX, this);
-        
+    public JMSConduit(Bus b, EndpointInfo endpointInfo, EndpointReferenceType target) {
+        super(target);
+
+        this.bus = b;
+        this.endpointInfo = endpointInfo;
+        this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX;
         initConfig();
-    } 
-    
+    }
+
     // prepare the message for send out , not actually send out the message
-    public void prepare(Message message) throws IOException {        
+    public void prepare(Message message) throws IOException {
         getLogger().log(Level.FINE, "JMSConduit send message");
 
         try {
-            if (null == base.sessionFactory) {
-                JMSProviderHub.connect(this);
+            if (null == sessionFactory) {
+                JMSProviderHub.connect(this, getJMSAddress(), getSessionPool());
             }
         } catch (JMSException jmsex) {
-            getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);            
+            getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);
             throw new IOException(jmsex.toString());
         } catch (NamingException ne) {
             getLogger().log(Level.WARNING, "JMS connect failed with NamingException : ", ne);
             throw new IOException(ne.toString());
         }
 
-        if (base.sessionFactory == null) {
+        if (sessionFactory == null) {
             throw new java.lang.IllegalStateException("JMSClientTransport not connected");
         }
 
         try {
-            boolean isOneWay = false;        
-            //test if the message is oneway message
+            boolean isOneWay = false;
+            // test if the message is oneway message
             Exchange ex = message.getExchange();
             if (null != ex) {
                 isOneWay = ex.isOneWay();
-            }    
-            //get the pooledSession with response expected 
-            PooledSession pooledSession = base.sessionFactory.get(!isOneWay);
+            }
+            // get the pooledSession with response expected
+            PooledSession pooledSession = sessionFactory.get(!isOneWay);
             // put the PooledSession into the outMessage
             message.put(JMSConstants.JMS_POOLEDSESSION, pooledSession);
-            
+
         } catch (JMSException jmsex) {
             throw new IOException(jmsex.getMessage());
         }
-        
-        message.setContent(OutputStream.class,
-                           new JMSOutputStream(message));
-      
+
+        message.setContent(OutputStream.class, new JMSOutputStream(message));
+
     }
 
-    public void close() {       
+    public void close() {
         getLogger().log(Level.FINE, "JMSConduit closed ");
 
         // ensure resources held by session factory are released
         //
-        if (base.sessionFactory != null) {
-            base.sessionFactory.shutdown();
+        if (sessionFactory != null) {
+            sessionFactory.shutdown();
         }
     }
-    
+
     protected Logger getLogger() {
         return LOG;
     }
-    
 
     /**
      * Receive mechanics.
-     *
+     * 
      * @param pooledSession the shared JMS resources
-     * @param inMessage 
+     * @param inMessage
      * @retrun the response buffer
      */
-    private Object receive(PooledSession pooledSession,
-                           Message outMessage, Message inMessage) throws JMSException {
-        
+    private Object receive(PooledSession pooledSession, Message outMessage, Message inMessage)
+        throws JMSException {
+
         Object result = null;
-        
+
         long timeout = getClientConfig().getClientReceiveTimeout();
 
         Long receiveTimeout = (Long)outMessage.get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT);
@@ -153,56 +155,52 @@
         if (receiveTimeout != null) {
             timeout = receiveTimeout.longValue();
         }
-        
+
         javax.jms.Message jmsMessage = pooledSession.consumer().receive(timeout);
-        getLogger().log(Level.FINE, "client received reply: " , jmsMessage);
+        getLogger().log(Level.FINE, "client received reply: ", jmsMessage);
 
         if (jmsMessage != null) {
-            
-            base.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
-            result = base.unmarshal(jmsMessage);
+
+            JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
+            result = JMSUtils.unmarshal(jmsMessage);
             return result;
         } else {
             String error = "JMSClientTransport.receive() timed out. No message available.";
             getLogger().log(Level.SEVERE, error);
-            //TODO: Review what exception should we throw.
+            // TODO: Review what exception should we throw.
             throw new JMSException(error);
-            
+
         }
     }
 
-    public void connected(javax.jms.Destination target, 
-                          javax.jms.Destination reply, 
-                          JMSSessionFactory factory) {
-        base.connected(target, reply, factory);
+    public void connected(Destination target, Destination reply, JMSSessionFactory factory) {
+        this.targetDestination = target;
+        this.replyDestination = reply;
+        this.sessionFactory = factory;
     }
 
     public String getBeanName() {
-        return base.endpointInfo.getName().toString() + ".jms-conduit";
+        return endpointInfo.getName().toString() + ".jms-conduit";
     }
-    
+
     private void initConfig() {
 
-        this.address = base.endpointInfo.getTraversedExtensor(new AddressType(), 
-                                                              AddressType.class);
-        this.sessionPool = base.endpointInfo.getTraversedExtensor(new SessionPoolType(), 
-                                                                  SessionPoolType.class);
-        this.clientConfig = base.endpointInfo.getTraversedExtensor(new ClientConfig(),
-                                                                   ClientConfig.class);
-        this.runtimePolicy = base.endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
-                                                                    ClientBehaviorPolicyType.class);
+        this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
+        this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
+        this.clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class);
+        this.runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
+                                                               ClientBehaviorPolicyType.class);
 
-        Configurer configurer = base.bus.getExtension(Configurer.class);
+        Configurer configurer = bus.getExtension(Configurer.class);
         if (null != configurer) {
             configurer.configureBean(this);
         }
     }
 
     private boolean isTextPayload() {
-        return JMSConstants.TEXT_MESSAGE_TYPE.equals(
-            getRuntimePolicy().getMessageType().value());
+        return JMSConstants.TEXT_MESSAGE_TYPE.equals(getRuntimePolicy().getMessageType().value());
     }
-    
+
     public AddressType getJMSAddress() {
         return address;
     }
@@ -235,23 +233,22 @@
         this.sessionPool = sessionPool;
     }
 
-    
     private class JMSOutputStream extends CachedOutputStream {
         private Message outMessage;
         private javax.jms.Message jmsMessage;
         private PooledSession pooledSession;
         private boolean isOneWay;
-                
+
         public JMSOutputStream(Message m) {
             outMessage = m;
             pooledSession = (PooledSession)outMessage.get(JMSConstants.JMS_POOLEDSESSION);
-        }     
-        
+        }
+
         protected void doFlush() throws IOException {
-          //do nothing here    
+            // do nothing here
         }
-        
-        protected void doClose() throws IOException {            
+
+        protected void doClose() throws IOException {
             try {
                 isOneWay = outMessage.getExchange().isOneWay();
                 commitOutputMessage();
@@ -259,30 +256,28 @@
                     handleResponse();
                 }
             } catch (JMSException jmsex) {
-                getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);            
+                getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);
                 throw new IOException(jmsex.toString());
             } finally {
-                base.sessionFactory.recycle(pooledSession);
+                sessionFactory.recycle(pooledSession);
             }
         }
-        
+
         protected void onWrite() throws IOException {
-            
+
         }
-        
+
         private void commitOutputMessage() throws JMSException {
             javax.jms.Destination replyTo = pooledSession.destination();
-            //TODO setting up the responseExpected
-            
-            
-            //We don't want to send temp queue in
-            //replyTo header for oneway calls
-            if (isOneWay
-                && (getJMSAddress().getJndiReplyDestinationName() == null)) {
+            // TODO setting up the responseExpected
+
+            // We don't want to send temp queue in
+            // replyTo header for oneway calls
+            if (isOneWay && (getJMSAddress().getJndiReplyDestinationName() == null)) {
                 replyTo = null;
             }
 
-            Object request = null;            
+            Object request = null;
             try {
                 if (isTextPayload()) {
                     StringBuilder builder = new StringBuilder(2048);
@@ -299,36 +294,35 @@
             if (getLogger().isLoggable(Level.FINE)) {
                 getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]");
             }
-            
-            
-            jmsMessage = base.marshal(request, pooledSession.session(), replyTo,
-                getRuntimePolicy().getMessageType().value());
-            
-            JMSMessageHeadersType headers =
-                (JMSMessageHeadersType)outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-
-            int deliveryMode = base.getJMSDeliveryMode(headers);
-            int priority = base.getJMSPriority(headers);
-            String correlationID = base.getCorrelationId(headers);
-            long ttl = base.getTimeToLive(headers);
+
+            jmsMessage = JMSUtils.marshal(request, pooledSession.session(), replyTo, getRuntimePolicy()
+                .getMessageType().value());
+
+            JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+                .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+
+            int deliveryMode = JMSUtils.getJMSDeliveryMode(headers);
+            int priority = JMSUtils.getJMSPriority(headers);
+            String correlationID = JMSUtils.getCorrelationId(headers);
+            long ttl = JMSUtils.getTimeToLive(headers);
             if (ttl <= 0) {
                 ttl = getClientConfig().getMessageTimeToLive();
             }
-             
-            base.setMessageProperties(headers, jmsMessage);
-            //ensure that the contentType is set to the out jms message header
-            base.setContentToProtocalHeader(outMessage);
-            Map<String, List<String>> protHeaders = 
-                CastUtils.cast((Map<?, ?>)outMessage.get(Message.PROTOCOL_HEADERS));
-            base.addProtocolHeaders(jmsMessage, protHeaders);
+
+            JMSUtils.setMessageProperties(headers, jmsMessage);
+            // ensure that the contentType is set to the out jms message header
+            JMSUtils.setContentToProtocalHeader(outMessage);
+            Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
+                .get(Message.PROTOCOL_HEADERS));
+            JMSUtils.addProtocolHeaders(jmsMessage, protHeaders);
             if (!isOneWay) {
                 String id = pooledSession.getCorrelationID();
 
                 if (id != null) {
                     if (correlationID != null) {
                         String error = "User cannot set JMSCorrelationID when "
-                            + "making a request/reply invocation using "
-                            + "a static replyTo Queue.";
+                                       + "making a request/reply invocation using "
+                                       + "a static replyTo Queue.";
                         throw new JMSException(error);
                     }
                     correlationID = id;
@@ -338,48 +332,47 @@
             if (correlationID != null) {
                 jmsMessage.setJMSCorrelationID(correlationID);
             } else {
-                //No message correlation id is set. Whatever comeback will be accepted as responses.
+                // No message correlation id is set. Whatever comeback will be accepted as responses.
                 // We assume that it will only happen in case of the temp. reply queue.
             }
 
-            getLogger().log(Level.FINE, "client sending request: ",  jmsMessage);
-            //getting  Destination Style
-            if (base.isDestinationStyleQueue()) {
+            getLogger().log(Level.FINE, "client sending request: ", jmsMessage);
+            // getting Destination Style
+            if (JMSUtils.isDestinationStyleQueue(address)) {
                 QueueSender sender = (QueueSender)pooledSession.producer();
                 sender.setTimeToLive(ttl);
-                sender.send((Queue)base.targetDestination, jmsMessage, deliveryMode, priority, ttl);
+                sender.send((Queue)targetDestination, jmsMessage, deliveryMode, priority, ttl);
             } else {
                 TopicPublisher publisher = (TopicPublisher)pooledSession.producer();
                 publisher.setTimeToLive(ttl);
-                publisher.publish((Topic)base.targetDestination, jmsMessage, deliveryMode, priority, ttl);
+                publisher.publish((Topic)targetDestination, jmsMessage, deliveryMode, priority, ttl);
             }
         }
 
         private void handleResponse() throws IOException {
             // REVISIT distinguish decoupled case or oneway call
             Object response = null;
-            
-            //TODO if outMessage need to get the response
+
+            // TODO if outMessage need to get the response
             Message inMessage = new MessageImpl();
-            outMessage.getExchange().setInMessage(inMessage);            
-            //set the message header back to the incomeMessage
-            //inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS, 
-            //              outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
-                        
+            outMessage.getExchange().setInMessage(inMessage);
+            // set the message header back to the incomeMessage
+            // inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
+            // outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
+
             try {
                 response = receive(pooledSession, outMessage, inMessage);
             } catch (JMSException jmsex) {
-                getLogger().log(Level.FINE, "JMS connect failed with JMSException : ", jmsex);            
+                getLogger().log(Level.FINE, "JMS connect failed with JMSException : ", jmsex);
                 throw new IOException(jmsex.toString());
-            }  
-
+            }
 
             getLogger().log(Level.FINE, "The Response Message is : [" + response + "]");
-            
+
             // setup the inMessage response stream
             byte[] bytes = null;
             if (response instanceof String) {
-                String requestString = (String)response;                
+                String requestString = (String)response;
                 bytes = requestString.getBytes();
             } else {
                 bytes = (byte[])response;
@@ -390,16 +383,14 @@
         }
     }
 
-    
     /**
      * Represented decoupled response endpoint.
      */
     protected class DecoupledDestination implements Destination {
         protected MessageObserver decoupledMessageObserver;
         private EndpointReferenceType address;
-        
-        DecoupledDestination(EndpointReferenceType ref,
-                             MessageObserver incomingObserver) {
+
+        DecoupledDestination(EndpointReferenceType ref, MessageObserver incomingObserver) {
             address = ref;
             decoupledMessageObserver = incomingObserver;
         }
@@ -408,25 +399,23 @@
             return address;
         }
 
-        public Conduit getBackChannel(Message inMessage,
-                                      Message partialResponse,
-                                      EndpointReferenceType addr)
+        public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType addr)
             throws IOException {
             // shouldn't be called on decoupled endpoint
             return null;
         }
 
         public void shutdown() {
-            // TODO Auto-generated method stub            
+            // TODO Auto-generated method stub
         }
 
         public synchronized void setMessageObserver(MessageObserver observer) {
             decoupledMessageObserver = observer;
         }
-        
+
         public synchronized MessageObserver getMessageObserver() {
             return decoupledMessageObserver;
         }
-    }     
+    }
 
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=690638&r1=690637&r2=690638&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Sat Aug 30 21:49:48 2008
@@ -19,7 +19,6 @@
 
 package org.apache.cxf.transport.jms;
 
-
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -36,6 +35,7 @@
 import java.util.logging.Logger;
 
 import javax.jms.BytesMessage;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.QueueSender;
@@ -63,32 +63,36 @@
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
 
+public class JMSDestination extends AbstractMultiplexDestination implements Configurable,
+    JMSOnConnectCallback {
 
-
-public class JMSDestination extends AbstractMultiplexDestination implements Configurable, JMSTransport {
-        
     protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-destination-base";
 
     private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
-    
+
     protected ServerConfig serverConfig;
     protected ServerBehaviorPolicyType runtimePolicy;
     protected AddressType address;
     protected SessionPoolType sessionPool;
-     
+    protected Destination targetDestination;
+    protected Destination replyDestination;
+    protected JMSSessionFactory sessionFactory;
+    protected Bus bus;
+    protected EndpointInfo endpointInfo;
+    protected String beanNameSuffix;
+    
     final ConduitInitiator conduitInitiator;
-    final JMSTransportBase base;
-  
+
+
     PooledSession listenerSession;
     JMSListenerThread listenerThread;
-    
-    public JMSDestination(Bus b,
-                          ConduitInitiator ci,
-                          EndpointInfo info) throws IOException {
-        super(b, getTargetReference(info, b), info);    
-        
-        base = new JMSTransportBase(b, endpointInfo, true, BASE_BEAN_NAME_SUFFIX, this);
 
+    public JMSDestination(Bus b, ConduitInitiator ci, EndpointInfo info) throws IOException {
+        super(b, getTargetReference(info, b), info);
+
+        this.bus = b;
+        this.endpointInfo = info;
+        this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX;
         conduitInitiator = ci;
 
         initConfig();
@@ -97,27 +101,25 @@
     protected Logger getLogger() {
         return LOG;
     }
-    
+
     /**
      * @param inMessage the incoming message
      * @return the inbuilt backchannel
      */
     protected Conduit getInbuiltBackChannel(Message inMessage) {
-        return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(),
-                                      inMessage);
+        return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(), inMessage);
     }
-     
-    public void activate()  {
-        getLogger().log(Level.INFO, "JMSServerTransport activate().... ");        
+
+    public void activate() {
+        getLogger().log(Level.INFO, "JMSServerTransport activate().... ");
 
         try {
             getLogger().log(Level.FINE, "establishing JMS connection");
-            JMSProviderHub.connect(this, serverConfig, runtimePolicy);
-            //Get a non-pooled session. 
-            listenerSession = base.sessionFactory.get(base.targetDestination);
-            listenerThread = new JMSListenerThread(listenerSession,
-                                                   getEndpointInfo() == null ? null 
-                                                       : getEndpointInfo().getName());
+            JMSProviderHub.connect(this, getJMSAddress(), getSessionPool(), serverConfig, runtimePolicy);
+            // Get a non-pooled session.
+            listenerSession = sessionFactory.get(targetDestination);
+            listenerThread = new JMSListenerThread(listenerSession, getEndpointInfo() == null
+                ? null : getEndpointInfo().getName());
             listenerThread.start();
         } catch (JMSException ex) {
             getLogger().log(Level.SEVERE, "JMS connect failed with JMSException : ", ex);
@@ -125,18 +127,18 @@
             getLogger().log(Level.SEVERE, "JMS connect failed with NamingException : ", nex);
         }
     }
-    
-    public void deactivate()  {
+
+    public void deactivate() {
         try {
             listenerSession.consumer().close();
             if (listenerThread != null) {
                 listenerThread.join();
             }
-            base.sessionFactory.shutdown();
+            sessionFactory.shutdown();
         } catch (InterruptedException e) {
-            //Do nothing here
+            // Do nothing here
         } catch (JMSException ex) {
-            //Do nothing here
+            // Do nothing here
         }
     }
 
@@ -145,43 +147,40 @@
         this.deactivate();
     }
 
-    public Queue getReplyToDestination(Message inMessage) 
-        throws JMSException, NamingException {
+    public Queue getReplyToDestination(Message inMessage) throws JMSException, NamingException {
         Queue replyTo;
-        javax.jms.Message message = 
-            (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
+        javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
         // If WS-Addressing had set the replyTo header.
-        if  (inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) {
-            replyTo = base.sessionFactory.getQueueFromInitialContext(
-                              (String)  inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO));
+        if (inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) {
+            replyTo = sessionFactory.getQueueFromInitialContext((String)inMessage
+                .get(JMSConstants.JMS_REBASED_REPLY_TO));
         } else {
-            replyTo = (null != message.getJMSReplyTo()) 
-                ? (Queue)message.getJMSReplyTo() : (Queue)base.replyDestination;
-        }    
+            replyTo = (null != message.getJMSReplyTo())
+                ? (Queue)message.getJMSReplyTo() : (Queue)replyDestination;
+        }
         return replyTo;
     }
-    
-    public void setReplyCorrelationID(javax.jms.Message request, javax.jms.Message reply) 
-        throws JMSException {
-        
+
+    public void setReplyCorrelationID(javax.jms.Message request, 
+                                      javax.jms.Message reply) throws JMSException {
+
         String correlationID = request.getJMSCorrelationID();
-        
-        if (correlationID == null
-            || "".equals(correlationID)
+
+        if (correlationID == null || "".equals(correlationID)
             && getRuntimePolicy().isUseMessageIDAsCorrelationID()) {
             correlationID = request.getJMSMessageID();
         }
-    
+
         if (correlationID != null && !"".equals(correlationID)) {
             reply.setJMSCorrelationID(correlationID);
         }
     }
-    
+
     protected void incoming(javax.jms.Message message) throws IOException {
         try {
             getLogger().log(Level.FINE, "server received request: ", message);
-           
-            Object request = base.unmarshal(message);
+
+            Object request = JMSUtils.unmarshal(message);
             getLogger().log(Level.FINE, "The Request Message is [ " + request + "]");
             byte[] bytes = null;
 
@@ -190,50 +189,52 @@
                 getLogger().log(Level.FINE, "server received request: ", requestString);
                 bytes = requestString.getBytes();
             } else {
-                //Both ByteMessage and ObjectMessage would get unmarshalled to byte array.
+                // Both ByteMessage and ObjectMessage would get unmarshalled to byte array.
                 bytes = (byte[])request;
             }
 
             // get the message to be interceptor
             MessageImpl inMessage = new MessageImpl();
             inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
-            base.populateIncomingContext(message, inMessage, JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+            JMSUtils.populateIncomingContext(message, inMessage, JMSConstants.JMS_SERVER_REQUEST_HEADERS);
             inMessage.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType());
             inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
-                        
-            inMessage.setDestination(this);            
-            
+
+            inMessage.setDestination(this);
+
             BusFactory.setThreadDefaultBus(bus);
-            
-            //handle the incoming message
+
+            // handle the incoming message
             incomingObserver.onMessage(inMessage);
-           
+
         } catch (JMSException jmsex) {
-            //TODO: need to revisit for which exception should we throw.
+            // TODO: need to revisit for which exception should we throw.
             throw new IOException(jmsex.getMessage());
         } finally {
             BusFactory.setThreadDefaultBus(null);
         }
     }
-    
+
     public void connected(javax.jms.Destination target, 
                           javax.jms.Destination reply, 
                           JMSSessionFactory factory) {
-        base.connected(target, reply, factory);
+        this.targetDestination = target;
+        this.replyDestination = reply;
+        this.sessionFactory = factory;
     }
 
     public String getBeanName() {
         return endpointInfo.getName().toString() + ".jms-destination";
     }
-    
+
     private void initConfig() {
         this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
                                                                ServerBehaviorPolicyType.class);
         this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
         this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
         this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
-        
-        Configurer configurer = base.bus.getExtension(Configurer.class);
+
+        Configurer configurer = bus.getExtension(Configurer.class);
         if (null != configurer) {
             configurer.configureBean(this);
         }
@@ -270,10 +271,11 @@
     public void setSessionPool(SessionPoolType sessionPool) {
         this.sessionPool = sessionPool;
     }
-    
+
     protected class JMSListenerThread extends Thread {
         private final PooledSession listenSession;
         private final QName name;
+
         public JMSListenerThread(PooledSession session, QName n) {
             listenSession = session;
             name = n;
@@ -283,49 +285,47 @@
             try {
                 Executor executor = null;
                 if (executor == null) {
-                    WorkQueueManager wqm =
-                        base.bus.getExtension(WorkQueueManager.class);
+                    WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class);
                     if (null != wqm) {
                         if (name != null) {
-                            executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}" 
+                            executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}"
                                                              + name.getLocalPart());
-                        }                        
+                        }
                         if (executor == null) {
                             executor = wqm.getNamedWorkQueue("jms");
                         }
                         if (executor == null) {
                             executor = wqm.getAutomaticWorkQueue();
                         }
-                    }    
+                    }
                 }
                 while (true) {
-                    javax.jms.Message message = listenSession.consumer().receive();                   
+                    javax.jms.Message message = listenSession.consumer().receive();
                     if (message == null) {
-                        getLogger().log(Level.WARNING,
-                                "Null message received from message consumer.",
-                                " Exiting ListenerThread::run().");
+                        getLogger().log(Level.WARNING, "Null message received from message consumer.",
+                                        " Exiting ListenerThread::run().");
                         return;
                     }
                     while (message != null) {
-                        //REVISIT  to get the thread pool                        
-                        //Executor executor = jmsDestination.callback.getExecutor();
+                        // REVISIT to get the thread pool
+                        // Executor executor = jmsDestination.callback.getExecutor();
                         if (executor != null) {
                             try {
                                 executor.execute(new JMSExecutor(message));
                                 message = null;
                             } catch (RejectedExecutionException ree) {
-                                //FIXME - no room left on workqueue, what to do
-                                //for now, loop until it WILL fit on the queue, 
-                                //although we could just dispatch on this thread.
-                            }                            
+                                // FIXME - no room left on workqueue, what to do
+                                // for now, loop until it WILL fit on the queue,
+                                // although we could just dispatch on this thread.
+                            }
                         } else {
                             getLogger().log(Level.INFO, "handle the incoming message in listener thread");
                             try {
                                 incoming(message);
                             } catch (IOException ex) {
                                 getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
-                            }                            
-                        }                        
+                            }
+                        }
                         message = null;
                     }
                 }
@@ -338,10 +338,10 @@
             }
         }
     }
-    
+
     protected class JMSExecutor implements Runnable {
         javax.jms.Message message;
-        
+
         JMSExecutor(javax.jms.Message m) {
             message = m;
         }
@@ -351,24 +351,23 @@
             try {
                 incoming(message);
             } catch (IOException ex) {
-                //TODO: Decide what to do if we receive the exception.
-                getLogger().log(Level.WARNING,
-                        "Failed to process incoming message : ", ex);
+                // TODO: Decide what to do if we receive the exception.
+                getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
             }
         }
-        
+
     }
-    
-    // this should deal with the cxf message 
+
+    // this should deal with the cxf message
     protected class BackChannelConduit extends AbstractConduit {
-        
+
         protected Message inMessage;
-                
+
         BackChannelConduit(EndpointReferenceType ref, Message message) {
             super(ref);
             inMessage = message;
         }
-        
+
         /**
          * Register a message observer for incoming messages.
          * 
@@ -379,65 +378,62 @@
         }
 
         /**
-         * Send an outbound message, assumed to contain all the name-value
-         * mappings of the corresponding input message (if any). 
+         * Send an outbound message, assumed to contain all the name-value mappings of the corresponding input
+         * message (if any).
          * 
          * @param message the message to be sent.
          */
         public void prepare(Message message) throws IOException {
             // setup the message to be send back
-            message.put(JMSConstants.JMS_REQUEST_MESSAGE, 
-                        inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE));
-            
+            message.put(JMSConstants.JMS_REQUEST_MESSAGE, inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE));
+
             if (!message.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)
                 && inMessage.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)) {
-                message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS,
-                            inMessage.get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS));
+                message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, inMessage
+                    .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS));
             }
-            message.setContent(OutputStream.class,
-                               new JMSOutputStream(inMessage, message));
+            message.setContent(OutputStream.class, new JMSOutputStream(inMessage, message));
         }
-        
+
         protected Logger getLogger() {
             return LOG;
         }
     }
-    
+
     private class JMSOutputStream extends CachedOutputStream {
-                
+
         private Message inMessage;
         private Message outMessage;
         private javax.jms.Message reply;
         private Queue replyTo;
         private QueueSender sender;
-        
+
         // setup the ByteArrayStream
         public JMSOutputStream(Message m, Message o) {
             super();
             inMessage = m;
             outMessage = o;
         }
-        
-        //to prepear the message and get the send out message
+
+        // to prepear the message and get the send out message
         private void commitOutputMessage() throws IOException {
-            
-            JMSMessageHeadersType headers =
-                (JMSMessageHeadersType) outMessage.get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
-            javax.jms.Message request = 
-                (javax.jms.Message) inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);              
-            
-            PooledSession replySession = null;          
-            
-            if (base.isDestinationStyleQueue()) {
+
+            JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+                .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
+            javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
+
+            PooledSession replySession = null;
+
+            if (JMSUtils.isDestinationStyleQueue(address)) {
                 try {
-                    //setup the reply message                
+                    // setup the reply message
                     replyTo = getReplyToDestination(inMessage);
-                    replySession = base.sessionFactory.get(false);
+                    replySession = sessionFactory.get(false);
                     sender = (QueueSender)replySession.producer();
-                    
+
                     String msgType = JMSConstants.TEXT_MESSAGE_TYPE;
                     Object replyObj = null;
-                    
+
                     if (request instanceof TextMessage) {
                         StringBuilder builder = new StringBuilder();
                         this.writeCacheTo(builder);
@@ -450,40 +446,37 @@
                         replyObj = getBytes();
                         msgType = JMSConstants.BINARY_MESSAGE_TYPE;
                     }
-                    
+
                     if (getLogger().isLoggable(Level.FINE)) {
-                        getLogger().log(Level.FINE, "The response message is ["
-                                        + (replyObj instanceof String 
-                                           ? (String)replyObj 
-                                               : IOUtils.newStringFromBytes((byte[])replyObj))
-                                    + "]");
+                        getLogger().log(
+                                        Level.FINE,
+                                        "The response message is ["
+                                            + (replyObj instanceof String ? (String)replyObj : IOUtils
+                                                .newStringFromBytes((byte[])replyObj)) + "]");
                     }
 
-                    reply = base.marshal(replyObj, 
-                                         replySession.session(), 
-                                         null, 
-                                         msgType);
+                    reply = JMSUtils.marshal(replyObj, replySession.session(), null, msgType);
 
                     setReplyCorrelationID(request, reply);
-                    base.setMessageProperties(headers, reply);
-                    //ensure that the contentType is set to the out jms message header
-                    base.setContentToProtocalHeader(outMessage);
-                    Map<String, List<String>> protHeaders = 
-                        CastUtils.cast((Map<?, ?>)outMessage.get(Message.PROTOCOL_HEADERS));
-                    base.addProtocolHeaders(reply, protHeaders);
+                    JMSUtils.setMessageProperties(headers, reply);
+                    // ensure that the contentType is set to the out jms message header
+                    JMSUtils.setContentToProtocalHeader(outMessage);
+                    Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
+                        .get(Message.PROTOCOL_HEADERS));
+                    JMSUtils.addProtocolHeaders(reply, protHeaders);
 
                     sendResponse();
-                    
+
                 } catch (JMSException ex) {
-                    getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex);                
-                    throw new IOException(ex.getMessage());                    
+                    getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex);
+                    throw new IOException(ex.getMessage());
                 } catch (NamingException nex) {
-                    getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex);                
-                    throw new IOException(nex.getMessage());                    
+                    getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex);
+                    throw new IOException(nex.getMessage());
                 } finally {
                     // house-keeping
                     if (replySession != null) {
-                        base.sessionFactory.recycle(replySession);
+                        sessionFactory.recycle(replySession);
                     }
                 }
             } else {
@@ -491,39 +484,36 @@
                 // domain from CXF client - however a mis-behaving pure JMS
                 // client could conceivably make suce an invocation, in which
                 // case we silently discard the reply
-                getLogger().log(Level.WARNING,
-                        "discarding reply for non-oneway invocation ",
-                        "with 'topic' destinationStyle");
-                
-            }        
-            
+                getLogger().log(Level.WARNING, "discarding reply for non-oneway invocation ",
+                                "with 'topic' destinationStyle");
+
+            }
+
             getLogger().log(Level.FINE, "just server sending reply: ", reply);
             // Check the reply time limit Stream close will call for this
-            
-           
+
         }
 
         private void sendResponse() throws JMSException {
-            JMSMessageHeadersType headers =
-                (JMSMessageHeadersType) inMessage.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
-            javax.jms.Message request = 
-                (javax.jms.Message) inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);   
-            
-            int deliveryMode = base.getJMSDeliveryMode(headers);
-            int priority = base.getJMSPriority(headers);
-            long ttl = base.getTimeToLive(headers);
-            
+            JMSMessageHeadersType headers = (JMSMessageHeadersType)inMessage
+                .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+            javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
+
+            int deliveryMode = JMSUtils.getJMSDeliveryMode(headers);
+            int priority = JMSUtils.getJMSPriority(headers);
+            long ttl = JMSUtils.getTimeToLive(headers);
+
             if (ttl <= 0) {
                 ttl = getServerConfig().getMessageTimeToLive();
             }
-            
+
             long timeToLive = 0;
             if (request.getJMSExpiration() > 0) {
                 TimeZone tz = new SimpleTimeZone(0, "GMT");
                 Calendar cal = new GregorianCalendar(tz);
-                timeToLive =  request.getJMSExpiration() - cal.getTimeInMillis();
+                timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
             }
-            
+
             if (timeToLive >= 0) {
                 ttl = ttl > 0 ? ttl : timeToLive;
                 getLogger().log(Level.FINE, "send out the message!");
@@ -531,27 +521,24 @@
             } else {
                 // the request message had dead
                 getLogger().log(Level.INFO, "Message time to live is already expired skipping response.");
-            }         
+            }
         }
-           
-        
 
         @Override
         protected void doFlush() throws IOException {
             // TODO Auto-generated method stub
-            
+
         }
 
-        
         @Override
         protected void doClose() throws IOException {
-            
-            commitOutputMessage();        
+
+            commitOutputMessage();
         }
 
         @Override
         protected void onWrite() throws IOException {
-            // Do nothing here        
+            // Do nothing here
         }
 
     }

Copied: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java (from r690626, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransport.java)
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java?p2=cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java&p1=cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransport.java&r1=690626&r2=690638&rev=690638&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransport.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java Sat Aug 30 21:49:48 2008
@@ -25,15 +25,14 @@
  * Common accessors between the conduit and destination which are needed for common code.
  *
  */
-public interface JMSTransport {
+public interface JMSOnConnectCallback {
 
     AddressType getJMSAddress();
 
     SessionPoolType getSessionPool();
 
     /**
-     * Callback from the JMSProviderHub indicating the ClientTransport has
-     * been sucessfully connected.
+     * Callback from the JMSProviderHub indicating the ClientTransport has been sucessfully connected.
      *
      * @param targetDestination the target destination
      * @param sessionFactory used to get access to a pooled JMS resources

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java?rev=690638&r1=690637&r2=690638&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java Sat Aug 30 21:49:48 2008
@@ -28,20 +28,16 @@
 import javax.naming.Context;
 import javax.naming.NamingException;
 
-
 /**
- * This class acts as the hub of JMS provider usage, creating shared
- * JMS Connections and providing access to a pool of JMS Sessions.
+ * This class acts as the hub of JMS provider usage, creating shared JMS Connections and providing access to a
+ * pool of JMS Sessions.
  * <p>
- * A new JMS connection is created for each each port based
- * <jms:address> - however its likely that in practice the same JMS
- * provider will be specified for each port, and hence the connection
- * resources could be shared accross ports.
+ * A new JMS connection is created for each each port based <jms:address> - however its likely that in
+ * practice the same JMS provider will be specified for each port, and hence the connection resources could be
+ * shared accross ports.
  * <p>
- * For the moment this class is realized as just a container for
- * static methods, but the intention is to support in future sharing
- * of JMS resources accross compatible ports.
- *
+ * For the moment this class is realized as just a container for static methods, but the intention is to
+ * support in future sharing of JMS resources accross compatible ports.
  */
 public final class JMSProviderHub {
 
@@ -51,111 +47,115 @@
     private JMSProviderHub() {
     }
 
-  
     public String toString() {
         return "JMSProviderHub";
     }
 
-    protected static void connect(JMSTransport jmsTransport) throws JMSException, NamingException {
-        connect(jmsTransport, null, null);
+    protected static void connect(JMSOnConnectCallback onConnectCallback, AddressType addrDetails,
+                                  SessionPoolType sessionPoolConfig) throws JMSException, NamingException {
+        connect(onConnectCallback, addrDetails, sessionPoolConfig, null, null);
     }
-    
-    protected static void connect(JMSTransport jmsTransport, 
-                                  ServerConfig jmsDestConfigBean,
-                                  ServerBehaviorPolicyType runtimePolicy)
-        throws JMSException, NamingException {
-
-        AddressType  addrDetails = jmsTransport.getJMSAddress();
-        boolean isQueue = JMSConstants.JMS_QUEUE.equals(addrDetails.getDestinationStyle().value());
-      
+
+    private static Destination resolveRequestDestination(Context context, Connection connection,
+                                                         AddressType addrDetails) throws JMSException,
+        NamingException {
+        Destination requestDestination = null;
+        try {
+            // see if jndiDestination is set
+            if (addrDetails.getJndiDestinationName() != null) {
+                requestDestination = (Destination)context.lookup(addrDetails.getJndiDestinationName());
+            }
+
+            // if no jndiDestination or it fails see if jmsDestination is set
+            // and try to create it.
+            if (requestDestination == null && addrDetails.getJmsDestinationName() != null) {
+                if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
+                    requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+                        .createQueue(addrDetails.getJmsDestinationName());
+                } else {
+                    requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+                        .createTopic(addrDetails.getJmsDestinationName());
+                }
+            }
+            return requestDestination;
+        } catch (NamingException ne) {
+            // Propogate NamingException.
+            throw ne;
+        }
+    }
+
+    protected static void connect(JMSOnConnectCallback onConnectCallBack, AddressType addrDetails,
+                                  SessionPoolType sessionPoolConfig, ServerConfig jmsDestConfigBean,
+                                  ServerBehaviorPolicyType runtimePolicy) throws JMSException,
+        NamingException {
+
         // get JMS connection resources and destination
         //
         Context context = JMSUtils.getInitialContext(addrDetails);
         Connection connection = null;
-        
-        if (isQueue) {
-            QueueConnectionFactory qcf =
-                (QueueConnectionFactory)context.lookup(addrDetails.getJndiConnectionFactoryName());
+
+        if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
+            QueueConnectionFactory qcf = (QueueConnectionFactory)context.lookup(addrDetails
+                .getJndiConnectionFactoryName());
             if (addrDetails.isSetConnectionUserName()) {
-                connection = qcf.createQueueConnection(addrDetails.getConnectionUserName(), 
-                                                       addrDetails.getConnectionPassword());
+                connection = qcf.createQueueConnection(addrDetails.getConnectionUserName(), addrDetails
+                    .getConnectionPassword());
             } else {
                 connection = qcf.createQueueConnection();
             }
         } else {
-            TopicConnectionFactory tcf =
-                (TopicConnectionFactory)context.lookup(addrDetails.getJndiConnectionFactoryName());
+            TopicConnectionFactory tcf = (TopicConnectionFactory)context.lookup(addrDetails
+                .getJndiConnectionFactoryName());
             if (addrDetails.isSetConnectionUserName()) {
-                connection = tcf.createTopicConnection(addrDetails.getConnectionUserName(), 
-                                                       addrDetails.getConnectionPassword());
+                connection = tcf.createTopicConnection(addrDetails.getConnectionUserName(), addrDetails
+                    .getConnectionPassword());
             } else {
                 connection = tcf.createTopicConnection();
             }
         }
-        
+
         if (null != jmsDestConfigBean) {
             String clientID = jmsDestConfigBean.getDurableSubscriptionClientId();
-            
-            if  (clientID != null) {
+
+            if (clientID != null) {
                 connection.setClientID(clientID);
             }
         }
         connection.start();
-
-        Destination requestDestination = null;
-        try {
-            //see if jndiDestination is set
-            if (addrDetails.getJndiDestinationName() != null) {
-                requestDestination = 
-                    (Destination)context.lookup(addrDetails.getJndiDestinationName());    
-            }
-            
-            //if no jndiDestination or it fails see if jmsDestination is set and try to create it.
-            if (requestDestination == null && addrDetails.getJmsDestinationName() != null) {
-                if (isQueue) {
-                    requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
-                        .createQueue(addrDetails.getJmsDestinationName());    
-                } else {
-                    requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
-                        .createTopic(addrDetails.getJmsDestinationName());
-                }
-            }
-            
-            if (requestDestination == null) {
-                //fail to locate or create requestDestination throw Exception.
-                throw new JMSException("Failed to lookup or create requestDestination");
-            }
-            
-        } catch (NamingException ne) {
-            //Propogate NamingException.
-            throw ne;
+        Destination requestDestination = resolveRequestDestination(context, connection, addrDetails);
+        if (requestDestination == null) {
+            // fail to locate or create requestDestination throw Exception.
+            throw new JMSException("Failed to lookup or create requestDestination");
         }
-         
+
+        Destination replyDestination = resolveReplyDestination(addrDetails, context, connection);
+
+        // create session factory to manage session, reply destination,
+        // producer and consumer pooling
+        //
+        JMSSessionFactory sf = new JMSSessionFactory(connection, replyDestination, context, JMSUtils
+            .isDestinationStyleQueue(addrDetails), sessionPoolConfig, runtimePolicy);
+
+        // notify transport that connection is complete
+        onConnectCallBack.connected(requestDestination, replyDestination, sf);
+    }
+
+    private static Destination resolveReplyDestination(AddressType addrDetails, Context context,
+                                                       Connection connection) throws NamingException,
+        JMSException {
         Destination replyDestination = null;
-        
-        //Reply Destination is used (if present) only if the session is point-to-point session 
-        if (isQueue) {
+
+        // Reply Destination is used (if present) only if the session is
+        // point-to-point session
+        if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
             if (addrDetails.getJndiReplyDestinationName() != null) {
                 replyDestination = (Destination)context.lookup(addrDetails.getJndiReplyDestinationName());
-            } 
+            }
             if (replyDestination == null && addrDetails.getJmsReplyDestinationName() != null) {
                 replyDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
                     .createQueue(addrDetails.getJmsReplyDestinationName());
             }
         }
-
-        // create session factory to manage session, reply destination,
-        // producer and consumer pooling
-        //
-            
-        JMSSessionFactory sf =
-            new JMSSessionFactory(connection,
-                                  replyDestination,
-                                  context,
-                                  jmsTransport,
-                                  runtimePolicy);
-
-        // notify transport that connection is complete        
-        jmsTransport.connected(requestDestination, replyDestination, sf);
+        return replyDestination;
     }
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java?rev=690638&r1=690637&r2=690638&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java Sat Aug 30 21:49:48 2008
@@ -44,114 +44,92 @@
 import org.apache.cxf.common.util.AbstractTwoStageCache;
 
 /**
- * This class encapsulates the creation and pooling logic for JMS Sessions.
- * The usage patterns for sessions, producers & consumers are as follows ...
+ * This class encapsulates the creation and pooling logic for JMS Sessions. The usage patterns for sessions,
+ * producers & consumers are as follows ...
  * <p>
- * client-side: an invoking thread requires relatively short-term exclusive
- * use of a session, an unidentified producer to send the request message,
- * and in the point-to-point domain a consumer for the temporary ReplyTo
- * destination to synchronously receive the reply if the operation is twoway
- * (in the pub-sub domain only oneway operations are supported, so a there
- * is never a requirement for a reply destination)
+ * client-side: an invoking thread requires relatively short-term exclusive use of a session, an unidentified
+ * producer to send the request message, and in the point-to-point domain a consumer for the temporary ReplyTo
+ * destination to synchronously receive the reply if the operation is twoway (in the pub-sub domain only
+ * oneway operations are supported, so a there is never a requirement for a reply destination)
  * <p>
- * server-side receive: each port based on <jms:address> requires relatively
- * long-term exclusive use of a session, a consumer with a MessageListener for
- * the JMS destination specified for the port, and an unidentified producer
- * to send the request message
+ * server-side receive: each port based on <jms:address> requires relatively long-term exclusive use of a
+ * session, a consumer with a MessageListener for the JMS destination specified for the port, and an
+ * unidentified producer to send the request message
  * <p>
- * server-side send: each dispatch of a twoway request requires relatively
- * short-term exclusive use of a session and an indentified producer (but
- * not a consumer) - note that the session used for the recieve side cannot
- * be re-used for the send, as MessageListener usage precludes any synchronous
- * sends or receives on that session
+ * server-side send: each dispatch of a twoway request requires relatively short-term exclusive use of a
+ * session and an indentified producer (but not a consumer) - note that the session used for the recieve side
+ * cannot be re-used for the send, as MessageListener usage precludes any synchronous sends or receives on
+ * that session
  * <p>
- * So on the client-side, pooling of sessions is bound up with pooling
- * of temporary reply destinations, whereas on the server receive side
- * the benefit of pooling is marginal as the session is required from
- * the point at which the port was activated until the Bus is shutdown
- * The server send side resembles the client side,
- * except that a consumer for the temporary destination is never required.
- * Hence different pooling strategies make sense ...
+ * So on the client-side, pooling of sessions is bound up with pooling of temporary reply destinations,
+ * whereas on the server receive side the benefit of pooling is marginal as the session is required from the
+ * point at which the port was activated until the Bus is shutdown The server send side resembles the client
+ * side, except that a consumer for the temporary destination is never required. Hence different pooling
+ * strategies make sense ...
  * <p>
- * client-side: a SoftReference-based cache of send/receive sessions is
- * maintained containing an aggregate of a session, indentified producer,
- * temporary reply destination & consumer for same
+ * client-side: a SoftReference-based cache of send/receive sessions is maintained containing an aggregate of
+ * a session, indentified producer, temporary reply destination & consumer for same
  * <p>
- * server-side receive: as sessions cannot be usefully recycled, they are
- * simply created on demand and closed when no longer required
+ * server-side receive: as sessions cannot be usefully recycled, they are simply created on demand and closed
+ * when no longer required
  * <p>
- * server-side send: a SoftReference-based cache of send-only sessions is
- * maintained containing an aggregate of a session and an indentified producer
+ * server-side send: a SoftReference-based cache of send-only sessions is maintained containing an aggregate
+ * of a session and an indentified producer
  * <p>
- * In a pure client or pure server, only a single cache is ever
- * populated.  Where client and server logic is co-located, a client
- * session retrieval for a twoway invocation checks the reply-capable
- * cache first and then the send-only cache - if a session is
- * available in the later then its used after a tempory destination is
- * created before being recycled back into the reply-capable cache. A
- * server send side retrieval or client retrieval for a oneway
- * invocation checks the send-only cache first and then the
- * reply-capable cache - if a session is available in the later then
- * its used and the tempory destination is ignored. So in the
- * co-located case, sessions migrate from the send-only cache to the
- * reply-capable cache as necessary.
+ * In a pure client or pure server, only a single cache is ever populated. Where client and server logic is
+ * co-located, a client session retrieval for a twoway invocation checks the reply-capable cache first and
+ * then the send-only cache - if a session is available in the later then its used after a tempory destination
+ * is created before being recycled back into the reply-capable cache. A server send side retrieval or client
+ * retrieval for a oneway invocation checks the send-only cache first and then the reply-capable cache - if a
+ * session is available in the later then its used and the tempory destination is ignored. So in the
+ * co-located case, sessions migrate from the send-only cache to the reply-capable cache as necessary.
  * <p>
- *
  */
 public class JMSSessionFactory {
 
     private static final Logger LOG = LogUtils.getL7dLogger(JMSSessionFactory.class);
-    
+
     private int lowWaterMark;
     private int highWaterMark;
 
     private final Context initialContext;
-    private final  Connection theConnection;
+    private final Connection theConnection;
     private AbstractTwoStageCache<PooledSession> replyCapableSessionCache;
     private AbstractTwoStageCache<PooledSession> sendOnlySessionCache;
     private final Destination theReplyDestination;
-    private final JMSTransport jmsTransport;
     private final ServerBehaviorPolicyType runtimePolicy;
- 
+    private boolean destinationIsQueue;
+
     /**
      * Constructor.
-     *
+     * 
      * @param connection the shared {Queue|Topic}Connection
      */
-    public JMSSessionFactory(Connection connection, 
-                             Destination replyDestination,
-                             Context context,
-                             JMSTransport tbb,
+    public JMSSessionFactory(Connection connection, Destination replyDestination, Context context,
+                             boolean destinationIsQueue, SessionPoolType sessionPoolConfig,
                              ServerBehaviorPolicyType runtimePolicy) {
         theConnection = connection;
         theReplyDestination = replyDestination;
         initialContext = context;
-        jmsTransport = tbb;
         this.runtimePolicy = runtimePolicy;
-        
-        SessionPoolType sessionPoolConfig = jmsTransport.getSessionPool();
-        
+
         lowWaterMark = sessionPoolConfig.getLowWaterMark();
         highWaterMark = sessionPoolConfig.getHighWaterMark();
-         
+        this.destinationIsQueue = destinationIsQueue;
 
         // create session caches (REVISIT sizes should be configurable)
         //
 
-        if (isDestinationStyleQueue()) {
+        if (destinationIsQueue) {
             // the reply capable cache is only required in the point-to-point
             // domain
             //
-            replyCapableSessionCache = 
-                new AbstractTwoStageCache<PooledSession>(
-                    lowWaterMark, 
-                    highWaterMark, 
-                            0, 
-                            this) {
-                    public final PooledSession create() throws JMSException {
-                        return createPointToPointReplyCapableSession();
-                    }
-                };
+            replyCapableSessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark,
+                0, this) {
+                public final PooledSession create() throws JMSException {
+                    return createPointToPointReplyCapableSession();
+                }
+            };
 
             try {
                 replyCapableSessionCache.populateCache();
@@ -161,16 +139,12 @@
 
             // send-only cache for point-to-point oneway requests and replies
             //
-            sendOnlySessionCache = 
-                new AbstractTwoStageCache<PooledSession>(
-                    lowWaterMark, 
-                    highWaterMark, 
-                            0, 
-                            this) {
-                    public final PooledSession create() throws JMSException {
-                        return createPointToPointSendOnlySession();
-                    }
-                };
+            sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark, 0,
+                this) {
+                public final PooledSession create() throws JMSException {
+                    return createPointToPointSendOnlySession();
+                }
+            };
 
             try {
                 sendOnlySessionCache.populateCache();
@@ -180,16 +154,12 @@
         } else {
             // send-only cache for pub-sub oneway requests
             //
-            sendOnlySessionCache = 
-                new AbstractTwoStageCache<PooledSession>(
-                    lowWaterMark, 
-                    highWaterMark, 
-                           0, 
-                           this) {
-                    public final PooledSession create() throws JMSException {
-                        return createPubSubSession(true, false, null);
-                    }
-                };
+            sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark, 0,
+                this) {
+                public final PooledSession create() throws JMSException {
+                    return createPubSubSession(true, false, null);
+                }
+            };
 
             try {
                 sendOnlySessionCache.populateCache();
@@ -199,31 +169,30 @@
         }
     }
 
-    //--java.lang.Object Overrides----------------------------------------------
+    // --java.lang.Object Overrides----------------------------------------------
     public String toString() {
         return "JMSSessionFactory";
     }
 
-
-    //--Methods-----------------------------------------------------------------
+    // --Methods-----------------------------------------------------------------
     protected Connection getConnection() {
         return theConnection;
     }
 
-    public Queue getQueueFromInitialContext(String queueName) 
-        throws NamingException {
-        return (Queue) initialContext.lookup(queueName);
+    public Queue getQueueFromInitialContext(String queueName) throws NamingException {
+        return (Queue)initialContext.lookup(queueName);
     }
 
     public PooledSession get(boolean replyCapable) throws JMSException {
         return get(null, replyCapable);
     }
-    
+
     /**
      * Retrieve a new or cached Session.
+     * 
      * @param replyDest Destination name if coming from wsa:Header
-     * @param replyCapable true iff the session is to be used to receive replies
-     * (implies client side twoway invocation )
+     * @param replyCapable true iff the session is to be used to receive replies (implies client side twoway
+     *                invocation )
      * @return a new or cached Session
      */
     public PooledSession get(Destination replyDest, boolean replyCapable) throws JMSException {
@@ -245,19 +214,19 @@
                         QueueSession session = (QueueSession)ret.session();
                         Queue destination = null;
                         String selector = null;
-                        
+
                         if (null != theReplyDestination || null != replyDest) {
-                            destination = null != replyDest ? (Queue) replyDest : (Queue)theReplyDestination;
-                            
+                            destination = null != replyDest ? (Queue)replyDest : (Queue)theReplyDestination;
+
                             selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'";
                         }
-                        
+
                         if (destination == null) {
-                            //neither replyDestination not replyDest are present.
+                            // neither replyDestination not replyDest are present.
                             destination = session.createTemporaryQueue();
                             selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'";
                         }
-                        
+
                         ret.destination(destination);
                         MessageConsumer consumer = session.createReceiver(destination, selector);
                         ret.consumer(consumer);
@@ -309,9 +278,8 @@
 
     /**
      * Retrieve a new
-     *
-     * @param destination the target JMS queue or topic (non-null implies
-     * server receive side)
+     * 
+     * @param destination the target JMS queue or topic (non-null implies server receive side)
      * @return a new or cached Session
      */
     public PooledSession get(Destination destination) throws JMSException {
@@ -320,7 +288,7 @@
         // the destination is only specified on the server receive side,
         // in which case a new session is always created
         //
-        if (isDestinationStyleQueue()) {
+        if (destinationIsQueue) {
             ret = createPointToPointServerSession(destination);
         } else {
             ret = createPubSubSession(false, true, destination);
@@ -331,7 +299,7 @@
 
     /**
      * Return a Session to the pool
-     *
+     * 
      * @param pooled_session the session to recycle
      */
     public void recycle(PooledSession pooledSession) {
@@ -343,8 +311,9 @@
         synchronized (this) {
             // re-cache session, closing if it cannot be it can be accomodated
             //
-            discard = replyCapable ? (!replyCapableSessionCache.recycle(pooledSession))
-                : (!sendOnlySessionCache.recycle(pooledSession));
+            discard = replyCapable
+                ? (!replyCapableSessionCache.recycle(pooledSession)) : (!sendOnlySessionCache
+                    .recycle(pooledSession));
         }
 
         if (discard) {
@@ -356,7 +325,6 @@
         }
     }
 
-
     /**
      * Shutdown the session factory.
      */
@@ -391,79 +359,69 @@
         sendOnlySessionCache = null;
     }
 
-
     /**
      * Helper method to create a point-to-point pooled session.
-     *
+     * 
      * @param producer true iff producing
      * @param consumer true iff consuming
      * @param destination the target destination
      * @return an appropriate pooled session
      */
     PooledSession createPointToPointReplyCapableSession() throws JMSException {
-        QueueSession session =
-            ((QueueConnection)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+        QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
+                                                                                   Session.AUTO_ACKNOWLEDGE);
         Destination destination = null;
         String selector = null;
-        
+
         if (null != theReplyDestination) {
             destination = theReplyDestination;
-            
-            selector =  "JMSCorrelationID = '" + generateUniqueSelector(session) + "'";
-            
-            
+
+            selector = "JMSCorrelationID = '" + generateUniqueSelector(session) + "'";
+
         } else {
             destination = session.createTemporaryQueue();
         }
-        
+
         MessageConsumer consumer = session.createReceiver((Queue)destination, selector);
-        return new PooledSession(session,
-                                 destination,
-                                 session.createSender(null),
-                                 consumer);
+        return new PooledSession(session, destination, session.createSender(null), consumer);
     }
 
-
     /**
      * Helper method to create a point-to-point pooled session.
-     *
+     * 
      * @return an appropriate pooled session
      */
     PooledSession createPointToPointSendOnlySession() throws JMSException {
-        QueueSession session =
-            ((QueueConnection)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+        QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
+                                                                                   Session.AUTO_ACKNOWLEDGE);
 
         return new PooledSession(session, null, session.createSender(null), null);
     }
 
-
     /**
      * Helper method to create a point-to-point pooled session for consumer only.
-     *
+     * 
      * @param destination the target destination
      * @return an appropriate pooled session
      */
     private PooledSession createPointToPointServerSession(Destination destination) throws JMSException {
-        QueueSession session =
-            ((QueueConnection)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-        
-        return new PooledSession(session, destination, session.createSender(null),
-                                 session.createReceiver((Queue)destination, 
-                                                        runtimePolicy.getMessageSelector()));
-    }
+        QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
+                                                                                   Session.AUTO_ACKNOWLEDGE);
 
+        return new PooledSession(session, destination, session.createSender(null), session
+            .createReceiver((Queue)destination, runtimePolicy.getMessageSelector()));
+    }
 
     /**
      * Helper method to create a pub-sub pooled session.
-     *
+     * 
      * @param producer true iff producing
      * @param consumer true iff consuming
      * @param destination the target destination
      * @return an appropriate pooled session
      */
-    PooledSession createPubSubSession(boolean producer,
-                                              boolean consumer,
-                                              Destination destination) throws JMSException {
+    PooledSession createPubSubSession(boolean producer, boolean consumer, Destination destination)
+        throws JMSException {
         TopicSession session = ((TopicConnection)theConnection).createTopicSession(false,
                                                                                    Session.AUTO_ACKNOWLEDGE);
         TopicSubscriber sub = null;
@@ -471,23 +429,16 @@
             String messageSelector = runtimePolicy.getMessageSelector();
             String durableName = runtimePolicy.getDurableSubscriberName();
             if (durableName != null) {
-                sub = session.createDurableSubscriber((Topic)destination,
-                                                      durableName,
-                                                      messageSelector,
-                                                      false);
+                sub = session
+                    .createDurableSubscriber((Topic)destination, durableName, messageSelector, false);
             } else {
-                sub = session.createSubscriber((Topic)destination,
-                                               messageSelector,
-                                               false);
+                sub = session.createSubscriber((Topic)destination, messageSelector, false);
             }
         }
 
-        return new PooledSession(session,
-                                 null,
-                                 producer ? session.createPublisher(null) : null,
-                                 sub);
+        return new PooledSession(session, null, producer ? session.createPublisher(null) : null, sub);
     }
-    
+
     private String generateUniqueSelector(Object obj) {
         String host = "localhost";
 
@@ -495,17 +446,10 @@
             InetAddress addr = InetAddress.getLocalHost();
             host = addr.getHostName();
         } catch (UnknownHostException ukex) {
-            //Default to localhost.
+            // Default to localhost.
         }
 
         long time = Calendar.getInstance().getTimeInMillis();
-        return host + "_" 
-            + System.getProperty("user.name") + "_" 
-            + obj + time;    
-    }
-
-    private boolean isDestinationStyleQueue() {
-        return JMSConstants.JMS_QUEUE.equals(
-            jmsTransport.getJMSAddress().getDestinationStyle().value());
+        return host + "_" + System.getProperty("user.name") + "_" + obj + time;
     }
 }