You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by up...@apache.org on 2007/12/06 05:49:57 UTC

svn commit: r601611 - in /webservices/synapse/trunk/java/modules: samples/src/main/java/samples/userguide/ transports/src/main/java/org/apache/synapse/transport/jms/

Author: upul
Date: Wed Dec  5 20:49:55 2007
New Revision: 601611

URL: http://svn.apache.org/viewvc?rev=601611&view=rev
Log:
fixed jms support for queue and topic types using parameters

Modified:
    webservices/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/GenericJMSClient.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConstants.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSOutTransportInfo.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java

Modified: webservices/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/GenericJMSClient.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/GenericJMSClient.java?rev=601611&r1=601610&r2=601611&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/GenericJMSClient.java (original)
+++ webservices/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/GenericJMSClient.java Wed Dec  5 20:49:55 2007
@@ -19,6 +19,7 @@
 
 package samples.userguide;
 
+import org.apache.synapse.transport.jms.JMSConstants;
 import org.apache.synapse.transport.jms.JMSUtils;
 
 import javax.jms.*;
@@ -69,8 +70,8 @@
     private void sendBytesMessage(String destName, byte[] payload) throws Exception {
         InitialContext ic = getInitialContext();
         ConnectionFactory confac = (ConnectionFactory) ic.lookup("ConnectionFactory");
-        Connection connection = JMSUtils.createConnection(confac, null, null);
-        Session session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE);
+        Connection connection = JMSUtils.createConnection(confac, null, null, JMSConstants.DESTINATION_TYPE_QUEUE);
+        Session session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE, JMSConstants.DESTINATION_TYPE_QUEUE);
 
         BytesMessage bm = session.createBytesMessage();
         bm.writeBytes(payload);
@@ -81,8 +82,8 @@
     private void sendTextMessage(String destName, String payload) throws Exception {
         InitialContext ic = getInitialContext();
         ConnectionFactory confac = (ConnectionFactory) ic.lookup("ConnectionFactory");
-        Connection connection = JMSUtils.createConnection(confac, null, null);
-        Session session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE);
+        Connection connection = JMSUtils.createConnection(confac, null, null, JMSConstants.DESTINATION_TYPE_QUEUE);
+        Session session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE, JMSConstants.DESTINATION_TYPE_QUEUE);
 
         TextMessage tm = session.createTextMessage(payload);
         JMSUtils.sendMessageToJMSDestination(session, (Destination) ic.lookup(destName), tm);

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java?rev=601611&r1=601610&r2=601611&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java Wed Dec  5 20:49:55 2007
@@ -82,6 +82,8 @@
     private String connFactoryJNDIName = null;
     /** Map of destination JNDI names to service names */
     private Map serviceJNDINameMapping = null;
+    /** Map of destination JNDI names to destination types*/
+    private Map destinationTypeMapping = null;
     /** Map of JMS destination names to service names */
     private Map serviceDestinationNameMapping = null;
     /** JMS Sessions currently active. One session for each Destination / Service */
@@ -92,6 +94,8 @@
     private Context context = null;
     /** The actual ConnectionFactory instance held within */
     private ConnectionFactory conFactory = null;
+    /** The JMS connection factory type */
+    private String connectionFactoryType = null;
     /** The JMS Connection opened */
     private Connection connection = null;
     /** The JMS Message receiver for this connection factory */
@@ -111,6 +115,7 @@
         this.name = name;
         this.cfgCtx = cfgCtx;
         serviceJNDINameMapping = new HashMap();
+        destinationTypeMapping = new HashMap();
         serviceDestinationNameMapping = new HashMap();
         jndiProperties = new Hashtable();
         jmsSessions = new HashMap();
@@ -123,7 +128,7 @@
      * @param destinationJNDIName destination JNDI name
      * @param serviceName     the service to which it belongs
      */
