You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/12 10:32:08 UTC

svn commit: r713330 - in /servicemix/components/bindings/servicemix-jms/trunk: ./ src/main/java/org/apache/servicemix/jms/endpoints/ src/test/java/org/apache/servicemix/jms/

Author: gnodet
Date: Wed Nov 12 01:32:04 2008
New Revision: 713330

URL: http://svn.apache.org/viewvc?rev=713330&view=rev
Log:
SM-1680, SM-1681, SM-1682: handle attachments, faults and errors on the new JMS endpoints

Added:
    servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
Modified:
    servicemix/components/bindings/servicemix-jms/trunk/pom.xml
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
    servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/AbstractJmsTestSupport.java
    servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JMSComponentTest.java

Modified: servicemix/components/bindings/servicemix-jms/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/pom.xml?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/pom.xml (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/pom.xml Wed Nov 12 01:32:04 2008
@@ -155,12 +155,24 @@
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.geronimo.specs</groupId>
-      <artifactId>geronimo-javamail_1.3.1_spec</artifactId>
-      <version>1.2</version>
-      <scope>provided</scope>
-    </dependency>
+      <dependency>
+        <groupId>org.apache.geronimo.specs</groupId>
+        <artifactId>geronimo-javamail_1.4_spec</artifactId>
+        <version>1.5</version>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.geronimo.javamail</groupId>
+        <artifactId>geronimo-javamail_1.4_mail</artifactId>
+        <version>1.5</version>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.geronimo.javamail</groupId>
+        <artifactId>geronimo-javamail_1.4_provider</artifactId>
+        <version>1.5</version>
+        <scope>provided</scope>
+      </dependency>
     <dependency>
       <groupId>org.apache.geronimo.specs</groupId>
       <artifactId>geronimo-jms_1.1_spec</artifactId>
@@ -181,7 +193,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.geronimo.specs</groupId>
-      <artifactId>geronimo-jta_1.0.1B_spec</artifactId>
+      <artifactId>geronimo-jta_1.1_spec</artifactId>
       <version>1.1</version>
       <scope>provided</scope>
     </dependency>
@@ -254,18 +266,6 @@
     </dependency>
     <!-- for unit/integration testing -->
     <dependency>
-      <groupId>org.apache.servicemix</groupId>
-      <artifactId>servicemix-core</artifactId>
-      <version>${servicemix-version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.springframework</groupId>
-          <artifactId>spring-jmx</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-context-support</artifactId>
       <version>${spring-version}</version>

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java Wed Nov 12 01:32:04 2008
@@ -426,8 +426,8 @@
     }
 
     protected void processExchange(final MessageExchange exchange, final Session session, final JmsContext context) throws Exception {
-        // Ignore DONE exchanges
-        if (exchange.getStatus() == ExchangeStatus.DONE) {
+        // Ignore InOnly exchanges which are currently handled in fire-and-forget mode
+        if (exchange instanceof InOnly) {
             return;
         }
         // Create session if needed
@@ -475,6 +475,10 @@
             dest = getReplyDestination(exchange, error, session, context);
             setCorrelationId(context.getMessage(), msg);
             send(msg, session, dest);
+        } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+            msg = session.createMessage();
+            msg.setBooleanProperty(AbstractJmsMarshaler.DONE_JMS_PROPERTY, true);
+            send(msg, session, dest);
         } else {
             throw new IllegalStateException("Unrecognized exchange status");
         }
@@ -541,9 +545,7 @@
                 } catch (Exception e) {
                     handleException(exchange, e, session, context);
                 }
