You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2007/09/12 08:16:29 UTC

svn commit: r574793 - in /webservices/synapse/trunk/java/modules: core/src/main/java/org/apache/synapse/mediators/builtin/ transports/src/main/java/org/apache/axis2/transport/base/ transports/src/main/java/org/apache/axis2/transport/jms/

Author: asankha
Date: Tue Sep 11 23:16:25 2007
New Revision: 574793

URL: http://svn.apache.org/viewvc?rev=574793&view=rev
Log:
More JMS 1.0.2b support conversions from 1.1 and minor fixes
Also use a message selector on the JMSCorrelationID when waiting for a response on a destination

Modified:
    webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java

Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java?rev=574793&r1=574792&r2=574793&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java Tue Sep 11 23:16:25 2007
@@ -100,7 +100,7 @@
                 Axis2MessageContext axis2smc = (Axis2MessageContext) smc;
                 org.apache.axis2.context.MessageContext axis2MessageCtx =
                         axis2smc.getAxis2MessageContext();
-                axis2MessageCtx.setProperty(name, resultValue);
+                axis2MessageCtx.getOptions().setProperty(name, resultValue);
 
             } else if (Constants.SCOPE_TRANSPORT.equals(scope)
                     && smc instanceof Axis2MessageContext) {

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java?rev=574793&r1=574792&r2=574793&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java Tue Sep 11 23:16:25 2007
@@ -203,7 +203,6 @@
                 }
 
             } catch (Exception e) {
-                e.printStackTrace();
                 envelope = handleLegacyMessage(msgContext, message);
             }
         }
@@ -257,7 +256,7 @@
             OMTextImpl textData = (OMTextImpl) soapFactory.createOMText(textPayload);
 
             if (wrapperQName == null) {
-                wrapperQName = BaseConstants.DEFAULT_BINARY_WRAPPER;
+                wrapperQName = BaseConstants.DEFAULT_TEXT_WRAPPER;
             }
             wrapper = soapFactory.createOMElement(wrapperQName, null);
             wrapper.addChild(textData);

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java?rev=574793&r1=574792&r2=574793&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java Tue Sep 11 23:16:25 2007
@@ -133,7 +133,7 @@
             try {
                 log.info("Creating a JMS Queue with the JNDI name : " + destinationJNDIName +
                     " using the connection factory definition named : " + name);
-                JMSUtils.createJMSQueue(conFactory.createConnection(), destinationJNDIName);
+                JMSUtils.createDestination(conFactory, destinationJNDIName);
 
                 destinationName = getPhysicalDestinationName(destinationJNDIName);
                 
@@ -298,7 +298,7 @@
         }
 
         try {
-            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE);
             Destination destination = null;
 
             try {
@@ -306,10 +306,10 @@
 
             } catch (NameNotFoundException e) {
                 log.warn("Cannot find destination : " + destinationJNDIname + ". Creating a Queue");
-                destination = session.createQueue(destinationJNDIname);
+                destination = JMSUtils.createDestination(session, destinationJNDIname);
             }
 
-            MessageConsumer consumer = session.createConsumer(destination);
+            MessageConsumer consumer = JMSUtils.createConsumer(session, destination);
             consumer.setMessageListener(jmsMessageReceiver);
             jmsSessions.put(destinationJNDIname, session);
 

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=574793&r1=574792&r2=574793&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Tue Sep 11 23:16:25 2007
@@ -19,6 +19,7 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMText;
 import org.apache.axiom.om.OMNode;
+import org.apache.axiom.om.util.UUIDGenerator;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.ConfigurationContext;
@@ -34,6 +35,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import javax.jms.*;
+import javax.jms.Queue;
 import javax.activation.DataHandler;
 import javax.naming.Context;
 import javax.naming.NamingException;
@@ -169,9 +171,13 @@
                 destination = jmsOut.getDestination();
             }
 