-    public void addDestination(String destinationJNDIName, String serviceName) {
+    public void addDestination(String destinationJNDIName, String destinationType, String serviceName) {
 
         String destinationName = getPhysicalDestinationName(destinationJNDIName);
 
@@ -133,7 +138,7 @@
             try {
                 log.info("Creating a JMS Queue with the JNDI name : " + destinationJNDIName +
                     " using the connection factory definition named : " + name);
-                JMSUtils.createDestination(conFactory, destinationJNDIName);
+                JMSUtils.createDestination(conFactory, destinationJNDIName, destinationType);
 
                 destinationName = getPhysicalDestinationName(destinationJNDIName);
                 
@@ -148,6 +153,7 @@
         }
 
         serviceJNDINameMapping.put(destinationJNDIName, serviceName);
+        destinationTypeMapping.put(destinationJNDIName, destinationType);
         serviceDestinationNameMapping.put(destinationName, serviceName);
 
         log.info("Mapped JNDI name : " + destinationJNDIName + " and JMS Destination name : " +
@@ -211,12 +217,15 @@
         log.info("Connected to the JMS connection factory : " + connFactoryJNDIName);
 
         try {
+            ConnectionFactory conFac = null;
             QueueConnectionFactory qConFac = null;
             TopicConnectionFactory tConFac = null;
-            if (conFactory instanceof QueueConnectionFactory) {
+            if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(getConnectionFactoryType())) {
                 qConFac = (QueueConnectionFactory) conFactory;
-            } else {
+            } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(getConnectionFactoryType())) {
                 tConFac = (TopicConnectionFactory) conFactory;
+            } else {
+                conFac = conFactory;
             }
 
             String user = (String) jndiProperties.get(Context.SECURITY_PRINCIPAL);
@@ -225,14 +234,18 @@
             if (user != null && pass != null) {
                 if (qConFac != null) {
                     connection = qConFac.createQueueConnection(user, pass);
-                } else {
+                } else if (tConFac != null) {
                     connection = tConFac.createTopicConnection(user, pass);
+                } else {
+                    connection = conFac.createConnection(user, pass);
                 }
             } else {
                 if (qConFac != null) {
                     connection = qConFac.createQueueConnection();
-                } else {
+                } else if (tConFac != null) {
                     connection = tConFac.createTopicConnection();
+                } else {
+                    connection = conFac.createConnection();
                 }
             }
 
@@ -242,7 +255,9 @@
 
         Iterator destJNDINameIter = serviceJNDINameMapping.keySet().iterator();
         while (destJNDINameIter.hasNext()) {
-            startListeningOnDestination((String) destJNDINameIter.next());
+            String destJNDIName = (String) destJNDINameIter.next();
+            String destinationType = (String) destinationTypeMapping.get(destJNDIName);
+            startListeningOnDestination(destJNDIName, destinationType);
         }
 
         connection.start(); // indicate readyness to start receiving messages
@@ -287,7 +302,7 @@
      *
      * @param destinationJNDIname the JMS destination to listen on
      */
-    public void startListeningOnDestination(String destinationJNDIname) {
+    public void startListeningOnDestination(String destinationJNDIname, String destinationType) {
 
         Session session = (Session) jmsSessions.get(destinationJNDIname);
         // if we already had a session open, close it first
@@ -298,7 +313,7 @@
         }
 
         try {
-            session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE);
+            session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE, destinationType);
             Destination destination = null;
 
             try {
@@ -306,7 +321,7 @@
 
             } catch (NameNotFoundException e) {
                 log.warn("Cannot find destination : " + destinationJNDIname + ". Creating a Queue");
-                destination = JMSUtils.createDestination(session, destinationJNDIname);
+                destination = JMSUtils.createDestination(session, destinationJNDIname, destinationType);
             }
 
             MessageConsumer consumer = JMSUtils.createConsumer(session, destination);
@@ -552,5 +567,13 @@
     private void handleException(String msg, Exception e) throws AxisJMSException {
         log.error(msg, e);
         throw new AxisJMSException(msg, e);
+    }
+
+    public String getConnectionFactoryType() {
+      return connectionFactoryType;
+    }
+
+    public void setConnectionFactoryType(String connectionFactoryType) {
+      this.connectionFactoryType = connectionFactoryType;
     }
 }

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConstants.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConstants.java?rev=601611&r1=601610&r2=601611&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConstants.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConstants.java Wed Dec  5 20:49:55 2007
@@ -48,20 +48,39 @@
      */
     public static final String DEST_PARAM = "transport.jms.Destination";
     /**
+     * The Parameter name indicating a JMS destination type for requests. i.e. DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC
+     */
+    public static final String DEST_PARAM_TYPE = "transport.jms.DestinationType";
+    /**
      * The Parameter name indicating the response JMS destination
      */
     public static final String REPLY_PARAM = "transport.jms.ReplyDestination";
     /**
+     * The Parameter name indicating the response JMS destination. i.e. DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC
+     */
+    public static final String REPLY_PARAM_TYPE = "transport.jms.ReplyDestinationType";
+    
+    /**
+     * Values used for DEST_PARAM_TYPE, REPLY_PARAM_TYPE
+     */
+    public static final String DESTINATION_TYPE_QUEUE = "queue";
+    public static final String DESTINATION_TYPE_TOPIC = "topic";
+
+    /**
      * The Parameter name of an Axis2 service, indicating the JMS connection
      * factory which should be used to listen for messages for it. This is
      * the local (Axis2) name of the connection factory and not the JNDI name
      */
     public static final String CONFAC_PARAM = "transport.jms.ConnectionFactory";
     /**
+     * Connection factory type if using JMS 1.0, either DESTINATION_TYPE_QUEUE or DESTINATION_TYPE_TOPIC
+     */
+    public static final String CONFAC_TYPE = "transport.jms.ConnectionFactoryType";
+    /**
      * The Parameter name indicating the JMS connection factory JNDI name
      */
     public static final String CONFAC_JNDI_NAME_PARAM = "transport.jms.ConnectionFactoryJNDIName";
-
+    
 
     //------------ message context / transport header properties and client options ------------
     /**

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java?rev=601611&r1=601610&r2=601611&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java Wed Dec  5 20:49:55 2007
@@ -160,10 +160,12 @@
         String destinationName = JMSUtils.getJNDIDestinationNameForService(service);
         serviceNameToEPRMap.put(service.getName(), JMSUtils.getEPR(cf, destinationName));
 
-        log.info("Starting to listen on destination : " + destinationName +
-            " for service " + service.getName());
-        cf.addDestination(destinationName, service.getName());
-        cf.startListeningOnDestination(destinationName);
+        String destinationType = JMSUtils.getDestinationTypeForService(service);
+        
+        log.info("Starting to listen on destination : " + destinationName + " of type " + destinationType
+            + " for service " + service.getName());
+        cf.addDestination(destinationName, destinationType, service.getName());
+        cf.startListeningOnDestination(destinationName, destinationType);
     }
 
     /**

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSOutTransportInfo.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSOutTransportInfo.java?rev=601611&r1=601610&r2=601611&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSOutTransportInfo.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSOutTransportInfo.java Wed Dec  5 20:49:55 2007
@@ -49,6 +49,8 @@
     private JMSConnectionFactory jmsConnectionFactory = null;
     /** the Destination queue or topic for the outgoing message */
     private Destination destination = null;
+    /** the Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */
+    private String destinationType = null;
     /** the EPR properties when the out-transport info is generated from a target EPR */
     private Hashtable properties = null;
     /** the target EPR string where applicable */
@@ -88,6 +90,8 @@
             handleException("Invalid prefix for a JMS EPR : " + targetEPR);
         } else {
             properties = JMSUtils.getProperties(targetEPR);
+            String destinationType = (String) properties.get(JMSConstants.DEST_PARAM_TYPE);
+            setDestinationType(destinationType);
         }
     }
 
