You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2016/05/26 21:46:13 UTC

[45/50] [abbrv] cxf git commit: [CXF-6646] CXF 3.x WSRM Replace RewindableInputStream with CachedOutputStream

[CXF-6646] CXF 3.x WSRM Replace RewindableInputStream with CachedOutputStream


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/e8530930
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/e8530930
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/e8530930

Branch: refs/heads/master-jaxrs-2.1
Commit: e8530930045a5784fb1ed1f5b58658d7baae2238
Parents: 9ea9fac
Author: Kai Rommel <ka...@sap.com>
Authored: Wed May 4 02:45:38 2016 +0200
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Wed May 25 16:19:39 2016 +0200

----------------------------------------------------------------------
 .../apache/cxf/ws/rm/DestinationSequence.java   |  7 +--
 .../cxf/ws/rm/RMCaptureInInterceptor.java       |  6 +--
 .../cxf/ws/rm/RMCaptureOutInterceptor.java      | 13 ++++--
 .../apache/cxf/ws/rm/RMMessageConstants.java    |  6 ++-
 .../cxf/ws/rm/persistence/PersistenceUtils.java | 48 +++++++++++++++-----
 .../apache/cxf/ws/rm/persistence/RMMessage.java | 14 +++---
 .../cxf/ws/rm/persistence/jdbc/RMTxStore.java   | 18 +++++++-
 .../cxf/ws/rm/soap/RetransmissionQueueImpl.java | 43 ++++++++++++++----
 .../org/apache/cxf/ws/rm/RMManagerTest.java     | 22 ++++++---
 .../ws/rm/persistence/PersistenceUtilsTest.java | 22 ++++++---
 .../ws/rm/persistence/RMLargeMessageTest.java   |  4 --
 .../cxf/ws/rm/persistence/RMMessageTest.java    | 16 +------
 .../rm/persistence/jdbc/RMTxStoreTestBase.java  | 25 +++++++---
 13 files changed, 167 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index 2e1a54b..58b7906 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -33,6 +33,7 @@ import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.continuations.SuspendedInvocationException;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