-                if (exchange.getStatus() != ExchangeStatus.DONE) {
-                    processExchange(exchange, session, context);
-                }
+                processExchange(exchange, session, context);
             } else {
                 if (stateless) {
                     exchange.setProperty(PROP_JMS_CONTEXT, context);

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java Wed Nov 12 01:32:04 2008
@@ -33,6 +33,12 @@
  */
 public abstract class AbstractJmsMarshaler {
 
+    public static final String DONE_JMS_PROPERTY = "JBIDone";
+
+    public static final String FAULT_JMS_PROPERTY = "JBIFault";
+
+    public static final String ERROR_JMS_PROPERTY = "JBIError";
+
     /**
      * Should marshaler copy properties set in messages?
      */
@@ -111,7 +117,7 @@
      * @param value the property value
      * @return true if it should be copied
      */
-    private boolean shouldIncludeHeader(String name, Object value) {
+    protected boolean shouldIncludeHeader(String name, Object value) {
         return (value instanceof String || value instanceof Number || value instanceof Date)
                && (!isNeedJavaIdentifiers() || isJavaIdentifier(name));
     }

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java Wed Nov 12 01:32:04 2008
@@ -20,7 +20,13 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
 import java.net.URI;
+import java.util.Map;
+import java.util.Set;
 
 import javax.jbi.component.ComponentContext;
 import javax.jbi.messaging.Fault;
@@ -29,11 +35,22 @@
 import javax.jms.Message;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.ObjectMessage;
 import javax.xml.transform.Source;
+import javax.xml.stream.XMLStreamReader;
+import javax.activation.DataHandler;
 
 import org.apache.servicemix.common.JbiConstants;
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.soap.core.PhaseInterceptorChain;
+import org.apache.servicemix.soap.core.MessageImpl;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsInInterceptor;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxInInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.BodyOutInterceptor;
+import org.apache.servicemix.soap.util.stax.StaxSource;
 
 public class DefaultConsumerMarshaler extends AbstractJmsMarshaler implements JmsConsumerMarshaler {
     
@@ -78,28 +95,73 @@
     }
 
     public Message createOut(MessageExchange exchange, NormalizedMessage outMsg, Session session, JmsContext context) throws Exception {
-        String text = new SourceTransformer().contentToString(outMsg);
-        TextMessage textMessage = session.createTextMessage(text);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PhaseInterceptorChain chain = new PhaseInterceptorChain();
+        chain.add(new AttachmentsOutInterceptor());
+        chain.add(new StaxOutInterceptor());
+        chain.add(new BodyOutInterceptor());
+        org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+        msg.setContent(Source.class, outMsg.getContent());
+        msg.setContent(OutputStream.class, baos);
+        for (String attId : (Set<String>) outMsg.getAttachmentNames()) {
+            msg. getAttachments().put(attId, outMsg.getAttachment(attId));
+        }
+        chain.doIntercept(msg);
+        TextMessage text = session.createTextMessage(baos.toString());
+        text.setStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE,
+                               (String) msg.get(org.apache.servicemix.soap.api.Message.CONTENT_TYPE));
         if (isCopyProperties()) {
-            copyPropertiesFromNM(outMsg, textMessage);
+            copyPropertiesFromNM(outMsg, text);
         }
-        return textMessage;
+        return text;
     }
 
     public Message createFault(MessageExchange exchange, Fault fault, Session session, JmsContext context) throws Exception {
-        String text = new SourceTransformer().contentToString(fault);
-        return session.createTextMessage(text);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PhaseInterceptorChain chain = new PhaseInterceptorChain();
+        chain.add(new AttachmentsOutInterceptor());
+        chain.add(new StaxOutInterceptor());
+        chain.add(new BodyOutInterceptor());
+        org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+        msg.setContent(Source.class, fault.getContent());
+        msg.setContent(OutputStream.class, baos);
+        for (String attId : (Set<String>) fault.getAttachmentNames()) {
+            msg. getAttachments().put(attId, fault.getAttachment(attId));
+        }
+        chain.doIntercept(msg);
+        TextMessage text = session.createTextMessage(baos.toString());
+        text.setStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE,
+                               (String) msg.get(org.apache.servicemix.soap.api.Message.CONTENT_TYPE));
+        text.setBooleanProperty(FAULT_JMS_PROPERTY, true);
+        if (isCopyProperties()) {
+            copyPropertiesFromNM(fault, text);
+        }
+        return text;
     }
 
     public Message createError(MessageExchange exchange, Exception error, Session session, JmsContext context) throws Exception {
-        throw error;
+        ObjectMessage message = session.createObjectMessage(error);
+        message.setBooleanProperty(ERROR_JMS_PROPERTY, true);
+        return message;
     }
 
     protected void populateMessage(Message message, NormalizedMessage normalizedMessage) throws Exception {
         if (message instanceof TextMessage) {
-            TextMessage textMessage = (TextMessage) message;
-            Source source = new StringSource(textMessage.getText());
-            normalizedMessage.setContent(source);
+            PhaseInterceptorChain chain = new PhaseInterceptorChain();
+            chain.add(new AttachmentsInInterceptor());
+            chain.add(new StaxInInterceptor());
+            org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+            msg.setContent(InputStream.class, new ByteArrayInputStream(((TextMessage) message).getText().getBytes()));
+            String contentType = message.getStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE);
+            if (contentType != null) {
+                msg.put(org.apache.servicemix.soap.api.Message.CONTENT_TYPE, contentType);
+            }
+            chain.doIntercept(msg);
+            XMLStreamReader xmlReader = msg.getContent(XMLStreamReader.class);
+            normalizedMessage.setContent(new StaxSource(xmlReader));
+            for (Map.Entry<String, DataHandler> attachment : msg.getAttachments().entrySet()) {
+                normalizedMessage.addAttachment(attachment.getKey(), attachment.getValue());
+            }
         } else {
             throw new UnsupportedOperationException("JMS message is not a TextMessage");
         }

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java Wed Nov 12 01:32:04 2008
@@ -17,6 +17,12 @@
 package org.apache.servicemix.jms.endpoints;
 
 import java.util.Map;
+import java.util.List;
+import java.util.Set;
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
 
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
@@ -24,9 +30,19 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.xml.transform.Source;
+import javax.xml.stream.XMLStreamReader;
+import javax.activation.DataHandler;
 
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.soap.core.PhaseInterceptorChain;
+import org.apache.servicemix.soap.core.MessageImpl;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsInInterceptor;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxInInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.BodyOutInterceptor;
+import org.apache.servicemix.soap.util.stax.StaxSource;
 
 public class DefaultProviderMarshaler extends AbstractJmsMarshaler implements
     JmsProviderMarshaler {
@@ -49,8 +65,21 @@
     }
 
     public Message createMessage(MessageExchange exchange, NormalizedMessage in, Session session) throws Exception {
-        TextMessage text = session.createTextMessage();
-        text.setText(transformer.contentToString(in));
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PhaseInterceptorChain chain = new PhaseInterceptorChain();
+        chain.add(new AttachmentsOutInterceptor());
+        chain.add(new StaxOutInterceptor());
+        chain.add(new BodyOutInterceptor());
+        org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+        msg.setContent(Source.class, in.getContent());
+        msg.setContent(OutputStream.class, baos);
+        for (String attId : (Set<String>) in.getAttachmentNames()) {
+            msg. getAttachments().put(attId, in.getAttachment(attId));
+        }
+        chain.doIntercept(msg);
+        TextMessage text = session.createTextMessage(baos.toString());
+        text.setStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE,
+                               (String) msg.get(org.apache.servicemix.soap.api.Message.CONTENT_TYPE));
         if (jmsProperties != null) {
             for (Map.Entry<String, Object> e : jmsProperties.entrySet()) {
                 text.setObjectProperty(e.getKey(), e.getValue());
@@ -66,12 +95,23 @@
 
     public void populateMessage(Message message, MessageExchange exchange, NormalizedMessage normalizedMessage) throws Exception {
         if (message instanceof TextMessage) {
-            TextMessage textMessage = (TextMessage) message;
-            Source source = new StringSource(textMessage.getText());
-            normalizedMessage.setContent(source);
-
+            PhaseInterceptorChain chain = new PhaseInterceptorChain();
+            chain.add(new AttachmentsInInterceptor());
+            chain.add(new StaxInInterceptor());
+            org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+            msg.setContent(InputStream.class, new ByteArrayInputStream(((TextMessage) message).getText().getBytes()));
+            String contentType = message.getStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE);
+            if (contentType != null) {
+                msg.put(org.apache.servicemix.soap.api.Message.CONTENT_TYPE, contentType);
+            }
+            chain.doIntercept(msg);
+            XMLStreamReader xmlReader = msg.getContent(XMLStreamReader.class);
+            normalizedMessage.setContent(new StaxSource(xmlReader));
+            for (Map.Entry<String, DataHandler> attachment : msg.getAttachments().entrySet()) {
+                normalizedMessage.addAttachment(attachment.getKey(), attachment.getValue());
+            }
             if (isCopyProperties()) {
-                copyPropertiesFromJMS(textMessage, normalizedMessage);
+                copyPropertiesFromJMS(message, normalizedMessage);
             }
         } else {
             throw new UnsupportedOperationException("JMS message is not a TextMessage");

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Wed Nov 12 01:32:04 2008
@@ -23,6 +23,7 @@
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.RobustInOnly;
 import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.Fault;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -30,6 +31,7 @@
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
 
 import org.apache.servicemix.common.endpoints.ProviderEndpoint;
 import org.apache.servicemix.common.JbiConstants;
@@ -239,7 +241,7 @@
     * Specifies if the QoS values specified for the endpoint are explicitly 
    * used when a message is sent. The default is <code>false</code>.
     *
-     * @param replyExplicitQosEnabled should the QoS values be sent?
+     * @param explicitQosEnabled should the QoS values be sent?
      */
     public void setExplicitQosEnabled(boolean explicitQosEnabled) {
         this.explicitQosEnabled = explicitQosEnabled;
@@ -476,7 +478,7 @@
                     done(exchange);
                 // In message
                 } else if ((in = exchange.getMessage("in")) != null) {
-                    if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
+                    if (exchange instanceof InOnly) {
                         processInOnly(exchange, in);
                         done(exchange);
                     }
@@ -631,14 +633,30 @@
             if (receiveJmsMsg == null) {
                 throw new IllegalStateException("Unable to receive response");
             }
-
-            NormalizedMessage out = exchange.getMessage("out");
-            if (out == null) {
-                out = exchange.createMessage();
-                exchange.setMessage(out, "out");
+            if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.DONE_JMS_PROPERTY)) {
+                exchange.setStatus(ExchangeStatus.DONE);
+            } else if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.ERROR_JMS_PROPERTY)) {
+                Exception e = (Exception) ((ObjectMessage) receiveJmsMsg).getObject();
+                exchange.setError(e);
+                exchange.setStatus(ExchangeStatus.ERROR);
+            } else if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.FAULT_JMS_PROPERTY)) {
+                Fault fault = exchange.getFault();
+                if (fault == null) {
+                    fault = exchange.createFault();
+                    exchange.setFault(fault);
+                }
+                marshaler.populateMessage(receiveJmsMsg, exchange, fault);
+            } else {
+                NormalizedMessage out = exchange.getMessage("out");
+                if (out == null) {
+                    out = exchange.createMessage();
+                    exchange.setMessage(out, "out");
+                }
+                marshaler.populateMessage(receiveJmsMsg, exchange, out);
             }
-            marshaler.populateMessage(receiveJmsMsg, exchange, out);
-            boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+            boolean txSync = exchange.getStatus() == ExchangeStatus.ACTIVE
+                                && exchange.isTransacted()
+                                && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
             if (txSync) {
                 sendSync(exchange);
             } else {
@@ -665,12 +683,27 @@
             logger.error("Unable to load exchange related to incoming JMS message " + message, e);
         }
         try {
-            NormalizedMessage out = exchange.getMessage("out");
-            if (out == null) {
-                out = exchange.createMessage();
-                exchange.setMessage(out, "out");
+            if (message.getBooleanProperty(AbstractJmsMarshaler.DONE_JMS_PROPERTY)) {
+                exchange.setStatus(ExchangeStatus.DONE);
+            } else if (message.getBooleanProperty(AbstractJmsMarshaler.ERROR_JMS_PROPERTY)) {
+                Exception e = (Exception) ((ObjectMessage) message).getObject();
+                exchange.setError(e);
+                exchange.setStatus(ExchangeStatus.ERROR);
+            } else if (message.getBooleanProperty(AbstractJmsMarshaler.FAULT_JMS_PROPERTY)) {
+                Fault fault = exchange.getFault();
+                if (fault == null) {
+                    fault = exchange.createFault();
+                    exchange.setFault(fault);
+                }
+                marshaler.populateMessage(message, exchange, fault);
+            } else {
+                NormalizedMessage out = exchange.getMessage("out");
+                if (out == null) {
+                    out = exchange.createMessage();
+                    exchange.setMessage(out, "out");
+                }
+                marshaler.populateMessage(message, exchange, out);
             }
-            marshaler.populateMessage(message, exchange, out);
         } catch (Exception e) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Error while populating JBI exchange " + exchange, e);
@@ -678,7 +711,9 @@
             exchange.setError(e);
         }
         try {
-            boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+            boolean txSync = exchange.getStatus() == ExchangeStatus.ACTIVE
+                                && exchange.isTransacted()
+                                && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
             if (txSync) {
                 sendSync(exchange);
             } else {

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/AbstractJmsTestSupport.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/AbstractJmsTestSupport.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/AbstractJmsTestSupport.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/AbstractJmsTestSupport.java Wed Nov 12 01:32:04 2008
@@ -16,6 +16,10 @@
  */
 package org.apache.servicemix.jms;
 
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 
@@ -26,9 +30,12 @@
 import org.apache.activemq.jndi.ActiveMQInitialContextFactory;
 import org.apache.activemq.pool.PooledConnectionFactory;
 import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.servicemix.MessageExchangeListener;
 import org.apache.servicemix.client.DefaultServiceMixClient;
 import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.components.util.ComponentSupport;
 import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
 import org.jencks.GeronimoPlatformTransactionManager;
 import org.springframework.core.io.ClassPathResource;
 import org.springframework.jms.core.JmsTemplate;
@@ -106,4 +113,29 @@
     protected void configureJmsBroker() throws Exception {
         
     }
+
+     protected static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
+         private Exception exception;
+     
+         public ReturnErrorComponent(Exception exception) {
+             this.exception = exception;
+         }
+     
+         public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+             if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                 fail(exchange, exception);
+             }
+         }
+     }
+ 
+     protected static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
+         public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+             if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                 Fault fault = exchange.createFault();
+                 fault.setContent(new StringSource("<fault/>"));
+                 fail(exchange, fault);
+             }
+         }
+     }
+     
 }

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JMSComponentTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JMSComponentTest.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JMSComponentTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JMSComponentTest.java Wed Nov 12 01:32:04 2008
@@ -20,28 +20,24 @@
 import java.net.URI;
 import java.net.URL;
 
