You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2010/05/13 09:43:40 UTC
svn commit: r943835 - in
/servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src:
main/java/org/apache/servicemix/cxf/transport/nmr/
test/java/org/apache/servicemix/cxf/transport/nmr/
Author: ffang
Date: Thu May 13 07:43:39 2010
New Revision: 943835
URL: http://svn.apache.org/viewvc?rev=943835&view=rev
Log:
[SMX4-527] CXF NMR transport should copy properties/headers and attachments between the CXF message and the NMR message
Modified:
servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java
servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestination.java
servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationOutputStream.java
servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitTest.java
servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationTest.java
Modified: servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java?rev=943835&r1=943834&r2=943835&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java (original)
+++ servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java Thu May 13 07:43:39 2010
@@ -21,19 +21,25 @@ package org.apache.servicemix.cxf.transp
import java.io.IOException;
import java.io.InputStream;
+import java.io.Serializable;
import java.lang.reflect.Member;
import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
+import javax.activation.DataHandler;
import javax.jws.WebService;
import javax.xml.namespace.QName;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
+import org.apache.cxf.attachment.AttachmentImpl;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Attachment;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
@@ -114,6 +120,26 @@ public class NMRConduitOutputStream exte
LOG.info(new org.apache.cxf.common.i18n.Message("EXCHANGE.ENDPOINT", LOG).toString() + serviceName);
LOG.info("setup message contents on " + inMsg);
inMsg.setBody(getMessageContent(message));
+ //copy attachments
+ if (message != null && message.getAttachments() != null) {
+ for (Attachment att : message.getAttachments()) {
+ inMsg.addAttachment(att.getId(), att
+ .getDataHandler());
+ }
+ }
+
+ //copy properties
+ for (Map.Entry<String, Object> ent : message.entrySet()) {
+ //copy only values that are Serializable; Maps and Collections are skipped
+ //because their entries may not be Serializable even if the container is
+ if (ent.getValue() instanceof Serializable
+ && !(ent.getValue() instanceof Map)
+ && !(ent.getValue() instanceof Collection)) {
+ inMsg.setHeader(ent.getKey(), ent.getValue());
+ }
+ }
+
+
LOG.info("service for exchange " + serviceName);
Map<String,Object> refProps = new HashMap<String,Object>();
@@ -126,10 +152,13 @@ public class NMRConduitOutputStream exte
if (!isOneWay) {
channel.sendSync(xchng);
Source content = null;
+ org.apache.servicemix.nmr.api.Message nm = null;
if (xchng.getFault(false) != null) {
content = xchng.getFault().getBody(Source.class);
+ nm = xchng.getFault();
} else {
content = xchng.getOut().getBody(Source.class);
+ nm = xchng.getOut();
}
Message inMessage = new MessageImpl();
message.getExchange().setInMessage(inMessage);
@@ -138,6 +167,20 @@ public class NMRConduitOutputStream exte
throw new IOException(new org.apache.cxf.common.i18n.Message("UNABLE.RETRIEVE.MESSAGE", LOG).toString());
}
inMessage.setContent(InputStream.class, ins);
+ //copy attachments
+ Collection<Attachment> cxfAttachmentList = new ArrayList<Attachment>();
+ for (Map.Entry<String, Object> ent : nm.getAttachments().entrySet()) {
+ cxfAttachmentList.add(new AttachmentImpl(ent.getKey(), (DataHandler) ent.getValue()));
+ }
+ inMessage.setAttachments(cxfAttachmentList);
+
+ //copy properties
+ for (Map.Entry<String, Object> ent : nm.getHeaders().entrySet()) {
+ if (!ent.getKey().equals(Message.REQUESTOR_ROLE)) {
+ inMessage.put(ent.getKey(), ent.getValue());
+ }
+ }
+
conduit.getMessageObserver().onMessage(inMessage);
xchng.setStatus(Status.Done);
Modified: servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestination.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestination.java?rev=943835&r1=943834&r2=943835&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestination.java (original)
+++ servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestination.java Thu May 13 07:43:39 2010
@@ -22,15 +22,20 @@ package org.apache.servicemix.cxf.transp
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.activation.DataHandler;
import javax.xml.namespace.QName;
import javax.xml.transform.Source;
+import org.apache.cxf.attachment.AttachmentImpl;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.message.Attachment;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
@@ -118,9 +123,23 @@ public class NMRDestination extends Abst
MessageImpl inMessage = new MessageImpl();
inMessage.put(Exchange.class, exchange);
-
+
final InputStream in = NMRMessageHelper.convertMessageToInputStream(nm.getBody(Source.class));
inMessage.setContent(InputStream.class, in);
+ //copy attachments
+ Collection<Attachment> cxfAttachmentList = new ArrayList<Attachment>();
+ for (Map.Entry<String, Object> ent : nm.getAttachments().entrySet()) {
+ cxfAttachmentList.add(new AttachmentImpl(ent.getKey(), (DataHandler) ent.getValue()));
+ }
+ inMessage.setAttachments(cxfAttachmentList);
+
+ //copy properties
+ for (Map.Entry<String, Object> ent : nm.getHeaders().entrySet()) {
+ if (!ent.getKey().equals(Message.REQUESTOR_ROLE)) {
+ inMessage.put(ent.getKey(), ent.getValue());
+ }
+ }
+
inMessage.setDestination(this);
getMessageObserver().onMessage(inMessage);
@@ -160,7 +179,7 @@ public class NMRDestination extends Abst
// setup the message to be send back
Channel dc = channel;
message.put(Exchange.class, inMessage.get(Exchange.class));
- message.setContent(OutputStream.class, new NMRDestinationOutputStream(inMessage, dc));
+ message.setContent(OutputStream.class, new NMRDestinationOutputStream(inMessage, message, dc));
}
protected Logger getLogger() {
Modified: servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationOutputStream.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationOutputStream.java?rev=943835&r1=943834&r2=943835&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationOutputStream.java (original)
+++ servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationOutputStream.java Thu May 13 07:43:39 2010
@@ -21,6 +21,9 @@ package org.apache.servicemix.cxf.transp
import java.io.IOException;
import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -32,6 +35,7 @@ import org.w3c.dom.Document;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Attachment;
import org.apache.cxf.message.Message;
import org.apache.servicemix.nmr.api.Channel;
import org.apache.servicemix.nmr.api.Exchange;
@@ -40,12 +44,15 @@ public class NMRDestinationOutputStream
private static final Logger LOG = LogUtils.getL7dLogger(NMRDestinationOutputStream.class);
private Message inMessage;
+ private Message outMessage;
private Channel channel;
public NMRDestinationOutputStream(Message m,
+ Message outM,
Channel dc) {
super();
inMessage = m;
+ outMessage = outM;
channel = dc;
}
@@ -86,6 +93,26 @@ public class NMRDestinationOutputStream
xchng.setError(f);
}
} else {
+ //copy attachments
+ if (outMessage != null && outMessage.getAttachments() != null) {
+ for (Attachment att : outMessage.getAttachments()) {
+ xchng.getOut().addAttachment(att.getId(), att
+ .getDataHandler());
+ }
+ }
+
+ //copy properties
+ for (Map.Entry<String, Object> ent : outMessage.entrySet()) {
+ //copy only values that are Serializable; Maps and Collections are skipped
+ //because their entries may not be Serializable even if the container is
+ if (ent.getValue() instanceof Serializable
+ && !(ent.getValue() instanceof Map)
+ && !(ent.getValue() instanceof Collection)) {
+ xchng.getOut().setHeader(ent.getKey(), ent.getValue());
+ }
+ }
+
+
xchng.getOut().setBody(new DOMSource(doc));
}
LOG.fine(new org.apache.cxf.common.i18n.Message("POST.DISPATCH", LOG).toString());
Modified: servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitTest.java?rev=943835&r1=943834&r2=943835&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitTest.java (original)
+++ servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitTest.java Thu May 13 07:43:39 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
+import java.util.HashMap;
import java.util.logging.Logger;
import javax.xml.stream.XMLStreamReader;
@@ -94,7 +95,9 @@ public class NMRConduitTest extends Abst
Source source = new StreamSource(new ByteArrayInputStream(
"<message>TestHelloWorld</message>".getBytes()));
EasyMock.expect(outMsg.getBody(Source.class)).andReturn(source);
-
+ EasyMock.expect(xchg.getOut()).andReturn(outMsg);
+ EasyMock.expect(outMsg.getAttachments()).andReturn(new HashMap<String, Object>());
+ EasyMock.expect(outMsg.getHeaders()).andReturn(new HashMap<String, Object>());
control.replay();
try {
conduit.prepare(message);
Modified: servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationTest.java?rev=943835&r1=943834&r2=943835&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationTest.java (original)
+++ servicemix/smx4/features/trunk/cxf/cxf-transport-nmr/src/test/java/org/apache/servicemix/cxf/transport/nmr/NMRDestinationTest.java Thu May 13 07:43:39 2010
@@ -20,6 +20,7 @@
package org.apache.servicemix.cxf.transport.nmr;
import java.io.ByteArrayInputStream;
+import java.util.HashMap;
import java.util.logging.Logger;
import javax.xml.namespace.QName;
@@ -67,7 +68,7 @@ public class NMRDestinationTest extends
channel.send(messageExchange);
EasyMock.replay(channel);
- NMRDestinationOutputStream jbiOS = new NMRDestinationOutputStream(message, channel);
+ NMRDestinationOutputStream jbiOS = new NMRDestinationOutputStream(message, new MessageImpl(), channel);
//Create array of more than what is in threshold in CachedOutputStream,
//though the threshold in CachedOutputStream should be made protected
@@ -120,7 +121,8 @@ public class NMRDestinationTest extends
org.apache.servicemix.nmr.api.Message inMsg = control.createMock(org.apache.servicemix.nmr.api.Message.class);
EasyMock.expect(xchg.getStatus()).andReturn(Status.Active);
EasyMock.expect(xchg.getIn()).andReturn(inMsg);
-
+ EasyMock.expect(inMsg.getAttachments()).andReturn(new HashMap<String, Object>());
+ EasyMock.expect(inMsg.getHeaders()).andReturn(new HashMap<String, Object>());
Source source = new StreamSource(new ByteArrayInputStream(
"<message>TestHelloWorld</message>".getBytes()));
EasyMock.expect(inMsg.getBody(Source.class)).andReturn(source);