@@ -167,9 +168,9 @@ public class DestinationSequence extends AbstractSequence {
             RMMessage msg = null;
             if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) {
                 msg = new RMMessage();
-                RewindableInputStream in = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT);
-                in.rewind();
-                msg.setContent(in);
+                CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+                msg.setContent(cos);
+                msg.setContentType((String) message.get(Message.CONTENT_TYPE));
                 msg.setMessageNumber(st.getMessageNumber());
             }
             store.persistIncoming(this, msg);

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
index 40b4fab..9d48cbc 100755
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
@@ -57,9 +57,9 @@ public class RMCaptureInInterceptor extends AbstractRMInterceptor<Message> {
                     saved.lockOutputStream();
 
                     LOG.fine("Capturing the original RM message");
-                    RewindableInputStream ris = RewindableInputStream.makeRewindable(saved.getInputStream());
-                    message.setContent(InputStream.class, ris);
-                    message.put(RMMessageConstants.SAVED_CONTENT, ris);
+                    //RewindableInputStream ris = RewindableInputStream.makeRewindable(saved.getInputStream());
+                    message.setContent(InputStream.class, saved.getInputStream());
+                    message.put(RMMessageConstants.SAVED_CONTENT, saved);
                 } catch (Exception e) {
                     throw new Fault(e);
                 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
index c0ca125..4514e03 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
@@ -19,8 +19,8 @@
 
 package org.apache.cxf.ws.rm;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -35,10 +35,12 @@ import org.apache.cxf.binding.Binding;
 import org.apache.cxf.binding.soap.interceptor.SoapOutInterceptor;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor;
 import org.apache.cxf.interceptor.AttachmentOutInterceptor;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.LoggingOutInterceptor;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.FaultMode;
@@ -255,8 +257,11 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message>  {
                 }
                 
                 // save message for potential retransmission
-                ByteArrayInputStream bis = cw.getOutputStream().createInputStream();
-                message.put(RMMessageConstants.SAVED_CONTENT, RewindableInputStream.makeRewindable(bis));
+                CachedOutputStream cos = new CachedOutputStream();
+                IOUtils.copyAndCloseInput(cw.getOutputStream().createInputStream(), cos);
+                cos.flush();
+                InputStream is = cos.getInputStream();
+                message.put(RMMessageConstants.SAVED_CONTENT, cos);
                 RMManager manager = getManager();
                 manager.getRetransmissionQueue().start();
                 manager.getRetransmissionQueue().addUnacknowledged(message);
@@ -276,7 +281,7 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message>  {
                     }
                     // serializes the message content and the attachments into
                     // the RMMessage content
-                    PersistenceUtils.encodeRMContent(msg, message, bis);
+                    PersistenceUtils.encodeRMContent(msg, message, is);
                     store.persistOutgoing(ss, msg);
                 }
                     

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
index eb6789c..15999e9 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
@@ -36,8 +36,12 @@ public final class RMMessageConstants {
     
     public static final String ORIGINAL_REQUESTOR_ROLE = "org.apache.cxf.client.original";
     
-    /** Message content (must be an instance of {@link RewindableInputStream}. */
+    /** Message content must be an instance of {@link CachedOutputStream}. */
     public static final String SAVED_CONTENT = "org.apache.cxf.ws.rm.content";
+
+    /** Variable holds reference to source streams of the attachments.
+     * It must be an instance of {@link Closeable}. */
+    public static final String ATTACHMENTS_CLOSEABLE = "org.apache.cxf.ws.rm.attachment.closeable";
     
     /** Retransmission in progress flag (Boolean.TRUE if in progress). */
     public static final String RM_RETRANSMISSION = "org.apache.cxf.ws.rm.retransmitting";

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
index 0981f8e..43f01d9 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.ws.rm.persistence;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -38,7 +39,6 @@ import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.staxutils.StaxUtils;
 import org.apache.cxf.ws.rm.RMMessageConstants;
-import org.apache.cxf.ws.rm.RewindableInputStream;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
 
 /**
@@ -105,13 +105,14 @@ public final class PersistenceUtils {
 
     public static void encodeRMContent(RMMessage rmmsg, Message msg, InputStream msgContent)
         throws IOException {
+        CachedOutputStream cos = new CachedOutputStream();
         if (msg.getAttachments() == null) {
             rmmsg.setContentType((String)msg.get(Message.CONTENT_TYPE));
-            rmmsg.setContent(msgContent);
+            IOUtils.copyAndCloseInput(msgContent, cos);
+            cos.flush();
+            rmmsg.setContent(cos);
         } else {
             MessageImpl msgImpl1 = new MessageImpl();
-            // using cached output stream to handle large files
-            CachedOutputStream cos = new CachedOutputStream();
             msgImpl1.setContent(OutputStream.class, cos);
             msgImpl1.setAttachments(msg.getAttachments());
             msgImpl1.put(Message.CONTENT_TYPE, (String) msg.get(Message.CONTENT_TYPE));
@@ -121,26 +122,49 @@ public final class PersistenceUtils {
             serializer.writeProlog();
             // write soap root message into cached output stream
             IOUtils.copyAndCloseInput(msgContent, cos);
+            cos.flush();
             serializer.writeAttachments();
             rmmsg.setContentType((String) msgImpl1.get(Message.CONTENT_TYPE));
-
-            //TODO will pass the cos instance to rmmessage in the future
-            rmmsg.setContent(cos.getInputStream());
+            rmmsg.setContent(cos);
         }
     }
 
     public static void decodeRMContent(RMMessage rmmsg, Message msg) throws IOException {
         String contentType = rmmsg.getContentType();
+        final CachedOutputStream cos = rmmsg.getContent();
         if ((null != contentType) && contentType.startsWith("multipart/related")) {
+            final InputStream is = cos.getInputStream();
             msg.put(Message.CONTENT_TYPE, contentType);
-            msg.setContent(InputStream.class, rmmsg.getContent());
+            msg.setContent(InputStream.class, is);
             AttachmentDeserializer ad = new AttachmentDeserializer(msg);
             ad.initializeAttachments();
+            // create new cos with soap envelope only
+            CachedOutputStream cosSoap = new CachedOutputStream();
+            IOUtils.copy(msg.getContent(InputStream.class), cosSoap);
+            cosSoap.flush();
+            msg.put(RMMessageConstants.SAVED_CONTENT, cosSoap);
+            // REVISIT -- At the moment references must be held for retransmission
+            // and the final cleanup of the CachedOutputStream.  
+            msg.put(RMMessageConstants.ATTACHMENTS_CLOSEABLE, new Closeable() {
+
+                @Override
+                public void close() throws IOException {
+                    try {
+                        is.close();
+                    } catch (IOException e) {
+                        // Ignore
+                    }
+                    try {
+                        cos.close();
+                    } catch (IOException e) {
+                        // Ignore
+                    }                   
+                }
+                
+            });
         } else {
-            msg.setContent(InputStream.class, rmmsg.getContent());
+            msg.put(RMMessageConstants.SAVED_CONTENT, cos);
         }
-        InputStream is = RewindableInputStream.makeRewindable(msg.getContent(InputStream.class));
-        msg.setContent(InputStream.class, is);
-        msg.put(RMMessageConstants.SAVED_CONTENT, is);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
index abab221..348117c 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
@@ -22,9 +22,11 @@ import java.io.InputStream;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.cxf.io.CachedOutputStream;
+
 public class RMMessage {
     
-    private InputStream content;
+    private CachedOutputStream content;
     //TODO remove attachments when we remove the deprecated attachments related methods
     private List<InputStream> attachments = Collections.emptyList();
     private String contentType;
@@ -48,11 +50,11 @@ public class RMMessage {
     }
     
     /**
-     * Sets the message content using the input stream.
+     * Sets the message content using the given CachedOutputStream.
      * @param in
      */
-    public void setContent(InputStream in) {
-        content = in;
+    public void setContent(CachedOutputStream cos) {
+        content = cos;
     }
     
     /**
@@ -73,11 +75,11 @@ public class RMMessage {
     }
 
     /**
-     * Returns the input stream of this message content.
+     * Returns the CachedOutputStream of this message content.
      * @return
      * @throws IOException
      */
-    public InputStream getContent() {
+    public CachedOutputStream getContent() {
         return content;
     }
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
index 7e626a5..641df46 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
@@ -51,6 +51,8 @@ import org.apache.cxf.common.i18n.Message;
 import org.apache.cxf.common.injection.NoJSR250Annotations;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.SystemPropertyAction;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.rm.DestinationSequence;
 import org.apache.cxf.ws.rm.ProtocolVariation;
@@ -599,7 +601,10 @@ public class RMTxStore implements RMStore {
                 RMMessage msg = new RMMessage();
                 msg.setMessageNumber(mn);
                 msg.setTo(to);
-                msg.setContent(blob.getBinaryStream());
+                CachedOutputStream cos = new CachedOutputStream();
+                IOUtils.copyAndCloseInput(blob.getBinaryStream(), cos);
+                cos.flush();
+                msg.setContent(cos);
                 msg.setContentType(contentType);
                 msgs.add(msg);
             }
@@ -607,6 +612,9 @@ public class RMTxStore implements RMStore {
             conex = ex;
             LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
                 : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
+        } catch (IOException e) {
+            abort(con);
+            throw new RMStoreException(e);
         } finally {
             releaseResources(stmt, res);
             updateConnectionState(con, conex);
@@ -735,8 +743,10 @@ public class RMTxStore implements RMStore {
                     new Object[] {outbound ? "outbound" : "inbound", nr, id, to});
         }
         PreparedStatement stmt = null;
+        CachedOutputStream cos = msg.getContent();
+        InputStream msgin = null;
         try {
-            InputStream msgin = msg.getContent();
+            msgin = cos.getInputStream();
             stmt = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR);
 
             stmt.setString(1, id);
@@ -751,6 +761,10 @@ public class RMTxStore implements RMStore {
             }
         } finally  {
             releaseResources(stmt, null);
+            if (null != msgin) {
+                msgin.close();
+            }
+            cos.close(); // needed to clean-up tmp file folder
         }
     }
     

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
index 223430e..5ae80dd 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
@@ -19,6 +19,9 @@
 
 package org.apache.cxf.ws.rm.soap;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -57,6 +60,7 @@ import org.apache.cxf.helpers.DOMUtils;
 import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.Interceptor;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.io.CachedOutputStreamCallback;
 import org.apache.cxf.io.WriteOnCloseOutputStream;
 import org.apache.cxf.message.Message;
@@ -91,7 +95,6 @@ import org.apache.cxf.ws.rm.RMProperties;
 import org.apache.cxf.ws.rm.RMUtils;
 import org.apache.cxf.ws.rm.RetransmissionQueue;
 import org.apache.cxf.ws.rm.RetryStatus;
-import org.apache.cxf.ws.rm.RewindableInputStream;
 import org.apache.cxf.ws.rm.SourceSequence;
 import org.apache.cxf.ws.rm.manager.RetryPolicyType;
 import org.apache.cxf.ws.rm.persistence.RMStore;
@@ -585,10 +588,24 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
         }
 
         private void releaseSavedMessage() {
-            RewindableInputStream is = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT);
-            if (is != null) {
-                is.release();
+            CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+            if (cos != null) {
+                cos.releaseTempFileHold();
+                try {
+                    cos.close();
+                } catch (IOException e) {
+                    // ignore
+                }
             }
+            // REVISIT -- When reference holder is not needed anymore, code can be removed.
+            Closeable closeable = (Closeable)message.get(RMMessageConstants.ATTACHMENTS_CLOSEABLE);
+            if (closeable != null) {
+                try {
+                    closeable.close();
+                } catch (IOException e) {
+                    // ignore
+                }
+            }           
         }
 
         /**
@@ -760,8 +777,9 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
             }
             
             // read SOAP headers from saved input stream
-            RewindableInputStream is = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT);
-            is.rewind();
+            CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+            cos.holdTempFile(); // CachedOutputStream is held until delivery has succeeded
+            InputStream is = cos.getInputStream(); // instance is needed to close input stream later on
             XMLStreamReader reader = StaxUtils.createXMLStreamReader(is, StandardCharsets.UTF_8.name());
             message.getHeaders().clear();
             if (reader.getEventType() != XMLStreamConstants.START_ELEMENT
@@ -814,7 +832,7 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
                     retransmitChain.remove(incept);
                 }
             }
-            retransmitChain.add(new CopyOutInterceptor(reader));
+            retransmitChain.add(new CopyOutInterceptor(reader, is));
             
             // restore callbacks on output stream
             if (callbacks != null) {
@@ -922,10 +940,12 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
     
     public static class CopyOutInterceptor extends AbstractOutDatabindingInterceptor {
         private final XMLStreamReader reader;
+        private InputStream is;
         
-        public CopyOutInterceptor(XMLStreamReader rdr) {
+        public CopyOutInterceptor(XMLStreamReader rdr, InputStream is) {
             super(Phase.MARSHAL);
             reader = rdr;
+            this.is = is;
         }
         
         @Override
@@ -933,6 +953,13 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
             try {
                 XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
                 StaxUtils.copy(reader, writer);
+                if (is != null) {
+                    try {
+                        is.close();
+                    } catch (IOException e) {
+                        // ignore
+                    }
+                }
             } catch (XMLStreamException e) {
                 throw new Fault("COULD_NOT_READ_XML_STREAM", LOG, e);
             }

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
index 645708a..37fb6ac 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
@@ -41,6 +41,8 @@ import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.service.Service;
@@ -505,7 +507,7 @@ public class RMManagerTest extends Assert {
     }
     
     @Test
-    public void testRecoverReliableClientEndpoint() throws NoSuchMethodException {
+    public void testRecoverReliableClientEndpoint() throws NoSuchMethodException, IOException {
         Method method = RMManager.class.getDeclaredMethod("createReliableEndpoint", 
             new Class[] {Endpoint.class});
         manager = control.createMock(RMManager.class, new Method[] {method});
@@ -563,7 +565,10 @@ public class RMManagerTest extends Assert {
         DestinationSequence ds = control.createMock(DestinationSequence.class);
         RMMessage m1 = new RMMessage();
         InputStream fis = getClass().getResourceAsStream("persistence/SerializedRMMessage.txt");
-        m1.setContent(fis);
+        CachedOutputStream cos = new CachedOutputStream();
+        IOUtils.copyAndCloseInput(fis, cos);
+        cos.flush();
+        m1.setContent(cos);
         m1.setTo("toAddress");
         m1.setMessageNumber(new Long(10));
         m1.setContentType(MULTIPART_TYPE);
@@ -579,8 +584,8 @@ public class RMManagerTest extends Assert {
         assertNotNull(msg.getExchange());
         assertSame(msg, msg.getExchange().getOutMessage());
 
-        InputStream is = (InputStream) msg.get(RMMessageConstants.SAVED_CONTENT);
-        assertStartsWith(is, "<soap:Envelope");
+        CachedOutputStream cos1 = (CachedOutputStream) msg.get(RMMessageConstants.SAVED_CONTENT);
+        assertStartsWith(cos1.getInputStream(), "<soap:Envelope");
         assertEquals(1, msg.getAttachments().size());
     }
     
@@ -673,7 +678,8 @@ public class RMManagerTest extends Assert {
     void setUpRecoverReliableEndpoint(Endpoint endpoint,
                                       Conduit conduit, 
                                       SourceSequence ss, 
-                                      DestinationSequence ds, RMMessage m, Capture<Message> mc)  {                
+                                      DestinationSequence ds, RMMessage m, Capture<Message> mc) 
+                                          throws IOException  {                
         RMStore store = control.createMock(RMStore.class);
         RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
         manager.setStore(store);
@@ -735,7 +741,11 @@ public class RMManagerTest extends Assert {
             EasyMock.expect(m.getTo()).andReturn("toAddress");
         }
         InputStream is = new ByteArrayInputStream(new byte[0]);
-        EasyMock.expect(m.getContent()).andReturn(is).anyTimes();
+        CachedOutputStream cos = new CachedOutputStream();
+        IOUtils.copy(is, cos);
+        cos.flush();
+        is.close();
+        EasyMock.expect(m.getContent()).andReturn(cos).anyTimes();
 
         if (mc != null) {
             queue.addUnacknowledged(EasyMock.capture(mc));

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
index c0667fb..9cccd2a 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
@@ -29,9 +29,12 @@ import javax.activation.DataHandler;
 import javax.mail.util.ByteArrayDataSource;
 
 import org.apache.cxf.attachment.AttachmentImpl;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Attachment;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.ws.rm.RMMessageConstants;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement.AcknowledgementRange;
 
@@ -76,7 +79,7 @@ public class PersistenceUtilsTest extends Assert {
         // update rmmessage
         PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis);
 
-        assertStartsWith(rmmsg.getContent(), "<soap:");
+        assertStartsWith(rmmsg.getContent().getInputStream(), "<soap:");
         assertNotNull(rmmsg.getContentType());
         assertTrue(rmmsg.getContentType().startsWith("text/xml"));
     }
@@ -93,7 +96,7 @@ public class PersistenceUtilsTest extends Assert {
         // update rmmessage
         PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis);
 
-        assertStartsWith(rmmsg.getContent(), "--uuid:");
+        assertStartsWith(rmmsg.getContent().getInputStream(), "--uuid:");
         assertNotNull(rmmsg.getContentType());
         assertTrue(rmmsg.getContentType().startsWith("multipart/related"));
     }
@@ -112,21 +115,26 @@ public class PersistenceUtilsTest extends Assert {
         Message messageImplRestored = new MessageImpl();
         PersistenceUtils.decodeRMContent(rmmsg, messageImplRestored);
         assertEquals(1, messageImplRestored.getAttachments().size());
-
-        assertStartsWith(messageImplRestored.getContent(InputStream.class), SOAP_PART);
+        CachedOutputStream cos = (CachedOutputStream)messageImplRestored.get(RMMessageConstants.SAVED_CONTENT);
+        assertStartsWith(cos.getInputStream(), SOAP_PART);
     }
     
     @Test
     public void testDecodeRMContentWithAttachment() throws Exception {
         InputStream is = getClass().getResourceAsStream("SerializedRMMessage.txt");
+        CachedOutputStream cos = new CachedOutputStream();
+        IOUtils.copyAndCloseInput(is, cos);
+        cos.flush();
         RMMessage msg = new RMMessage();
-        msg.setContent(is);      
+        msg.setContent(cos);      
         msg.setContentType(MULTIPART_TYPE);
         Message messageImpl = new MessageImpl();
         PersistenceUtils.decodeRMContent(msg, messageImpl);
 
         assertEquals(1, messageImpl.getAttachments().size());
-        assertStartsWith(messageImpl.getContent(InputStream.class), "<soap:Envelope");
+        CachedOutputStream cos1 =  (CachedOutputStream)messageImpl
+            .get(RMMessageConstants.SAVED_CONTENT);
+        assertStartsWith(cos1.getInputStream(), "<soap:Envelope");
     }
 
     private static void addAttachment(Message msg) throws IOException {
@@ -137,7 +145,7 @@ public class PersistenceUtilsTest extends Assert {
         msg.setAttachments(attachments);
     }
 
-    // just read the begining of the input and compare it against the specified string
+    // just read the beginning of the input and compare it against the specified string
     private static boolean assertStartsWith(InputStream in, String starting) {
         assertNotNull(in);
         byte[] buf = new byte[starting.length()];

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
index 5badbe6..bf5c246 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
@@ -46,10 +46,6 @@ public class RMLargeMessageTest extends RMMessageTest {
         }
     }
     
-    @Test
-    public void testContentInputStream() throws Exception {
-        super.testContentInputStream();
-    }
     
     @Test
     public void testContentCachedOutputStream() throws Exception {

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
index 41322ff..1247f80 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
@@ -19,8 +19,6 @@
 
 package org.apache.cxf.ws.rm.persistence;
 
-import java.io.ByteArrayInputStream;
-
 import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.io.CachedOutputStream;
 
@@ -49,23 +47,13 @@ public class RMMessageTest extends Assert {
     }
     
     @Test
-    public void testContentInputStream() throws Exception {
-        RMMessage msg = new RMMessage();
-        msg.setContent(new ByteArrayInputStream(DATA));
-        
-        byte[] msgbytes = IOUtils.readBytesFromStream(msg.getContent());
-        
-        assertArrayEquals(DATA, msgbytes);
-    }
-    
-    @Test
     public void testContentCachedOutputStream() throws Exception {
         RMMessage msg = new RMMessage();
         CachedOutputStream co = new CachedOutputStream();
         co.write(DATA);
-        msg.setContent(co.getInputStream());
+        msg.setContent(co);
         
-        byte[] msgbytes = IOUtils.readBytesFromStream(msg.getContent());
+        byte[] msgbytes = IOUtils.readBytesFromStream(msg.getContent().getInputStream());
         
         assertArrayEquals(DATA, msgbytes);
         co.close();

http://git-wip-us.apache.org/repos/asf/cxf/blob/e8530930/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
index fb62f34..a5f9723 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
@@ -30,6 +30,7 @@ import java.util.Date;
 import java.util.List;
 
 import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.addressing.Names;
 import org.apache.cxf.ws.rm.DestinationSequence;
@@ -222,8 +223,13 @@ public abstract class RMTxStoreTestBase extends Assert {
         EasyMock.expect(msg1.getMessageNumber()).andReturn(ONE).anyTimes(); 
         EasyMock.expect(msg2.getMessageNumber()).andReturn(ONE).anyTimes(); 
         byte[] bytes = new byte[89];
-        EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
-        EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
+        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        CachedOutputStream cos = new CachedOutputStream();
+        IOUtils.copy(bais, cos);
+        cos.flush();
+        bais.close();
+        EasyMock.expect(msg1.getContent()).andReturn(cos).anyTimes();
+        EasyMock.expect(msg2.getContent()).andReturn(cos).anyTimes();
         EasyMock.expect(msg1.getContentType()).andReturn("text/xml").times(1);
         control.replay();
 
@@ -241,7 +247,7 @@ public abstract class RMTxStoreTestBase extends Assert {
         
         control.reset();
         EasyMock.expect(msg1.getMessageNumber()).andReturn(ONE); 
-        EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes));
+        EasyMock.expect(msg1.getContent()).andReturn(cos);
         
         control.replay();
         con = getConnection();
@@ -260,8 +266,8 @@ public abstract class RMTxStoreTestBase extends Assert {
         control.reset();
         EasyMock.expect(msg1.getMessageNumber()).andReturn(TEN).anyTimes();
         EasyMock.expect(msg2.getMessageNumber()).andReturn(TEN).anyTimes(); 
-        EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); 
-        EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); 
+        EasyMock.expect(msg1.getContent()).andReturn(cos).anyTimes(); 
+        EasyMock.expect(msg2.getContent()).andReturn(cos).anyTimes(); 
 
         control.replay();
         con = getConnection();
@@ -862,7 +868,12 @@ public abstract class RMTxStoreTestBase extends Assert {
 
         EasyMock.expect(msg.getContentType()).andReturn("text/xml").anyTimes();
         byte[] value = ("Message " + mn.longValue()).getBytes();
-        EasyMock.expect(msg.getContent()).andReturn(new ByteArrayInputStream(value)).anyTimes();
+        ByteArrayInputStream bais = new ByteArrayInputStream(value);
+        CachedOutputStream cos = new CachedOutputStream();
+        IOUtils.copy(bais, cos);
+        cos.flush();
+        bais.close();
+        EasyMock.expect(msg.getContent()).andReturn(cos).anyTimes();
         return msg;
     }
 
@@ -926,7 +937,7 @@ public abstract class RMTxStoreTestBase extends Assert {
                 assertNull(msg.getTo());
             }
             try {
-                InputStream actual = msg.getContent();
+                InputStream actual = msg.getContent().getInputStream();
                 assertEquals(new String("Message " + mn), IOUtils.readStringFromStream(actual));
             } catch (IOException e) {
                 fail("failed to get the input stream");