+import javax.activation.DataHandler;
 import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.InOut;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.mail.util.ByteArrayDataSource;
 import javax.xml.namespace.QName;
 import javax.xml.transform.Source;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.MessageExchangeListener;
 import org.apache.servicemix.components.jms.JmsReceiverComponent;
 import org.apache.servicemix.components.jms.JmsServiceComponent;
-import org.apache.servicemix.components.util.ComponentSupport;
 import org.apache.servicemix.components.util.EchoComponent;
-import org.apache.servicemix.jbi.FaultException;
 import org.apache.servicemix.jbi.container.ActivationSpec;
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
@@ -199,27 +195,32 @@
 
         InOut inout = null;
         boolean result = false;
+        DataHandler dh = null;
         
         // Test successful return
         inout = client.createInOutExchange();
         inout.setInterfaceName(new QName("http://jms.servicemix.org/Test", "ProviderInterface"));
         inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        dh = new DataHandler(new ByteArrayDataSource("myImage", "application/octet-stream"));
+        inout.getInMessage().addAttachment("myImage", dh);
         result = client.sendSync(inout);
         assertTrue(result);
         NormalizedMessage out = inout.getOutMessage();
         assertNotNull(out);
         Source src = out.getContent();
         assertNotNull(src);
+        dh = out.getAttachment("myImage");
+        assertNotNull(dh);
+        
         logger.info(new SourceTransformer().toString(src));
 
