You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2010/05/13 09:43:40 UTC

svn commit: r943835 - in /servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src: main/java/org/apache/servicemix/cxf/transport/nmr/ test/java/org/apache/servicemix/cxf/transport/nmr/

Author: ffang
Date: Thu May 13 07:43:39 2010
New Revision: 943835

URL: http://svn.apache.org/viewvc?rev=943835&view=rev
Log:
[SMX4-527] cxf nmr transport should copy over property headers and attachment between cxf message and nmr message

Modified:
    servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java
    servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestination.java
    servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationOutputStream.java
    servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitTest.java
    servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationTest.java

Modified: servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java?rev=943835&r1=943834&r2=943835&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java (original)
+++ servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java Thu May 13 07:43:39 2010
@@ -21,19 +21,25 @@ package org.apache.servicemix.cxf.transp
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Serializable;
 import java.lang.reflect.Member;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.logging.Logger;
 
+import javax.activation.DataHandler;
 import javax.jws.WebService;
 import javax.xml.namespace.QName;
 import javax.xml.transform.Source;
 import javax.xml.transform.stream.StreamSource;
 
+import org.apache.cxf.attachment.AttachmentImpl;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Attachment;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
@@ -114,6 +120,26 @@ public class NMRConduitOutputStream exte
             LOG.info(new org.apache.cxf.common.i18n.Message("EXCHANGE.ENDPOINT", LOG).toString() + serviceName);
             LOG.info("setup message contents on " + inMsg);
             inMsg.setBody(getMessageContent(message));
+            //copy attachments
+            if (message != null && message.getAttachments() != null) {
+                for (Attachment att : message.getAttachments()) {
+                	inMsg.addAttachment(att.getId(), att
+                            .getDataHandler());
+                }
+            }
+            
+            //copy properties
+            for (Map.Entry<String, Object> ent : message.entrySet()) {
+                //check if value is Serializable, and if value is Map or collection,
+                //just exclude it since the entry of it may not be Serializable as well
+                if (ent.getValue() instanceof Serializable
+                        && !(ent.getValue() instanceof Map)
+                        && !(ent.getValue() instanceof Collection)) {
+                	inMsg.setHeader(ent.getKey(), ent.getValue());
+                }
+            }
+            
+            
             LOG.info("service for exchange " + serviceName);
 
             Map<String,Object> refProps = new HashMap<String,Object>();
@@ -126,10 +152,13 @@ public class NMRConduitOutputStream exte
             if (!isOneWay) {
                 channel.sendSync(xchng);
                 Source content = null;
+                org.apache.servicemix.nmr.api.Message nm = null;
                 if (xchng.getFault(false) != null) {
                     content = xchng.getFault().getBody(Source.class);
+                    nm = xchng.getFault();
                 } else {
                     content = xchng.getOut().getBody(Source.class);
+                    nm = xchng.getOut();
                 }
                 Message inMessage = new MessageImpl();
                 message.getExchange().setInMessage(inMessage);
@@ -138,6 +167,20 @@ public class NMRConduitOutputStream exte
                     throw new IOException(new org.apache.cxf.common.i18n.Message("UNABLE.RETRIEVE.MESSAGE", LOG).toString());
                 }
                 inMessage.setContent(InputStream.class, ins);
+                //copy attachments
+                Collection<Attachment> cxfAttachmentList = new ArrayList<Attachment>();
+                for (Map.Entry<String, Object> ent : nm.getAttachments().entrySet()) {
+                	cxfAttachmentList.add(new AttachmentImpl(ent.getKey(), (DataHandler) ent.getValue()));
+                }
+                inMessage.setAttachments(cxfAttachmentList);
+                
+                //copy properties
+                for (Map.Entry<String, Object> ent : nm.getHeaders().entrySet()) {
+                	if (!ent.getKey().equals(Message.REQUESTOR_ROLE)) {
+                		inMessage.put(ent.getKey(), ent.getValue());
+                	}
+                }
+                
                 conduit.getMessageObserver().onMessage(inMessage);
 
                 xchng.setStatus(Status.Done);

Modified: servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestination.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestination.java?rev=943835&r1=943834&r2=943835&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestination.java (original)
+++ servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestination.java Thu May 13 07:43:39 2010
@@ -22,15 +22,20 @@ package org.apache.servicemix.cxf.transp
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.activation.DataHandler;
 import javax.xml.namespace.QName;
 import javax.xml.transform.Source;
 
+import org.apache.cxf.attachment.AttachmentImpl;
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.message.Attachment;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
@@ -118,9 +123,23 @@ public class NMRDestination extends Abst
 
             MessageImpl inMessage = new MessageImpl();
             inMessage.put(Exchange.class, exchange);
-
+            
             final InputStream in = NMRMessageHelper.convertMessageToInputStream(nm.getBody(Source.class));
             inMessage.setContent(InputStream.class, in);
+            //copy attachments
+            Collection<Attachment> cxfAttachmentList = new ArrayList<Attachment>();
+            for (Map.Entry<String, Object> ent : nm.getAttachments().entrySet()) {
+            	cxfAttachmentList.add(new AttachmentImpl(ent.getKey(), (DataHandler) ent.getValue()));
+            }
+            inMessage.setAttachments(cxfAttachmentList);
+            
+            //copy properties
+            for (Map.Entry<String, Object> ent : nm.getHeaders().entrySet()) {
+            	if (!ent.getKey().equals(Message.REQUESTOR_ROLE)) {
+            		inMessage.put(ent.getKey(), ent.getValue());
+            	}
+            }
+            
             inMessage.setDestination(this);
             getMessageObserver().onMessage(inMessage);
 