@@ -203,4 +207,13 @@
     public String getTargetEPR() {
         return targetEPR;
     }
+
+    public String getDestinationType() {
+      return destinationType;
+    }
+
+    public void setDestinationType(String destinationType) {
+      this.destinationType = destinationType;
+    }
+    
 }

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java?rev=601611&r1=601610&r2=601611&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java Wed Dec  5 20:49:55 2007
@@ -127,32 +127,42 @@
 
                         QueueConnectionFactory qConFac = null;
                         TopicConnectionFactory tConFac = null;
-                        if (jmsOut.getConnectionFactory() instanceof QueueConnectionFactory) {
+                        ConnectionFactory conFac = null;
+                        
+                        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) {
                             qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory();
-                        } else {
+                        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) {
                             tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory();
+                        } else {
+                            conFac = (ConnectionFactory) jmsOut.getConnectionFactory();
                         }
 
                         if (user != null && pass != null) {
                             if (qConFac != null) {
                                 connection = qConFac.createQueueConnection(user, pass);
-                            } else {
+                            } else if (tConFac != null) {
                                 connection = tConFac.createTopicConnection(user, pass);
+                            } else {
+                                connection = conFac.createConnection(user, pass);
                             }
                         } else {
                            if (qConFac != null) {
                                 connection = qConFac.createQueueConnection();
-                            } else {
+                            } else if (tConFac != null) {
                                 connection = tConFac.createTopicConnection();
+                            } else {
+                                connection = conFac.createConnection();
                             }
                         }
 
-                        if (qConFac != null) {
+                        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) {
                             session = ((QueueConnection)connection).
                                 createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-                        } else {
+                        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) {
                             session = ((TopicConnection)connection).
                                 createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+                        } else {
+                            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                         }
 
                     } catch (JMSException e) {
@@ -194,9 +204,11 @@
                     handleException("Error creating a JMS message from the axis message context", e);
                 }
 
