You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/03/20 02:21:50 UTC

svn commit: r1302744 - in /cxf/trunk: api/src/main/java/org/apache/cxf/interceptor/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ systests/jaxrs/src/test/java/org/apache/cxf/...

Author: dkulp
Date: Tue Mar 20 01:21:50 2012
New Revision: 1302744

URL: http://svn.apache.org/viewvc?rev=1302744&view=rev
Log:
[CXF-4184] For JMS TextMessage, keep the payload as String using
Reader/Writer instead of streams.

Modified:
    cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/AbstractLoggingInterceptor.java
    cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/LoggingInInterceptor.java
    cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java
    cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxInInterceptor.java
    cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxOutEndingInterceptor.java
    cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
    cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSJmsTest.java
    cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JMSBookStore.java
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/AbstractLoggingInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/AbstractLoggingInterceptor.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/AbstractLoggingInterceptor.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/AbstractLoggingInterceptor.java Tue Mar 20 01:21:50 2012
@@ -21,6 +21,7 @@ package org.apache.cxf.interceptor;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.PrintWriter;
+import java.io.StringReader;
 import java.io.StringWriter;
 import java.net.URI;
 import java.util.ArrayList;
@@ -171,6 +172,39 @@ public abstract class AbstractLoggingInt
 
         }
     }
+    protected void writePayload(StringBuilder builder, 
+                                StringWriter stringWriter,
+                                String contentType) 
+        throws Exception {
+        // Just transform the XML message when the cos has content
+        if (isPrettyLogging() 
+            && contentType != null 
+            && contentType.indexOf("xml") >= 0 
+            && stringWriter.getBuffer().length() > 0) {
+            Transformer serializer = XMLUtils.newTransformer(2);
+            // Setup indenting to "pretty print"
+            serializer.setOutputProperty(OutputKeys.INDENT, "yes");
+            serializer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+
+            StringWriter swriter = new StringWriter();
+            serializer.transform(new StreamSource(new StringReader(stringWriter.getBuffer().toString())),
+                                 new StreamResult(swriter));
+            String result = swriter.toString();
+            if (result.length() < limit || limit == -1) {
+                builder.append(swriter.toString());
+            } else {
+                builder.append(swriter.toString().substring(0, limit));
+            }
+
+        } else {
+            StringBuffer buffer = stringWriter.getBuffer();
+            if (buffer.length() > limit) {
+                builder.append(buffer.subSequence(0, limit));
+            } else {
+                builder.append(buffer);
+            }
+        }
+    }
 
 
     /**

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/LoggingInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/LoggingInInterceptor.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/LoggingInInterceptor.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/LoggingInInterceptor.java Tue Mar 20 01:21:50 2012
@@ -18,8 +18,10 @@
  */
 package org.apache.cxf.interceptor;
 
+import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.PrintWriter;
+import java.io.Reader;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -153,6 +155,22 @@ public class LoggingInInterceptor extend
             } catch (Exception e) {
                 throw new Fault(e);
             }
+        } else {
+            Reader reader = message.getContent(Reader.class);
+            if (reader != null) {
+                try {
+                    BufferedReader r = new BufferedReader(reader, limit);
+                    r.mark(limit);
+                    char b[] = new char[limit];
+                    int i = r.read(b);
+                    buffer.getPayload().append(b, 0, i);
+                    r.reset();
+                    message.setContent(Reader.class, r);
+                } catch (Exception e) {
+                    throw new Fault(e);
+                }
+                
+            }
         }
         log(logger, buffer.toString());
     }

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java Tue Mar 20 01:21:50 2012
@@ -19,8 +19,12 @@
 
 package org.apache.cxf.interceptor;
 