-          // TODO
-//        // Test fault return 
+        // Test fault return 
 //        container.deactivateComponent("receiver");
 //        ReturnFaultComponent fault = new ReturnFaultComponent();
 //        ActivationSpec asFault = new ActivationSpec("receiver", fault);
 //        asFault.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
 //        container.activateComponent(asFault);
-//        
+//
 //        inout = client.createInOutExchange();
 //        inout.setInterfaceName(new QName("http://jms.servicemix.org/Test", "ProviderInterface"));
 //        inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
@@ -233,7 +234,7 @@
         ActivationSpec asError = new ActivationSpec("receiver", error);
         asError.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
         container.activateComponent(asError);
-        
+
         inout = client.createInOutExchange();
         inout.setInterfaceName(new QName("http://jms.servicemix.org/Test", "ProviderInterface"));
         inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
@@ -242,29 +243,5 @@
         assertTrue("An IllegalArgumentException was expected", inout.getError() instanceof IllegalArgumentException);
 
     }
-    
-    protected static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
-        private Exception exception;
-
-        public ReturnErrorComponent(Exception exception) {
-            this.exception = exception;
-        }
-
-        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
-            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                fail(exchange, exception);
-            }
-        }
-    }
-
-    protected static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
-        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
-            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                Fault fault = exchange.createFault();
-                fault.setContent(new StringSource("<fault/>"));
-                fail(exchange, fault);
-            }
-        }
-    }
 
 }