+                String destinationType = jmsOut.getDestinationType();
+                
                 // if the destination does not exist, see if we can create it
                 destination = JMSUtils.createDestinationIfRequired(
-                    destination, targetAddress, session);
+                    destination, destinationType, targetAddress, session);
 
                 // should we wait for a synchronous response on this same thread?
                 boolean waitForResponse = waitForSynchronousResponse(msgCtx);

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java?rev=601611&r1=601610&r2=601611&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java Wed Dec  5 20:49:55 2007
@@ -148,6 +148,28 @@
     }
 
     /**
+     * Get the JMS destination type of this service
+     *
+     * @param service the Axis Service
+     * @return the name of the JMS destination
+     */
+    public static String getDestinationTypeForService(AxisService service) {
+        Parameter destTypeParam = service.getParameter(JMSConstants.DEST_PARAM_TYPE);
+        if (destTypeParam != null) {
+            String paramValue = (String) destTypeParam.getValue();
+            if(JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) || JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) )  {
+                return paramValue;
+            } else {
+               handleException("Invalid destinaton type value " + paramValue);
+               return null;
+            }
+        } else {
+            log.debug("JMS destination type not given. default queue");
+            return JMSConstants.DESTINATION_TYPE_QUEUE;
+        }
+    }
+    
+    /**
      * Extract connection factory properties from a given URL
      *
      * @param url a JMS URL of the form jms:/<destination>?[<key>=<value>&]*
@@ -248,7 +270,10 @@
 
             Parameter p = (Parameter) params.next();
 
-            if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) {
+            if (JMSConstants.CONFAC_TYPE.equals(p.getName())) {
+                String connectionFactoryType = (String) p.getValue();
+                jmsConFactory.setConnectionFactoryType(connectionFactoryType);
+            } else if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) {
                 jmsConFactory.addJNDIContextProperty(
                     Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue());
             } else if (Context.PROVIDER_URL.equals(p.getName())) {
@@ -357,7 +382,7 @@
      * @return the JMS Destination where messages could be posted
      * @throws AxisFault if the target Destination does not exist and cannot be created
      */
-    public static Destination createDestinationIfRequired(Destination destination,
+    public static Destination createDestinationIfRequired(Destination destination, String destinationType,
         String targetAddress, Session session) throws AxisFault {
         if (destination == null) {
             if (targetAddress != null) {
@@ -367,7 +392,7 @@
                 }
 
                 try {
-                    destination = createDestination(session, name);
+                    destination = createDestination(session, name, destinationType);
                 } catch (JMSException e) {
                     handleException("Error creating destination Queue : " + name, e);
                 }
@@ -653,51 +678,65 @@
 
     // ----------- JMS 1.0.2b compatibility methods -------------
     public static Connection createConnection(
-        ConnectionFactory conFactory, String user, String pass) throws JMSException {
+        ConnectionFactory conFactory, String user, String pass, String destinationType) throws JMSException {
 
-        if (conFactory instanceof QueueConnectionFactory) {
+        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType) ) {
             if (user != null && pass != null) {
                 return ((QueueConnectionFactory) conFactory).createQueueConnection(user, pass);
             } else {
                 return ((QueueConnectionFactory) conFactory).createQueueConnection();
             }
-        } else {
+            
+        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
             if (user != null && pass != null) {
                 return ((TopicConnectionFactory) conFactory).createTopicConnection(user, pass);
             } else {
                 return ((TopicConnectionFactory) conFactory).createTopicConnection();
             }
+            
+        } else {
+            if (user != null && pass != null) {
+                return ((ConnectionFactory) conFactory).createConnection(user, pass);
+            } else {
+                return ((ConnectionFactory) conFactory).createConnection();
+            }
         }
     }
 
     public static Session createSession(Connection con,
-        boolean transacted, int acknowledgeMode) throws JMSException {
+        boolean transacted, int acknowledgeMode, String destinationType) throws JMSException {
 
-        if (con instanceof QueueConnection) {
+        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType) ) {
             return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode);
-        } else {
+        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
             return ((TopicConnection) con).createTopicSession(transacted, acknowledgeMode);
+        } else {
+            log.debug("JMS destination type not given or invalid. default queue. was " + destinationType);
+            return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode);
         }
     }
 
-    public static Destination createDestination(Session session, String destName)
+    public static Destination createDestination(Session session, String destName, String destinationType)
         throws JMSException {
 
-        if (session instanceof QueueSession) {
+        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
             return ((QueueSession) session).createQueue(destName);
-        } else {
+        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
             return ((TopicSession) session).createTopic(destName);
+        } else {
+            log.debug("JMS destination type not given or invalid. default queue. was " + destinationType);
+            return ((QueueSession) session).createQueue(destName);          
         }
     }
 
     public static void createDestination(ConnectionFactory conFactory,
-        String destinationJNDIName) throws JMSException {
+        String destinationJNDIName, String destinationType) throws JMSException {
 
-        if (conFactory instanceof QueueConnectionFactory) {
+        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
             JMSUtils.createJMSQueue(
                 ((QueueConnectionFactory) conFactory).createQueueConnection(),
                 destinationJNDIName);
-        } else {
+        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
             JMSUtils.createJMSTopic(
                 ((TopicConnectionFactory) conFactory).createTopicConnection(),
                 destinationJNDIName);



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org