+import java.io.FilterWriter;
+import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -60,7 +64,8 @@ public class LoggingOutInterceptor exten
 
     public void handleMessage(Message message) throws Fault {
         final OutputStream os = message.getContent(OutputStream.class);
-        if (os == null) {
+        final Writer iowriter = message.getContent(Writer.class);
+        if (os == null && iowriter == null) {
             return;
         }
         Logger logger = getMessageLogger(message);
@@ -69,16 +74,114 @@ public class LoggingOutInterceptor exten
             boolean hasLogged = message.containsKey(LOG_SETUP);
             if (!hasLogged) {
                 message.put(LOG_SETUP, Boolean.TRUE);
-                final CacheAndWriteOutputStream newOut = new CacheAndWriteOutputStream(os);
-                if (threshold > 0) {
-                    newOut.setThreshold(threshold);
+                if (os != null) {
+                    final CacheAndWriteOutputStream newOut = new CacheAndWriteOutputStream(os);
+                    if (threshold > 0) {
+                        newOut.setThreshold(threshold);
+                    }
+                    message.setContent(OutputStream.class, newOut);
+                    newOut.registerCallback(new LoggingCallback(logger, message, os));
+                } else {
+                    message.setContent(Writer.class, new LogWriter(logger, message, iowriter));
                 }
-                message.setContent(OutputStream.class, newOut);
-                newOut.registerCallback(new LoggingCallback(logger, message, os));
             }
         }
     }
     
+    private LoggingMessage setupBuffer(Message message) {
+        String id = (String)message.getExchange().get(LoggingMessage.ID_KEY);
+        if (id == null) {
+            id = LoggingMessage.nextId();
+            message.getExchange().put(LoggingMessage.ID_KEY, id);
+        }
+        final LoggingMessage buffer 
+            = new LoggingMessage("Outbound Message\n---------------------------",
+                                 id);
+        
+        Integer responseCode = (Integer)message.get(Message.RESPONSE_CODE);
+        if (responseCode != null) {
+            buffer.getResponseCode().append(responseCode);
+        }
+        
+        String encoding = (String)message.get(Message.ENCODING);
+        if (encoding != null) {
+            buffer.getEncoding().append(encoding);
+        }            
+        String httpMethod = (String)message.get(Message.HTTP_REQUEST_METHOD);
+        if (httpMethod != null) {
+            buffer.getHttpMethod().append(httpMethod);
+        }
+        String address = (String)message.get(Message.ENDPOINT_ADDRESS);
+        if (address != null) {
+            buffer.getAddress().append(address);
+        }
+        String ct = (String)message.get(Message.CONTENT_TYPE);
+        if (ct != null) {
+            buffer.getContentType().append(ct);
+        }
+        Object headers = message.get(Message.PROTOCOL_HEADERS);
+        if (headers != null) {
+            buffer.getHeader().append(headers);
+        }
+        return buffer;
+    }
+    
+    private class LogWriter extends FilterWriter {
+        StringWriter out2;
+        int count;
+        Logger logger; //NOPMD
+        Message message;
+        
+        public LogWriter(Logger logger, Message message, Writer writer) {
+            super(writer);
+            this.logger = logger;
+            this.message = message;
+            if (!(writer instanceof StringWriter)) {
+                out2 = new StringWriter();
+            }
+        }
+        public void write(int c) throws IOException {
+            super.write(c);
+            if (out2 != null && count < limit) {
+                out2.write(c);
+            }
+            count++;
+        }
+        public void write(char[] cbuf, int off, int len) throws IOException {
+            super.write(cbuf, off, len);
+            if (out2 != null && count < limit) {
+                out2.write(cbuf, off, len);
+            }
+            count += len;
+        }
+        public void write(String str, int off, int len) throws IOException {
+            super.write(str, off, len);
+            if (out2 != null && count < limit) {
+                out2.write(str, off, len);
+            }
+            count += len;
+        }
+        public void close() throws IOException {
+            LoggingMessage buffer = setupBuffer(message);
+            if (count >= limit) {
+                buffer.getMessage().append("(message truncated to " + limit + " bytes)\n");
+            }
+            StringWriter w2 = out2;
+            if (w2 == null) {
+                w2 = (StringWriter)out;
+            }
+            String ct = (String)message.get(Message.CONTENT_TYPE);
+            try {
+                writePayload(buffer.getPayload(), w2, ct); 
+            } catch (Exception ex) {
+                //ignore
+            }
+            log(logger, buffer.toString());
+            message.setContent(Writer.class, out);
+            super.close();
+        }
+    }
+    
     class LoggingCallback implements CachedOutputStreamCallback {
         
         private final Message message;
@@ -96,42 +199,9 @@ public class LoggingOutInterceptor exten
         }
         
         public void onClose(CachedOutputStream cos) {
-            String id = (String)message.getExchange().get(LoggingMessage.ID_KEY);
-            if (id == null) {
-                id = LoggingMessage.nextId();
-                message.getExchange().put(LoggingMessage.ID_KEY, id);
-            }
-            final LoggingMessage buffer 
-                = new LoggingMessage("Outbound Message\n---------------------------",
-                                     id);
-            
-            Integer responseCode = (Integer)message.get(Message.RESPONSE_CODE);
-            if (responseCode != null) {
-                buffer.getResponseCode().append(responseCode);
-            }
-            
-            String encoding = (String)message.get(Message.ENCODING);
+            LoggingMessage buffer = setupBuffer(message);
 
-            if (encoding != null) {
-                buffer.getEncoding().append(encoding);
-            }            
-            String httpMethod = (String)message.get(Message.HTTP_REQUEST_METHOD);
-            if (httpMethod != null) {
-                buffer.getHttpMethod().append(httpMethod);
-            }
-            String address = (String)message.get(Message.ENDPOINT_ADDRESS);
-            if (address != null) {
-                buffer.getAddress().append(address);
-            }
             String ct = (String)message.get(Message.CONTENT_TYPE);
-            if (ct != null) {
-                buffer.getContentType().append(ct);
-            }
-            Object headers = message.get(Message.PROTOCOL_HEADERS);
-            if (headers != null) {
-                buffer.getHeader().append(headers);
-            }
-
             if (!isShowBinaryContent() && isBinaryContent(ct)) {
                 buffer.getMessage().append(BINARY_CONTENT_MESSAGE).append('\n');
                 log(logger, buffer.toString());
@@ -151,6 +221,7 @@ public class LoggingOutInterceptor exten
                 }
             }
             try {
+                String encoding = (String)message.get(Message.ENCODING);
                 writePayload(buffer.getPayload(), cos, encoding, ct); 
             } catch (Exception ex) {
                 //ignore

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxInInterceptor.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxInInterceptor.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxInInterceptor.java Tue Mar 20 01:21:50 2012
@@ -21,6 +21,7 @@ package org.apache.cxf.interceptor;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Reader;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -62,10 +63,13 @@ public class StaxInInterceptor extends A
             return;
         }
         InputStream is = message.getContent(InputStream.class);
+        Reader reader = null;
         if (is == null) {
-            return;
+            reader = message.getContent(Reader.class);
+            if (reader == null) {
+                return;
+            }
         }
-
         String contentType = (String)message.get(Message.CONTENT_TYPE);
         
         if (contentType != null && contentType.contains("text/html")) {
@@ -97,14 +101,22 @@ public class StaxInInterceptor extends A
 
         String encoding = (String)message.get(Message.ENCODING);
 
-        XMLStreamReader reader;
+        XMLStreamReader xreader;
         try {
             XMLInputFactory factory = getXMLInputFactory(message);
             if (factory == null) {
-                reader = StaxUtils.createXMLStreamReader(is, encoding);
+                if (reader != null) {
+                    xreader = StaxUtils.createXMLStreamReader(reader);
+                } else {
+                    xreader = StaxUtils.createXMLStreamReader(is, encoding);
+                }
             } else {
                 synchronized (factory) {
-                    reader = factory.createXMLStreamReader(is, encoding);
+                    if (reader != null) {
+                        xreader = factory.createXMLStreamReader(reader);
+                    } else {
+                        xreader = factory.createXMLStreamReader(is, encoding);
+                    }
                 }                
             }
         } catch (XMLStreamException e) {
@@ -113,7 +125,7 @@ public class StaxInInterceptor extends A
                                                                    encoding), e);
         }
 