Added: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=713330&view=auto
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java (added)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java Wed Nov 12 01:32:04 2008
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.activation.DataHandler;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.ConnectionFactory;
+import javax.mail.util.ByteArrayDataSource;
+import javax.xml.namespace.QName;
+import javax.xml.transform.Source;
+
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.components.util.EchoComponent;
+import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jms.endpoints.DefaultConsumerMarshaler;
+import org.apache.servicemix.jms.endpoints.DefaultProviderMarshaler;
+import org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint;
+import org.apache.servicemix.jms.endpoints.JmsProviderEndpoint;
+
+public class JmsProviderConsumerEndpointTest extends AbstractJmsTestSupport {
+
+    private static Log logger = LogFactory.getLog(JmsProviderConsumerEndpointTest.class);
+
+    public void testProviderConsumerInOut() throws Exception {
+        ConnectionFactory connFactory = new PooledConnectionFactory(connectionFactory);
+        JmsComponent jmsComponent = new JmsComponent();
+        JmsConsumerEndpoint consumerEndpoint = createConsumerEndpoint(connFactory);
+        JmsProviderEndpoint providerEndpoint = createProviderEndpoint(connFactory);
+        jmsComponent.setEndpoints(new JmsEndpointType[] {consumerEndpoint, providerEndpoint});
+        container.activateComponent(jmsComponent, "servicemix-jms");
+
+        // Add an echo component
+        EchoComponent echo = new EchoComponent();
+        ActivationSpec asEcho = new ActivationSpec("receiver", echo);
+        asEcho.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        container.activateComponent(asEcho);
+
+        InOut inout = null;
+        boolean result = false;
+        DataHandler dh = null;
+        
+        // Test successful return
+        inout = client.createInOutExchange();
+        inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+        inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        dh = new DataHandler(new ByteArrayDataSource("myImage", "application/octet-stream"));
+        inout.getInMessage().addAttachment("myImage", dh);
+        result = client.sendSync(inout);
+        assertTrue(result);
+        NormalizedMessage out = inout.getOutMessage();
+        assertNotNull(out);
+        Source src = out.getContent();
+        assertNotNull(src);
+        dh = out.getAttachment("myImage");
+        assertNotNull(dh);
+        
+        logger.info(new SourceTransformer().toString(src));
+
+        // Test fault return 
+        container.deactivateComponent("receiver");
+        ReturnFaultComponent fault = new ReturnFaultComponent();
+        ActivationSpec asFault = new ActivationSpec("receiver", fault);
+        asFault.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        container.activateComponent(asFault);
+        
+        inout = client.createInOutExchange();
+        inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+        inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        result = client.sendSync(inout);
+        assertTrue(result);
+        assertNotNull(inout.getFault());
+        
+        // Test error return
+        container.deactivateComponent("receiver");
+        ReturnErrorComponent error = new ReturnErrorComponent(new IllegalArgumentException());
+        ActivationSpec asError = new ActivationSpec("receiver", error);
+        asError.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        container.activateComponent(asError);
+        
+        inout = client.createInOutExchange();
+        inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+        inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        client.sendSync(inout);
+        assertEquals(ExchangeStatus.ERROR, inout.getStatus());
+        assertTrue("An IllegalArgumentException was expected", inout.getError() instanceof IllegalArgumentException);
+
+    }
+
+    private JmsConsumerEndpoint createConsumerEndpoint(ConnectionFactory connFactory) throws URISyntaxException {
+        JmsConsumerEndpoint endpoint = new JmsConsumerEndpoint();
+        endpoint.setService(new QName("http://jms.servicemix.org/Test", "Consumer"));
+        endpoint.setEndpoint("endpoint");
+        DefaultConsumerMarshaler marshaler = new DefaultConsumerMarshaler();
+        marshaler.setMep(new URI("http://www.w3.org/2004/08/wsdl/in-out"));
+        endpoint.setMarshaler(marshaler);
+        endpoint.setListenerType("simple");
+        endpoint.setConnectionFactory(connFactory);
+        endpoint.setDestinationName("destination");
+        endpoint.setTargetService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        return endpoint;
+    }
+    
+    private JmsProviderEndpoint createProviderEndpoint(ConnectionFactory connFactory) {
+        JmsProviderEndpoint endpoint = new JmsProviderEndpoint();
+        endpoint.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+        endpoint.setEndpoint("endpoint");
+        DefaultProviderMarshaler marshaler = new DefaultProviderMarshaler();
+        endpoint.setMarshaler(marshaler);
+        endpoint.setConnectionFactory(connFactory);
+        endpoint.setDestinationName("destination");
+        return endpoint;
+    }
+}