@@ -160,7 +179,7 @@ public class NMRDestination extends Abst
             // setup the message to be send back
             Channel dc = channel;
             message.put(Exchange.class, inMessage.get(Exchange.class));
-            message.setContent(OutputStream.class, new NMRDestinationOutputStream(inMessage, dc));
+            message.setContent(OutputStream.class, new NMRDestinationOutputStream(inMessage, message, dc));
         }        
 
         protected Logger getLogger() {

Modified: servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationOutputStream.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationOutputStream.java?rev=943835&r1=943834&r2=943835&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationOutputStream.java (original)
+++ servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationOutputStream.java Thu May 13 07:43:39 2010
@@ -21,6 +21,9 @@ package org.apache.servicemix.cxf.transp
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -32,6 +35,7 @@ import org.w3c.dom.Document;
 
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Attachment;
 import org.apache.cxf.message.Message;
 import org.apache.servicemix.nmr.api.Channel;
 import org.apache.servicemix.nmr.api.Exchange;
@@ -40,12 +44,15 @@ public class NMRDestinationOutputStream 
 
     private static final Logger LOG = LogUtils.getL7dLogger(NMRDestinationOutputStream.class);
     private Message inMessage;
+    private Message outMessage;
     private Channel channel;
     
     public NMRDestinationOutputStream(Message m,
+    						   Message outM,
                                Channel dc) {
         super();
         inMessage = m;
+        outMessage = outM;
         channel = dc;
     }
     
@@ -86,6 +93,26 @@ public class NMRDestinationOutputStream 
                         xchng.setError(f);
                     }
                 } else {
+                	//copy attachments
+                    if (outMessage != null && outMessage.getAttachments() != null) {
+                        for (Attachment att : outMessage.getAttachments()) {
+                        	xchng.getOut().addAttachment(att.getId(), att
+                                    .getDataHandler());
+                        }
+                    }
+                    
+                    //copy properties
+                    for (Map.Entry<String, Object> ent : outMessage.entrySet()) {
+                        //check if value is Serializable, and if value is Map or collection,
+                        //just exclude it since the entry of it may not be Serializable as well
+                        if (ent.getValue() instanceof Serializable
+                                && !(ent.getValue() instanceof Map)
+                                && !(ent.getValue() instanceof Collection)) {
+                        	xchng.getOut().setHeader(ent.getKey(), ent.getValue());
+                        }
+                    }
+
+
                     xchng.getOut().setBody(new DOMSource(doc));
                 }
                 LOG.fine(new org.apache.cxf.common.i18n.Message("POST.DISPATCH", LOG).toString());

Modified: servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitTest.java?rev=943835&r1=943834&r2=943835&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitTest.java (original)
+++ servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitTest.java Thu May 13 07:43:39 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.Method;
+import java.util.HashMap;
 import java.util.logging.Logger;
 
 import javax.xml.stream.XMLStreamReader;
@@ -94,7 +95,9 @@ public class NMRConduitTest extends Abst
         Source source = new StreamSource(new ByteArrayInputStream(
                             "<message>TestHelloWorld</message>".getBytes()));
         EasyMock.expect(outMsg.getBody(Source.class)).andReturn(source);
-        
+        EasyMock.expect(xchg.getOut()).andReturn(outMsg);
+        EasyMock.expect(outMsg.getAttachments()).andReturn(new HashMap<String, Object>());
+        EasyMock.expect(outMsg.getHeaders()).andReturn(new HashMap<String, Object>());
         control.replay();
         try {
             conduit.prepare(message);

Modified: servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationTest.java?rev=943835&r1=943834&r2=943835&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationTest.java (original)
+++ servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationTest.java Thu May 13 07:43:39 2010
@@ -20,6 +20,7 @@
 package org.apache.servicemix.cxf.transport.nmr;
 
 import java.io.ByteArrayInputStream;
+import java.util.HashMap;
 import java.util.logging.Logger;
 
 import javax.xml.namespace.QName;
@@ -67,7 +68,7 @@ public class NMRDestinationTest extends 
         channel.send(messageExchange);
         EasyMock.replay(channel);
         
-        NMRDestinationOutputStream jbiOS = new NMRDestinationOutputStream(message, channel);
+        NMRDestinationOutputStream jbiOS = new NMRDestinationOutputStream(message, new MessageImpl(), channel);
         
         //Create array of more than what is in threshold in CachedOutputStream, 
         //though the threshold in CachedOutputStream should be made protected 
@@ -120,7 +121,8 @@ public class NMRDestinationTest extends 
         org.apache.servicemix.nmr.api.Message inMsg = control.createMock(org.apache.servicemix.nmr.api.Message.class);
         EasyMock.expect(xchg.getStatus()).andReturn(Status.Active);
         EasyMock.expect(xchg.getIn()).andReturn(inMsg);
-                
+        EasyMock.expect(inMsg.getAttachments()).andReturn(new HashMap<String, Object>());
+        EasyMock.expect(inMsg.getHeaders()).andReturn(new HashMap<String, Object>());
         Source source = new StreamSource(new ByteArrayInputStream(
                             "<message>TestHelloWorld</message>".getBytes()));
         EasyMock.expect(inMsg.getBody(Source.class)).andReturn(source);