-        message.setContent(XMLStreamReader.class, reader);
+        message.setContent(XMLStreamReader.class, xreader);
     }
 
     public static XMLInputFactory getXMLInputFactory(Message m) throws Fault {

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxOutEndingInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxOutEndingInterceptor.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxOutEndingInterceptor.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxOutEndingInterceptor.java Tue Mar 20 01:21:50 2012
@@ -19,6 +19,7 @@
 package org.apache.cxf.interceptor;
 
 import java.io.OutputStream;
+import java.io.Writer;
 import java.util.ResourceBundle;
 
 import javax.xml.stream.XMLStreamException;
@@ -34,11 +35,16 @@ public class StaxOutEndingInterceptor ex
     private static final ResourceBundle BUNDLE = BundleUtils.getBundle(StaxOutEndingInterceptor.class);
     
     private String outStreamHolder;
+    private String writerHolder;
     
     public StaxOutEndingInterceptor(String outStreamHolder) {
+        this(outStreamHolder, null);
+    }
+    public StaxOutEndingInterceptor(String outStreamHolder, String writerHolder) {
         super(Phase.PRE_STREAM_ENDING);
         getAfter().add(AttachmentOutInterceptor.AttachmentOutEndingInterceptor.class.getName());
         this.outStreamHolder = outStreamHolder;
+        this.writerHolder = writerHolder;
     }
 
     public void handleMessage(Message message) throws Fault {
@@ -54,6 +60,12 @@ public class StaxOutEndingInterceptor ex
             if (os != null) {
                 message.setContent(OutputStream.class, os);
             }
+            if (writerHolder != null) {
+                Writer w = (Writer)message.get(writerHolder);
+                if (w != null) {
+                    message.setContent(Writer.class, w);
+                }
+            }
             message.removeContent(XMLStreamWriter.class);
         } catch (XMLStreamException e) {
             throw new Fault(new org.apache.cxf.common.i18n.Message("STAX_WRITE_EXC", BUNDLE), e);

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java Tue Mar 20 01:21:50 2012
@@ -20,6 +20,7 @@
 package org.apache.cxf.interceptor;
 
 import java.io.OutputStream;
+import java.io.Writer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.ResourceBundle;
@@ -43,8 +44,10 @@ import org.apache.cxf.staxutils.StaxUtil
  */
 public class StaxOutInterceptor extends AbstractPhaseInterceptor<Message> {
     public static final String OUTPUT_STREAM_HOLDER = StaxOutInterceptor.class.getName() + ".outputstream";
+    public static final String WRITER_HOLDER = StaxOutInterceptor.class.getName() + ".writer";
     public static final String FORCE_START_DOCUMENT = "org.apache.cxf.stax.force-start-document";
-    public static final StaxOutEndingInterceptor ENDING = new StaxOutEndingInterceptor(OUTPUT_STREAM_HOLDER);
+    public static final StaxOutEndingInterceptor ENDING 
+        = new StaxOutEndingInterceptor(OUTPUT_STREAM_HOLDER, WRITER_HOLDER);
     
     private static final ResourceBundle BUNDLE = BundleUtils.getBundle(StaxOutInterceptor.class);
     private static Map<Object, XMLOutputFactory> factories = new HashMap<Object, XMLOutputFactory>();
@@ -57,8 +60,12 @@ public class StaxOutInterceptor extends 
 
     public void handleMessage(Message message) {
         OutputStream os = message.getContent(OutputStream.class);
-        XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
-        if (os == null || writer != null) {
+        XMLStreamWriter xwriter = message.getContent(XMLStreamWriter.class);
+        Writer writer = null;
+        if (os == null) {
+            writer = message.getContent(Writer.class);
+        }
+        if ((os == null && writer == null) || xwriter != null) {
             return;
         }
 
@@ -67,21 +74,31 @@ public class StaxOutInterceptor extends 
         try {
             XMLOutputFactory factory = getXMLOutputFactory(message);
             if (factory == null) {
-                writer = StaxUtils.createXMLStreamWriter(os, encoding);
+                if (writer == null) {
+                    xwriter = StaxUtils.createXMLStreamWriter(os, encoding);
+                } else {
+                    xwriter = StaxUtils.createXMLStreamWriter(writer);
+                }
             } else {
                 synchronized (factory) {
-                    writer = factory.createXMLStreamWriter(os, encoding);
+                    if (writer == null) {
+                        xwriter = factory.createXMLStreamWriter(os, encoding);
+                    } else {
+                        xwriter = factory.createXMLStreamWriter(writer);
+                    }
                 }
             }
             if (MessageUtils.getContextualBoolean(message, FORCE_START_DOCUMENT, false)) {
-                writer.writeStartDocument(encoding, "1.0");
+                xwriter.writeStartDocument(encoding, "1.0");
                 message.removeContent(OutputStream.class);
                 message.put(OUTPUT_STREAM_HOLDER, os);
+                message.removeContent(Writer.class);
+                message.put(WRITER_HOLDER, writer);
             }
         } catch (XMLStreamException e) {
             throw new Fault(new org.apache.cxf.common.i18n.Message("STREAM_CREATE_EXC", BUNDLE), e);
         }
-        message.setContent(XMLStreamWriter.class, writer);
+        message.setContent(XMLStreamWriter.class, xwriter);
 
         // Add a final interceptor to write end elements
         message.getInterceptorChain().add(ENDING);
@@ -93,6 +110,10 @@ public class StaxOutInterceptor extends 
         if (os != null) {
             message.setContent(OutputStream.class, os);
         }
+        Writer writer = (Writer)message.get(WRITER_HOLDER);
+        if (writer != null) {
+            message.setContent(Writer.class, writer);
+        }
     }
 
     private String getEncoding(Message message) {

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Tue Mar 20 01:21:50 2012
@@ -21,7 +21,10 @@ package org.apache.cxf.transport.jms;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.Reader;
+import java.io.StringWriter;
 import java.io.UnsupportedEncodingException;
+import java.io.Writer;
 import java.lang.ref.WeakReference;
 import java.util.Map;
 import java.util.UUID;
@@ -93,16 +96,37 @@ public class JMSConduit extends Abstract
      * the OutputStream of the message and called the stream's close method. In the JMS case the
      * JMSOutputStream will then call back the sendExchange method of this class. {@inheritDoc}
      */
-    public void prepare(Message message) throws IOException {
+    public void prepare(final Message message) throws IOException {
         String name =  endpointInfo.getName().toString() + ".jms-conduit";
         org.apache.cxf.common.i18n.Message msg = 
             new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_CONDUIT", LOG, name);
         jmsConfig.ensureProperlyConfigured(msg);
         boolean isTextPayload = JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType());
-        JMSOutputStream out = new JMSOutputStream(this, message.getExchange(), isTextPayload);
-        message.setContent(OutputStream.class, out);
+        if (isTextPayload) {
+            message.setContent(Writer.class, new StringWriter() {
+                @Override
+                public void close() throws IOException {
+                    super.close();
+                    sendExchange(message.getExchange(), toString());
+                }
+            });
+        } else {
+            JMSOutputStream out = new JMSOutputStream(this, message.getExchange(), isTextPayload);
+            message.setContent(OutputStream.class, out);
+        }
+    }
+    @Override
+    public void close(Message msg) throws IOException {
+        Writer writer = msg.getContent(Writer.class);
+        if (writer != null) {
+            writer.close();
+        }
+        Reader reader = msg.getContent(Reader.class);
+        if (reader != null) {
+            reader.close();
+        }
+        super.close(msg);
     }
-    
     private synchronized AbstractMessageListenerContainer getJMSListener() {
         if (jmsListener == null) {
             jmsListener = JMSFactory.createJmsListener(jmsConfig, 

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Tue Mar 20 01:21:50 2012
@@ -21,7 +21,10 @@ package org.apache.cxf.transport.jms;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.Reader;
+import java.io.StringWriter;
 import java.io.UnsupportedEncodingException;
+import java.io.Writer;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.GregorianCalendar;
@@ -361,7 +364,18 @@ public class JMSDestination extends Abst
             inMessage = message;
             this.sender = sender;
         }
-
+        @Override
+        public void close(Message msg) throws IOException {
+            Writer writer = msg.getContent(Writer.class);
+            if (writer != null) {
+                writer.close();
+            }
+            Reader reader = msg.getContent(Reader.class);
+            if (reader != null) {
+                reader.close();
+            }
+            super.close(msg);
+        }
         /**
          * Register a message observer for incoming messages.
          * 
@@ -377,7 +391,7 @@ public class JMSDestination extends Abst
          * 
          * @param message the message to be sent.
          */
-        public void prepare(Message message) throws IOException {
+        public void prepare(final Message message) throws IOException {
             // setup the message to be send back
             javax.jms.Message jmsMessage = (javax.jms.Message)inMessage
                 .get(JMSConstants.JMS_REQUEST_MESSAGE);
@@ -391,8 +405,19 @@ public class JMSDestination extends Abst
 
             Exchange exchange = inMessage.getExchange();
             exchange.setOutMessage(message);
-            message.setContent(OutputStream.class, new JMSOutputStream(sender, exchange,
-                                                                       jmsMessage instanceof TextMessage));
+            
+            if (jmsMessage instanceof TextMessage) {
+                message.setContent(Writer.class, new StringWriter() {
+                    @Override
+                    public void close() throws IOException {
+                        super.close();
+                        sender.sendExchange(message.getExchange(), toString());
+                    }
+                });
+
+            } else {
+                message.setContent(OutputStream.class, new JMSOutputStream(sender, exchange, false));
+            }
         }
 
         protected Logger getLogger() {

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java Tue Mar 20 01:21:50 2012
@@ -21,6 +21,8 @@ package org.apache.cxf.transport.jms;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.io.Reader;
+import java.io.StringReader;
 import java.io.UnsupportedEncodingException;
 import java.security.Principal;
 import java.util.ArrayList;
@@ -134,13 +136,7 @@ public final class JMSUtils {
             throw JmsUtils.convertJmsAccessException(e);
         }
         if (converted instanceof String) {
-            if (encoding != null) {
-                result = ((String)converted).getBytes(encoding);
-            } else {
-                // Using the UTF-8 encoding as default
-                result = ((String)converted).getBytes("UTF-8");
-            }
-            inMessage.setContent(InputStream.class, new ByteArrayInputStream(result));
+            inMessage.setContent(Reader.class, new StringReader((String)converted));
             messageType = "text";
         } else if (converted instanceof byte[]) {
             result = (byte[])converted;

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java Tue Mar 20 01:21:50 2012
@@ -20,6 +20,7 @@ package org.apache.cxf.transport.jms;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.Writer;
 import java.net.URL;
 
 import javax.xml.namespace.QName;
@@ -107,9 +108,15 @@ public abstract class AbstractJMSTester 
             ex.printStackTrace();
         }
         OutputStream os = message.getContent(OutputStream.class);
-        assertTrue("The OutputStream should not be null ", os != null);
-        os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding
-        os.close();
+        Writer writer = message.getContent(Writer.class);
+        assertTrue("The OutputStream and Writer should not both be null ", os != null || writer != null);
+        if (os != null) {
+            os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding
+            os.close();
+        } else {
+            writer.write(MESSAGE_CONTENT);
+            writer.close();
+        }
     }
 
     protected void adjustEndpointInfoURL() {

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Tue Mar 20 01:21:50 2012
@@ -23,6 +23,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.Writer;
 import java.nio.charset.Charset;
 import java.util.logging.Logger;
 
@@ -85,7 +86,8 @@ public class JMSConduitTest extends Abst
 
     private void verifySentMessage(boolean send, Message message) {
         OutputStream os = message.getContent(OutputStream.class);
-        assertTrue("OutputStream should not be null", os != null);
+        Writer writer = message.getContent(Writer.class);
+        assertTrue("The OutputStream and Writer should not both be null ", os != null || writer != null);
     }
 
     /*

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Tue Mar 20 01:21:50 2012
@@ -22,6 +22,8 @@ package org.apache.cxf.transport.jms;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Reader;
+import java.io.StringReader;
 
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -348,14 +350,27 @@ public class JMSDestinationTest extends 
 
     private void verifyReceivedMessage(Message message) {
         ByteArrayInputStream bis = (ByteArrayInputStream)message.getContent(InputStream.class);
-        byte bytes[] = new byte[bis.available()];
-        try {
-            bis.read(bytes);
-        } catch (IOException ex) {
-            assertFalse("Read the Destination recieved Message error ", false);
-            ex.printStackTrace();
+        String response = "<not found>";
+        if (bis != null) {
+            byte bytes[] = new byte[bis.available()];
+            try {
+                bis.read(bytes);
+            } catch (IOException ex) {
+                assertFalse("Read the Destination recieved Message error ", false);
+                ex.printStackTrace();
+            }
+            response = IOUtils.newStringFromBytes(bytes);
+        } else {
+            StringReader reader = (StringReader)message.getContent(Reader.class);
+            char buffer[] = new char[5000];
+            try {
+                int i = reader.read(buffer);
+                response = new String(buffer, 0 , i);
+            } catch (IOException e) {
+                assertFalse("Read the Destination recieved Message error ", false);
+                e.printStackTrace();
+            }
         }
-        String response = IOUtils.newStringFromBytes(bytes);
         assertEquals("The response content should be equal", AbstractJMSTester.MESSAGE_CONTENT, response);
     }
 

Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSJmsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSJmsTest.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSJmsTest.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSJmsTest.java Tue Mar 20 01:21:50 2012
@@ -190,7 +190,7 @@ public class JAXRSJmsTest extends Abstra
         MessageProducer producer = session.createProducer(destination);
         
         Message message = JMSUtils.createAndSetPayload(
-            writeBook(new Book("JMS OneWay", 125L)), session, "text");
+            writeBook(new Book("JMS OneWay", 125L)), session, "byte");
         message.setStringProperty("Content-Type", "application/xml");
         message.setStringProperty(org.apache.cxf.message.Message.REQUEST_URI, "/bookstore/oneway");
         message.setStringProperty(org.apache.cxf.message.Message.HTTP_REQUEST_METHOD, "PUT");
@@ -203,7 +203,7 @@ public class JAXRSJmsTest extends Abstra
         throws Exception {
         MessageProducer producer = session.createProducer(destination);
         
-        Message message = JMSUtils.createAndSetPayload(writeBook(new Book("JMS", 3L)), session, "text");
+        Message message = JMSUtils.createAndSetPayload(writeBook(new Book("JMS", 3L)), session, "byte");
         message.setJMSReplyTo(replyTo);
         // or, if oneway,
         // message.setStringProperty("OnewayRequest", "true");
@@ -235,12 +235,12 @@ public class JAXRSJmsTest extends Abstra
         return (Book)u.unmarshal(is);
     }
     
-    private String writeBook(Book b) throws Exception {
+    private byte[] writeBook(Book b) throws Exception {
         JAXBContext c = JAXBContext.newInstance(new Class[]{Book.class});
         Marshaller m = c.createMarshaller();
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         m.marshal(b, bos);
-        return bos.toString();
+        return bos.toByteArray();
     }
     
     

Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JMSBookStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JMSBookStore.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JMSBookStore.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JMSBookStore.java Tue Mar 20 01:21:50 2012
@@ -146,18 +146,18 @@ public class JMSBookStore {
         MessageProducer producer = session.createProducer(destination);
         
         Message message = JMSUtils.createAndSetPayload(
-            writeBook(book), session, "text");
+            writeBook(book), session, "byte");
                     
         producer.send(message);
         producer.close();
     }
     
-    private String writeBook(Book b) throws Exception {
+    private byte[] writeBook(Book b) throws Exception {
         JAXBContext c = JAXBContext.newInstance(new Class[]{Book.class});
         Marshaller m = c.createMarshaller();
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         m.marshal(b, bos);
-        return bos.toString();
+        return bos.toByteArray();
     }
 }
 

Modified: cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java?rev=1302744&r1=1302743&r2=1302744&view=diff
==============================================================================
--- cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java (original)
+++ cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java Tue Mar 20 01:21:50 2012
@@ -66,6 +66,8 @@ import org.apache.cxf.hello_world_jms.He
 import org.apache.cxf.hello_world_jms.HelloWorldServiceRuntimeCorrelationIDStaticPrefix;
 import org.apache.cxf.hello_world_jms.NoSuchCodeLitFault;
 import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.interceptor.LoggingInInterceptor;
+import org.apache.cxf.interceptor.LoggingOutInterceptor;
 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
 import org.apache.cxf.jms_greeter.JMSGreeterPortType;
 import org.apache.cxf.jms_greeter.JMSGreeterService;
@@ -158,6 +160,8 @@ public class JMSClientServerTest extends
             
             Client client = ClientProxy.getClient(greeter);
             client.getEndpoint().getOutInterceptors().add(new TibcoSoapActionInterceptor());
+            client.getOutInterceptors().add(new LoggingOutInterceptor());
+            client.getInInterceptors().add(new LoggingInInterceptor());
             EndpointInfo ei = client.getEndpoint().getEndpointInfo();
             AddressType address = ei.getTraversedExtensor(new AddressType(), AddressType.class);
             JMSNamingPropertyType name = new JMSNamingPropertyType();