-            String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY);
+            String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO);
             if (replyDestName != null) {
-                replyDestination = jmsOut.getReplyDestination(replyDestName);
+                if (jmsConnectionFactory != null) {
+                    replyDestination = jmsConnectionFactory.getDestination(replyDestName);
+                } else {
+                    replyDestination = jmsOut.getReplyDestination(replyDestName);
+                }
             }
 
             // now we are going to use the JMS session, but if this was a session from a
@@ -181,6 +187,7 @@
 
                 // convert the axis message context into a JMS Message that we can send over JMS
                 Message message = null;
+                String correlationId = null;
                 try {
                     message = createJMSMessage(msgCtx, session);
                 } catch (JMSException e) {
@@ -198,6 +205,14 @@
                 if (waitForResponse) {
                     replyDestination = JMSUtils.setReplyDestination(
                         replyDestination, session, message);
+                    // force the use of a JMS correlation ID if synchronous
+                    try {
+                        correlationId = message.getJMSCorrelationID();
+                        if (correlationId == null) {
+                            correlationId = UUIDGenerator.getUUID();
+                            message.setJMSCorrelationID(correlationId);
+                        }
+                    } catch (JMSException ignore) {}
                 }
 
                 // send the outgoing message over JMS to the destination selected
@@ -205,7 +220,7 @@
 
                 // if we are expecting a synchronous response back for the message sent out
                 if (waitForResponse) {
-                    waitForResponseAndProcess(session, replyDestination, msgCtx);
+                    waitForResponseAndProcess(session, replyDestination, msgCtx, correlationId);
                 }
             }
 
@@ -228,9 +243,25 @@
      * @throws AxisFault on error
      */
     private void waitForResponseAndProcess(Session session, Destination replyDestination,
-        MessageContext msgCtx) throws AxisFault {
+        MessageContext msgCtx, String correlationId) throws AxisFault {
+
         try {
-            MessageConsumer consumer = session.createConsumer(replyDestination);
+            MessageConsumer consumer = null;
+            if (replyDestination instanceof Queue) {
+                if (correlationId != null) {
+                    consumer = ((QueueSession) session).createReceiver((Queue) replyDestination,
+                        "JMSCorrelationID = '" + correlationId + "'");
+                } else {
+                    consumer = ((QueueSession) session).createReceiver((Queue) replyDestination);
+                }
+            } else {
+                if (correlationId != null) {
+                    consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination,
+                        correlationId, false);
+                } else {
+                    consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination);
+                }
+            }
 
             // how long are we willing to wait for the sync response
             long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
@@ -241,7 +272,8 @@
 
             if (log.isDebugEnabled()) {
                 log.debug("Waiting for a maximum of " + timeout +
-                    "ms for a response message to destination : " + replyDestination);
+                    "ms for a response message to destination : " + replyDestination +
+                    " with JMS correlation ID : " + correlationId);
             }
 
             Message reply = consumer.receive(timeout);
@@ -250,7 +282,8 @@
 
             } else {
                 log.warn("Did not receive a JMS response within " +
-                    timeout + " ms to destination : " + replyDestination);
+                    timeout + " ms to destination : " + replyDestination +
+                    " with JMS correlation ID : " + correlationId);
             }
 
         } catch (JMSException e) {

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java?rev=574793&r1=574792&r2=574793&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java Tue Sep 11 23:16:25 2007
@@ -50,6 +50,7 @@
 import javax.naming.Context;
 import java.io.*;
 import java.util.*;
+import java.nio.ByteBuffer;
 
 /**
  * Miscallaneous methods used for the JMS transport
@@ -75,9 +76,9 @@
      */
     public static String createJMSQueue(Connection con, String destinationJNDIName) throws JMSException {
         try {
-            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            QueueSession session = ((QueueConnection) con).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
             Queue queue = session.createQueue(destinationJNDIName);
-            log.info("JMS Destination with JNDI name : " + destinationJNDIName + " created");
+            log.info("JMS Queue with JNDI name : " + destinationJNDIName + " created");
             return queue.getQueueName();
 
         } finally {
@@ -88,6 +89,29 @@
     }
 
     /**
+     * Create a JMS Topic using the given connection with the JNDI destination name, and return the
+     * JMS Destination name of the created Topic
+     *
+     * @param con the JMS Connection to be used
+     * @param destinationJNDIName the JNDI name of the Topic to be created
+     * @return the JMS Destination name of the created Topic
+     * @throws JMSException on error
+     */
+    public static String createJMSTopic(Connection con, String destinationJNDIName) throws JMSException {
+        try {
+            TopicSession session = ((TopicConnection) con).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic topic = session.createTopic(destinationJNDIName);
+            log.info("JMS Topic with JNDI name : " + destinationJNDIName + " created");
+            return topic.getTopicName();
+
+        } finally {
+            try {
+                con.close();
+            } catch (JMSException ignore) {}
+        }
+    }
+
+    /**
      * Should this service be enabled over the JMS transport?
      *
      * @param service the Axis service
@@ -301,7 +325,7 @@
         if (replyDestination == null) {
            try {
                // create temporary queue to receive the reply
-               replyDestination = session.createTemporaryQueue();
+               replyDestination = createTemporaryDestination(session);
            } catch (JMSException e) {
                handleException("Error creating temporary queue for response");
            }
@@ -343,7 +367,7 @@
                 }
 
                 try {
-                    destination = session.createQueue(name);
+                    destination = createDestination(session, name);
                 } catch (JMSException e) {
                     handleException("Error creating destination Queue : " + name, e);
                 }
@@ -604,18 +628,79 @@
     }
 
     public byte[] getMessageBinaryPayload(Object message) {
+
         if (message instanceof BytesMessage) {
             BytesMessage bytesMessage = (BytesMessage) message;
-            byte[] msgBytes;
+            ByteBuffer msgBytes = ByteBuffer.allocate(1024);
             try {
-                msgBytes = new byte[(int) bytesMessage.getBodyLength()];
-                bytesMessage.reset();
-                bytesMessage.readBytes(msgBytes);
+                while (true) {
+                    byte[] temp = new byte[1024];
+                    int read = bytesMessage.readBytes(temp);
+                    if (read > 0) {
+                        msgBytes.put(temp, 0, read);
+                    } else {
+                        msgBytes.flip();
+                        return msgBytes.array();
+                    }
+                }
 
             } catch (JMSException e) {
                 handleException("Error reading JMS binary message payload", e);
             }
         }
         return null;
+    }
+
+    // ----------- JMS 1.0.2b compatibility methods -------------
+    public static Session createSession(Connection con,
+        boolean transacted, int acknowledgeMode) throws JMSException {
+
+        if (con instanceof QueueConnection) {
+            return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode);
+        } else {
+            return ((TopicConnection) con).createTopicSession(transacted, acknowledgeMode);
+        }
+    }
+
+    public static Destination createDestination(Session session, String destName)
+        throws JMSException {
+
+        if (session instanceof QueueSession) {
+            return ((QueueSession) session).createQueue(destName);
+        } else {
+            return ((TopicSession) session).createTopic(destName);
+        }
+    }
+
+    public static void createDestination(ConnectionFactory conFactory,
+        String destinationJNDIName) throws JMSException {
+
+        if (conFactory instanceof QueueConnectionFactory) {
+            JMSUtils.createJMSQueue(
+                ((QueueConnectionFactory) conFactory).createQueueConnection(),
+                destinationJNDIName);
+        } else {
+            JMSUtils.createJMSTopic(
+                ((TopicConnectionFactory) conFactory).createTopicConnection(),
+                destinationJNDIName);
+        }
+    }
+    public static MessageConsumer createConsumer(Session session, Destination dest)
+        throws JMSException {
+
+        if (dest instanceof Queue) {
+            return ((QueueSession) session).createReceiver((Queue) dest);
+        } else {
+            return ((TopicSession) session).createSubscriber((Topic) dest);
+        }
+    }
+
+    public static Destination createTemporaryDestination(Session session) throws JMSException {
+
+        if (session instanceof QueueSession) {
+            return ((QueueSession) session).createTemporaryQueue();
+        } else {
+            return ((TopicSession) session).createTemporaryTopic();
+        }
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org