You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2011/03/23 01:03:19 UTC

svn commit: r1084417 - in /cxf/trunk: api/src/main/java/org/apache/cxf/io/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/ rt/ws/rm/src/...

Author: dkulp
Date: Wed Mar 23 00:03:19 2011
New Revision: 1084417

URL: http://svn.apache.org/viewvc?rev=1084417&view=rev
Log:
[CXF-1100] Change WS-RM to cache based on streams instead of byte[]
Patch from Aki Yoshida applied

Added:
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java   (with props)
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java   (with props)
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java   (with props)
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java   (with props)
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java   (with props)
Modified:
    cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/persistent-message-loss-server.xml

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java Wed Mar 23 00:03:19 2011
@@ -189,6 +189,12 @@ public class CachedOutputStream extends 
     }
 
     public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof CachedOutputStream) {
+            return currentStream.equals(((CachedOutputStream)obj).currentStream);
+        }
         return currentStream.equals(obj);
     }
 

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Wed Mar 23 00:03:19 2011
@@ -455,7 +455,7 @@ public class RMManager implements Server
                     RMContextUtils.storeMAPs(maps, message, true, false);
                 }
                                     
-                message.setContent(byte[].class, m.getContent());
+                message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream());
                           
                 retransmissionQueue.addUnacknowledged(message);
             }            

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java Wed Mar 23 00:03:19 2011
@@ -20,10 +20,7 @@
 package org.apache.cxf.ws.rm;
 
 import java.io.IOException;
-import java.util.logging.Logger;
 
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.io.CachedOutputStreamCallback;
 import org.apache.cxf.message.Message;
@@ -37,8 +34,6 @@ import org.apache.cxf.ws.rm.persistence.
  */
 public class RetransmissionCallback implements CachedOutputStreamCallback {
     
-    private static final Logger LOG = LogUtils.getL7dLogger(RetransmissionCallback.class);
-
     Message message;
     RMManager manager;
     
@@ -47,18 +42,16 @@ public class RetransmissionCallback impl
         manager = mgr;
     }
     public void onClose(CachedOutputStream cos) {
-   
-        //REVISIT - would be nice to keep the cache on disk instead of in-memory 
-        byte bytes[] = null;
+        // make a copy as the original gets affected when its resetOut is called
+        CachedOutputStream saved = new CachedOutputStream();
+        saved.holdTempFile();
         try {
-            bytes = cos.getBytes();
+            cos.writeCacheTo(saved);            
         } catch (IOException e) {
-            throw new Fault(new org.apache.cxf.common.i18n.Message("NO_CACHED_STREAM", 
-                                                                   LOG, 
-                                                                   cos.getOut().getClass()));
+            // ignore
         }
-        
-        message.put(RMMessageConstants.SAVED_CONTENT, bytes);            
+
+        message.put(RMMessageConstants.SAVED_CONTENT, saved);
         manager.getRetransmissionQueue().addUnacknowledged(message);
         
         RMStore store = manager.getStore();
@@ -75,7 +68,7 @@ public class RetransmissionCallback impl
                     msg.setTo(maps.getTo().getValue());
                 }
             }
-            msg.setContent(bytes);
+            msg.setContent(saved);
             store.persistOutgoing(ss, msg); 
         }
     }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java Wed Mar 23 00:03:19 2011
@@ -18,9 +18,15 @@
  */
 package org.apache.cxf.ws.rm.persistence;
 
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
+
 public class RMMessage {
     
-    private byte[] content;
+    private CachedOutputStream content;
     private long messageNumber;
     private String to;
     
@@ -44,17 +50,50 @@ public class RMMessage {
     /**
      * Returns the content of the message as an input stream.
      * @return the content
+     * @deprecated
      */
     public byte[] getContent() {
-        return content;
+        byte[] bytes = null;
+        try {
+            bytes = content.getBytes();
+        } catch (IOException e) {
+            // ignore and treat it as null
+        }
+        return bytes;
     }
 
 
     /**
      * Sets the message content as an input stream.
      * @param content the message content
+     * @deprecated
      */
     public void setContent(byte[] c) {
+        content = new CachedOutputStream();
+        content.holdTempFile();
+        try {
+            content.write(c);
+        } catch (IOException e) {
+            // ignore
+        }
+    }
+    
+    /**
+     * Sets the message content using the input stream.
+     * @param in
+     * @throws IOException
+     */
+    public void setContent(InputStream in) throws IOException {
+        content = new CachedOutputStream();
+        content.holdTempFile();
+        IOUtils.copy(in, content);
+    }
+
+    /**
+     * Sets the message content using the cached output stream.
+     * @param c
+     */
+    public void setContent(CachedOutputStream c) {
         content = c;
     }
     
@@ -75,11 +114,29 @@ public class RMMessage {
         to = t;
     }
 
-
-    
-    
-    
-    
+    /**
+     * Returns the input stream of this message content.
+     * @return an input stream over the cached message content
+     * @throws IOException
+     */
+    public InputStream getInputStream() throws IOException {
+        return content.getInputStream();
+    }
     
+    /**
+     * Returns the associated cached output stream.
+     * @return the cached output stream holding the message content
+     */
+    public CachedOutputStream getCachedOutputStream() {
+        return content;
+    }
     
+    /**
+     * Returns the length of the message content in bytes.
+     * 
+     * @return the size of the cached content
+     */
+    public int getSize() {
+        return content.size();
+    }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java Wed Mar 23 00:03:19 2011
@@ -43,6 +43,20 @@ public interface RMStore {
     void createDestinationSequence(DestinationSequence seq);
     
     /**
+     * Retrieve the source sequence with the specified identifier from persistent store. 
+     * @param seq the sequence
+     * @return the sequence if present; otherwise null
+     */
+    SourceSequence getSourceSequence(Identifier seq);
+    
+    /**
+     * Retrieve the destination sequence with the specified identifier from persistent store. 
+     * @param seq the sequence
+     * @return the sequence if present; otherwise null
+     */
+    DestinationSequence getDestinationSequence(Identifier seq);
+
+    /**
      * Remove the source sequence with the specified identifier from persistent store. 
      * @param seq the sequence
      */

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Wed Mar 23 00:03:19 2011
@@ -20,7 +20,6 @@
 package org.apache.cxf.ws.rm.persistence.jdbc;
 
 
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -45,7 +44,6 @@ import javax.annotation.PostConstruct;
 
 import org.apache.cxf.common.i18n.Message;
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
 import org.apache.cxf.ws.rm.DestinationSequence;
 import org.apache.cxf.ws.rm.Identifier;
@@ -106,6 +104,12 @@ public class RMTxStore implements RMStor
         = "INSERT INTO {0} VALUES(?, ?, ?, ?)";
     private static final String DELETE_MESSAGE_STMT_STR =
         "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
+    private static final String SELECT_DEST_SEQUENCE_STMT_STR =
+        "SELECT ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+        + "WHERE SEQ_ID = ?";
+    private static final String SELECT_SRC_SEQUENCE_STMT_STR =
+        "SELECT CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID FROM CXF_RM_SRC_SEQUENCES "
+        + "WHERE SEQ_ID = ?";
     private static final String SELECT_DEST_SEQUENCES_STMT_STR =
         "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
         + "WHERE ENDPOINT_ID = ?";
@@ -131,6 +135,8 @@ public class RMTxStore implements RMStor
     private PreparedStatement updateSrcSequenceStmt;
     private PreparedStatement selectDestSequencesStmt;
     private PreparedStatement selectSrcSequencesStmt;
+    private PreparedStatement selectDestSequenceStmt;
+    private PreparedStatement selectSrcSequenceStmt;
     private PreparedStatement createInboundMessageStmt;
     private PreparedStatement createOutboundMessageStmt;
     private PreparedStatement deleteInboundMessageStmt;
@@ -242,6 +248,68 @@ public class RMTxStore implements RMStor
         }
     }
     
+    public DestinationSequence getDestinationSequence(Identifier sid) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.info("Getting destination sequence for id: " + sid);
+        }
+        try {
+            if (null == selectDestSequenceStmt) {
+                selectDestSequenceStmt = 
+                    connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);               
+            }
+            selectDestSequenceStmt.setString(1, sid.getValue());
+            
+            ResultSet res = selectDestSequenceStmt.executeQuery(); 
+            if (res.next()) {
+                EndpointReferenceType acksTo = RMUtils.createReference2004(res.getString(1));  
+                long lm = res.getLong(2);
+                InputStream is = res.getBinaryStream(3); 
+                SequenceAcknowledgement ack = null;
+                if (null != is) {
+                    ack = PersistenceUtils.getInstance()
+                        .deserialiseAcknowledgment(is); 
+                }
+                return new DestinationSequence(sid, acksTo, lm, ack);
+            }
+        } catch (SQLException ex) {
+            LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex);
+        }
+        return null;
+    }
+    
+    public SourceSequence getSourceSequence(Identifier sid) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.info("Getting source sequences for id: " + sid);
+        }
+        try {
+            if (null == selectSrcSequenceStmt) {
+                selectSrcSequenceStmt = 
+                    connection.prepareStatement(SELECT_SRC_SEQUENCE_STMT_STR);     
+            }
+            selectSrcSequenceStmt.setString(1, sid.getValue());
+            ResultSet res = selectSrcSequenceStmt.executeQuery();
+            
+            if (res.next()) {
+                long cmn = res.getLong(1);
+                boolean lm = res.getBoolean(2);
+                long lval = res.getLong(3);
+                Date expiry = 0 == lval ? null : new Date(lval);
+                String oidValue = res.getString(4);
+                Identifier oi = null;
+                if (null != oidValue) {
+                    oi = RMUtils.getWSRMFactory().createIdentifier();
+                    oi.setValue(oidValue);
+                }                            
+                return new SourceSequence(sid, expiry, oi, cmn, lm);
+                          
+            }
+        } catch (SQLException ex) {
+            // log the failure and fall through to return null
+            LOG.log(Level.WARNING, new Message("SELECT_SRC_SEQ_FAILED_MSG", LOG).toString(), ex);
+        }
+        return null;
+    }
+
     public void removeDestinationSequence(Identifier sid) {
         try {
             beginTransaction();
@@ -367,17 +435,16 @@ public class RMTxStore implements RMStor
                 long mn = res.getLong(1);
                 String to = res.getString(2);
                 Blob blob = res.getBlob(3);
-                byte[] bytes = blob.getBytes(1, (int)blob.length());     
                 RMMessage msg = new RMMessage();
                 msg.setMessageNumber(mn);
                 msg.setTo(to);
-                msg.setContent(bytes);
+                msg.setContent(blob.getBinaryStream());
                 msgs.add(msg);                
             }            
-        } catch (SQLException ex) {
+        } catch (Exception ex) {
             LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
                 : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
-        }        
+        }
         return msgs;
     }
 
@@ -496,12 +563,7 @@ public class RMTxStore implements RMStor
         stmt.setString(i++, id);  
         stmt.setBigDecimal(i++, new BigDecimal(nr));
         stmt.setString(i++, to); 
-        byte[] bytes = msg.getContent();    
-        stmt.setBinaryStream(i++, new ByteArrayInputStream(bytes) {
-            public String toString() {
-                return IOUtils.newStringFromBytes(buf, 0, count);
-            }
-        }, bytes.length);
+        stmt.setBinaryStream(i++, msg.getInputStream(), msg.getSize());
         stmt.execute();
         LOG.log(Level.FINE, "Successfully stored {0} message number {1} for sequence {2}",
                 new Object[] {outbound ? "outbound" : "inbound", nr, id});

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Wed Mar 23 00:03:19 2011
@@ -21,6 +21,7 @@ package org.apache.cxf.ws.rm.soap;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.ConnectException;
 import java.util.ArrayList;
@@ -336,19 +337,26 @@ public class RetransmissionQueueImpl imp
                     }
                 }
             }
-            byte[] content = (byte[])message
+            CachedOutputStream content = (CachedOutputStream)message
                 .get(RMMessageConstants.SAVED_CONTENT);
-            if (null == content) {                
-                content = message.getContent(byte[].class); 
+            InputStream bis = null;
+            if (null == content) {
+                byte[] savedbytes = message.getContent(byte[].class);
+                bis = new ByteArrayInputStream(savedbytes); 
                 if (LOG.isLoggable(Level.FINE)) {
-                    LOG.fine("Using saved byte array: " + Arrays.toString(content));
+                    LOG.fine("Using saved byte array: " + Arrays.toString(savedbytes));
                 }
             } else {
+                bis = content.getInputStream();
                 if (LOG.isLoggable(Level.FINE)) {
-                    LOG.fine("Using saved output stream: " + IOUtils.newStringFromBytes(content));
+                    if (content.size() < 65536) {
+                        LOG.fine("Using saved output stream: " 
+                                 + IOUtils.newStringFromBytes(content.getBytes()));                        
+                    } else {                        
+                        LOG.fine("Using saved output stream: ...");                        
+                    }
                 }
             }
-            ByteArrayInputStream bis = new ByteArrayInputStream(content);
 
             // copy saved output stream to new output stream in chunks of 1024
             IOUtils.copyAndCloseInput(bis, os);
@@ -474,6 +482,7 @@ public class RetransmissionQueueImpl imp
             next = null;
             if (null != nextTask) {
                 nextTask.cancel();
+                releaseSavedMessage();
             }
         }
         
@@ -483,9 +492,17 @@ public class RetransmissionQueueImpl imp
         protected void cancel() {
             if (null != nextTask) {
                 nextTask.cancel();
+                releaseSavedMessage();
             }
         }
 
+        private void releaseSavedMessage() {
+            CachedOutputStream saved = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+            if (saved != null) {
+                saved.releaseTempFileHold();
+            }
+
+        }
         /**
          * @return associated message context
          */

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java Wed Mar 23 00:03:19 2011
@@ -153,6 +153,16 @@ public class RMManagerConfigurationTest 
             // TODO Auto-generated method stub
             
         }
+
+        public SourceSequence getSourceSequence(Identifier seq) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        public DestinationSequence getDestinationSequence(Identifier seq) {
+            // TODO Auto-generated method stub
+            return null;
+        }
         
     }
 }

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Wed Mar 23 00:03:19 2011
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -36,6 +37,7 @@ import org.apache.cxf.bus.spring.SpringB
 import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.service.Service;
@@ -587,7 +589,14 @@ public class RMManagerTest extends Asser
             EasyMock.expect(m.getTo()).andReturn("toAddress");
         }
         byte[] content = new byte[] {'x', '9'};
-        EasyMock.expect(m.getContent()).andReturn(content);
+        CachedOutputStream cos = new CachedOutputStream();
+        try {
+            cos.write(content);
+        } catch (IOException e) {
+            // ignore
+        }
+        EasyMock.expect(m.getCachedOutputStream()).andReturn(cos);
+
         queue.addUnacknowledged(EasyMock.isA(Message.class));
         EasyMock.expectLastCall();
         queue.start();

Added: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java?rev=1084417&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java (added)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java Wed Mar 23 00:03:19 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.persistence;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * A version of RMMessageTest that lowers the CachedOutputStream threshold to force temporary file caching.
+ */
+public class RMLargeMessageTest extends RMMessageTest {
+    private static String oldThreshold;
+
+    @BeforeClass
+    public static void setupThreshold() throws Exception {
+        oldThreshold = System.getProperty("org.apache.cxf.io.CachedOutputStream.Threshold");
+        // forces the CachedOutputStream to use temporary file caching
+        System.setProperty("org.apache.cxf.io.CachedOutputStream.Threshold", "16");
+    }
+
+    @AfterClass
+    public static void cleanup() throws Exception {
+        if (oldThreshold == null) {
+            System.clearProperty("org.apache.cxf.io.CachedOutputStream.Threshold");
+        } else {
+            System.setProperty("org.apache.cxf.io.CachedOutputStream.Threshold", oldThreshold);
+        }
+    }
+    
+    @Test
+    public void testContentInputStream() throws Exception {
+        super.testContentInputStream();
+    }
+    
+    @Test
+    public void testContentCachedOutputStream() throws Exception {
+        super.testContentCachedOutputStream();
+    }
+}

Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java?rev=1084417&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java (added)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java Wed Mar 23 00:03:19 2011
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.persistence;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * 
+ */
+public class RMMessageTest extends Assert {
+    private static final byte[] DATA = 
+        ("<greetMe xmlns=\"http://cxf.apache.org/greeter_control/types\">" 
+        + "<requestType>one</requestType></greetMe>").getBytes();
+    private static final String TO = "http://localhost:9999/decoupled_endpoint";
+
+    @Test
+    public void testAttributes() throws Exception {
+        RMMessage msg = new RMMessage();
+
+        msg.setTo(TO);
+        msg.setMessageNumber(1);
+        
+        assertEquals(msg.getTo(), TO);
+        assertEquals(msg.getMessageNumber(), 1);
+    }
+    
+    @Test
+    public void testContentInputStream() throws Exception {
+        RMMessage msg = new RMMessage();
+        msg.setContent(new ByteArrayInputStream(DATA));
+        
+        assertEquals(DATA.length, msg.getSize());
+        byte[] msgbytes = IOUtils.readBytesFromStream(msg.getInputStream());
+        
+        assertArrayEquals(DATA, msgbytes);
+    }
+    
+    @Test
+    public void testContentCachedOutputStream() throws Exception {
+        RMMessage msg = new RMMessage();
+        CachedOutputStream co = new CachedOutputStream();
+        co.write(DATA);
+        msg.setContent(co);
+        
+        assertEquals(DATA.length, msg.getSize());
+        byte[] msgbytes = IOUtils.readBytesFromStream(msg.getInputStream());
+        
+        assertArrayEquals(DATA, msgbytes);
+
+        assertEquals(co, msg.getCachedOutputStream());
+    }
+}

Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java Wed Mar 23 00:03:19 2011
@@ -19,7 +19,9 @@
 
 package org.apache.cxf.ws.rm.persistence.jdbc;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -227,7 +229,8 @@ public class RMTxStoreTest extends Asser
         sid1.setValue("sequence1");
         EasyMock.expect(msg.getMessageNumber()).andReturn(ONE).times(2); 
         byte[] bytes = new byte[89];
-        EasyMock.expect(msg.getContent()).andReturn(bytes).times(2);
+        EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+        EasyMock.expect(msg.getSize()).andReturn(bytes.length);
         
         control.replay();
         store.beginTransaction();
@@ -238,7 +241,8 @@ public class RMTxStoreTest extends Asser
         
         control.reset();
         EasyMock.expect(msg.getMessageNumber()).andReturn(ONE); 
-        EasyMock.expect(msg.getContent()).andReturn(bytes);
+        EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+        EasyMock.expect(msg.getSize()).andReturn(bytes.length);
         
         control.replay();
         store.beginTransaction();
@@ -252,7 +256,8 @@ public class RMTxStoreTest extends Asser
         
         control.reset();
         EasyMock.expect(msg.getMessageNumber()).andReturn(TEN).times(2); 
-        EasyMock.expect(msg.getContent()).andReturn(bytes).times(2); 
+        EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes)); 
+        EasyMock.expect(msg.getSize()).andReturn(bytes.length);
         
         control.replay();
         store.beginTransaction();
@@ -411,6 +416,62 @@ public class RMTxStoreTest extends Asser
     }
 
     @Test
+    public void testGetDestinationSequence() throws SQLException, IOException {
+        // an identifier that has not been stored must not resolve to a sequence
+        Identifier unknownId = RMUtils.getWSRMFactory().createIdentifier();
+        assertNull(store.getDestinationSequence(unknownId));
+
+        // declared outside the try block so the finally clause can clean up
+        Identifier firstId = null;
+        Identifier secondId = null;
+        try {
+            firstId = setupDestinationSequence("sequence1");
+            DestinationSequence found = store.getDestinationSequence(firstId);
+            assertNotNull(found);
+
+            secondId = setupDestinationSequence("sequence2");
+            found = store.getDestinationSequence(secondId);
+            assertNotNull(found);
+        } finally {
+            // remove only the sequences whose setup call returned
+            if (firstId != null) {
+                store.removeDestinationSequence(firstId);
+            }
+            if (secondId != null) {
+                store.removeDestinationSequence(secondId);
+            }
+        }
+    }
+
+    @Test
+    public void testGetSourceSequence() throws SQLException, IOException {
+        // an identifier that has not been stored must not resolve to a sequence
+        Identifier unknownId = RMUtils.getWSRMFactory().createIdentifier();
+        assertNull(store.getSourceSequence(unknownId));
+
+        // declared outside the try block so the finally clause can clean up
+        Identifier firstId = null;
+        Identifier secondId = null;
+        try {
+            firstId = setupSourceSequence("sequence1");
+            SourceSequence found = store.getSourceSequence(firstId);
+            assertNotNull(found);
+
+            secondId = setupSourceSequence("sequence2");
+            found = store.getSourceSequence(secondId);
+            assertNotNull(found);
+        } finally {
+            // remove only the sequences whose setup call returned
+            if (firstId != null) {
+                store.removeSourceSequence(firstId);
+            }
+            if (secondId != null) {
+                store.removeSourceSequence(secondId);
+            }
+        }
+    }
+
+    @Test
     public void testGetMessages() throws SQLException, IOException {
         
         Identifier sid1 = RMUtils.getWSRMFactory().createIdentifier();
@@ -528,8 +589,9 @@ public class RMTxStoreTest extends Asser
         RMMessage msg = control.createMock(RMMessage.class);
         EasyMock.expect(msg.getMessageNumber()).andReturn(mn);
         EasyMock.expect(msg.getTo()).andReturn(to);
-        String value = "Message " + mn.longValue();
-        EasyMock.expect(msg.getContent()).andReturn(value.getBytes());
+        byte[] value = ("Message " + mn.longValue()).getBytes();
+        EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(value));
+        EasyMock.expect(msg.getSize()).andReturn(value.length);
         
         control.replay();
         store.beginTransaction();
@@ -593,8 +655,12 @@ public class RMTxStoreTest extends Asser
             } else {
                 assertNull(msg.getTo());
             }
-            byte[] actual = msg.getContent();
-            assertEquals(new String("Message " + mn), IOUtils.newStringFromBytes(actual));
+            try {
+                InputStream actual = msg.getInputStream();
+                assertEquals(new String("Message " + mn), IOUtils.readStringFromStream(actual));
+            } catch (IOException e) {
+                fail("failed to get the input stream");
+            }
         }
     }
     

Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java?rev=1084417&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java Wed Mar 23 00:03:19 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * A simulated-large message version of ClientPersistenceTest.
+ */
+public class CachedOutClientPersistenceTest extends ClientPersistenceTest {
+    private static final String THRESHOLD_KEY = "org.apache.cxf.io.CachedOutputStream.Threshold";
+    // value of the property before this class changed it; null if it was unset
+    private static String savedThreshold;
+
+    @BeforeClass
+    public static void setProperties() throws Exception {
+        // forces CachedOutputStream to use temporary file caching; keeps the previous value for cleanup()
+        savedThreshold = System.setProperty(THRESHOLD_KEY, "16");
+    }
+
+    @AfterClass
+    public static void cleanup() throws Exception {
+        if (savedThreshold != null) {
+            System.setProperty(THRESHOLD_KEY, savedThreshold);
+        } else {
+            System.clearProperty(THRESHOLD_KEY);
+        }
+    }
+
+    // re-declared here; simply delegates to the inherited recovery scenario
+    @Test
+    public void testRecovery() throws Exception {
+        super.testRecovery();
+    }
+}

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java?rev=1084417&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java Wed Mar 23 00:03:19 2011
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.util.logging.Logger;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.interceptor.LoggingInInterceptor;
+import org.apache.cxf.interceptor.LoggingOutInterceptor;
+import org.apache.cxf.systest.ws.rm.RetransmissionQueueTest.Server;
+import org.apache.cxf.systest.ws.util.ConnectionHelper;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.cxf.ws.rm.RMManager;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the WS-RM processing with the cached out message (using temporary files). 
+ */
+public class CachedOutMessageTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = allocatePort(Server.class);
+    public static final String DECOUPLE_PORT = allocatePort("decoupled.port");
+
+    private static final String THRESHOLD_KEY = "org.apache.cxf.io.CachedOutputStream.Threshold";
+    private static final Logger LOG = LogUtils.getLogger(CachedOutMessageTest.class);
+
+    // value of the threshold property before this class changed it; null if it was unset
+    private static String oldThreshold;
+    private Bus bus;
+
+    /** Server side of the test: publishes the greeter endpoint on PORT. */
+    public static class Server extends AbstractBusTestServerBase {
+        protected void run()  {
+            SpringBusFactory bf = new SpringBusFactory();
+            Bus bus = bf.createBus("/org/apache/cxf/systest/ws/rm/message-loss.xml");
+            BusFactory.setDefaultBus(bus);
+            LoggingInInterceptor in = new LoggingInInterceptor();
+            bus.getInInterceptors().add(in);
+            bus.getInFaultInterceptors().add(in);
+            LoggingOutInterceptor out = new LoggingOutInterceptor();
+            bus.getOutInterceptors().add(out);
+            bus.getOutFaultInterceptors().add(out);
+
+            GreeterImpl implementor = new GreeterImpl();
+            String address = "http://localhost:" + PORT + "/SoapContext/GreeterPort";
+
+            Endpoint ep = Endpoint.create(implementor);
+            ep.publish(address);
+            LOG.info("Published greeter endpoint.");
+        }
+
+        public static void main(String[] args) {
+            try {
+                Server s = new Server();
+                s.start();
+            } catch (Exception ex) {
+                ex.printStackTrace();
+                System.exit(-1);
+            } finally {
+                System.out.println("done!");
+            }
+        }
+    }
+
+    /** Lowers the CachedOutputStream threshold property, then launches the server. */
+    @BeforeClass
+    public static void startServers() throws Exception {
+        // forces CachedOutputStream to use temporary file caching; keeps the previous value for cleanup()
+        oldThreshold = System.setProperty(THRESHOLD_KEY, "16");
+
+        assertTrue("server did not launch correctly",
+                   launchServer(Server.class));
+    }
+
+    /** Restores the threshold property to the state found by startServers(). */
+    @AfterClass
+    public static void cleanup() throws Exception {
+        if (oldThreshold == null) {
+            System.clearProperty(THRESHOLD_KEY);
+        } else {
+            System.setProperty(THRESHOLD_KEY, oldThreshold);
+        }
+    }
+
+    /** Sends three one-way requests past a MessageLossSimulator and expects them all acknowledged. */
+    @Test
+    public void testCachedOutMessage() throws Exception {
+        SpringBusFactory bf = new SpringBusFactory();
+        bus = bf.createBus("/org/apache/cxf/systest/ws/rm/message-loss.xml");
+        BusFactory.setDefaultBus(bus);
+        LoggingInInterceptor in = new LoggingInInterceptor();
+        bus.getInInterceptors().add(in);
+        bus.getInFaultInterceptors().add(in);
+        LoggingOutInterceptor out = new LoggingOutInterceptor();
+        bus.getOutInterceptors().add(out);
+        // an interceptor to simulate a message loss
+        MessageLossSimulator mls = new MessageLossSimulator();
+        bus.getOutInterceptors().add(mls);
+        RMManager manager = bus.getExtension(RMManager.class);
+        manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(Long.valueOf(2000L));
+        bus.getOutFaultInterceptors().add(out);
+
+        GreeterService gs = new GreeterService();
+        final Greeter greeter = gs.getGreeterPort();
+        // connect to the endpoint published by this test's own Server
+        updateAddressPort(greeter, PORT);
+        LOG.fine("Created greeter client.");
+
+        ConnectionHelper.setKeepAliveConnection(greeter, true);
+        greeter.greetMeOneWay("one");
+        greeter.greetMeOneWay("two");
+        greeter.greetMeOneWay("three");
+
+        // wait 4 seconds in total (resuming after interrupts) before checking the queue
+        long wait = 4000;
+        while (wait > 0) {
+            long start = System.currentTimeMillis();
+            try {
+                Thread.sleep(wait);
+            } catch (InterruptedException ex) {
+                // ignore: the loop resumes sleeping for the remaining time
+            }
+            wait -= System.currentTimeMillis() - start;
+        }
+
+        boolean empty = manager.getRetransmissionQueue().isEmpty();
+        assertTrue("Some messages are not acknowledged", empty);
+    }
+}

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java?rev=1084417&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java Wed Mar 23 00:03:19 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * A simulated-large message version of ServerPersistenceTest.
+ */
+public class CachedOutServerPersistenceTest extends ServerPersistenceTest {
+    private static final String THRESHOLD_KEY = "org.apache.cxf.io.CachedOutputStream.Threshold";
+    // value of the property before this class changed it; null if it was unset
+    private static String savedThreshold;
+
+    @BeforeClass
+    public static void setProperties() throws Exception {
+        // forces CachedOutputStream to use temporary file caching; keeps the previous value for cleanup()
+        savedThreshold = System.setProperty(THRESHOLD_KEY, "16");
+    }
+
+    @AfterClass
+    public static void cleanup() throws Exception {
+        if (savedThreshold != null) {
+            System.setProperty(THRESHOLD_KEY, savedThreshold);
+        } else {
+            System.clearProperty(THRESHOLD_KEY);
+        }
+    }
+
+    // re-declared here; simply delegates to the inherited recovery scenario
+    @Test
+    public void testRecovery() throws Exception {
+        super.testRecovery();
+    }
+}

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java Wed Mar 23 00:03:19 2011
@@ -101,6 +101,7 @@ public class ServerPersistenceTest exten
         LOG.fine("Created bus " + bus + " with default cfg");
         ControlService cs = new ControlService();
         Control control = cs.getControlPort();
+        ConnectionHelper.setKeepAliveConnection(control, false, true);
         updateAddressPort(control, PORT);
         
         assertTrue("Failed to start greeter", control.startGreeter(SERVER_LOSS_CFG)); 
@@ -119,7 +120,7 @@ public class ServerPersistenceTest exten
         
         LOG.fine("Created greeter client.");
  
-        ConnectionHelper.setKeepAliveConnection(greeter, true);
+        ConnectionHelper.setKeepAliveConnection(greeter, false, true);
 
         Client c = ClientProxy.getClient(greeter);
         HTTPConduit hc = (HTTPConduit)(c.getConduit());
@@ -166,12 +167,16 @@ public class ServerPersistenceTest exten
     }
     
     void verifyMissingResponse(Response<GreetMeResponse> responses[]) throws Exception {
-        awaitMessages(5, 8, 60000);
+        awaitMessages(5, 7, 30000);
 
-        // wait another while to prove that response to second request is indeed lost
-        Thread.sleep(4000);
         int nDone = 0;
         for (int i = 0; i < 3; i++) {
+            // wait another while to prove that response to second request is indeed lost
+            if (!responses[i].isDone()) {
+                Thread.sleep(4000);
+            }
+        }
+        for (int i = 0; i < 3; i++) {
             if (responses[i].isDone()) {
                 nDone++;
             }

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/persistent-message-loss-server.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/persistent-message-loss-server.xml?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/persistent-message-loss-server.xml (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/persistent-message-loss-server.xml Wed Mar 23 00:03:19 2011
@@ -23,8 +23,10 @@
        xmlns:wsa="http://cxf.apache.org/ws/addressing"
        xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
        xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+       xmlns:http="http://cxf.apache.org/transports/http/configuration"
        xsi:schemaLocation="
 http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
+http://cxf.apache.org/transports/http/configuration http://cxf.apache.org/schemas/configuration/http-conf.xsd
 http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
 http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
 http://cxf.apache.org/configuration/beans http://cxf.apache.org/schemas/configuration/cxf-beans.xsd
@@ -48,5 +50,9 @@ http://www.springframework.org/schema/be
             <ref bean="messageLoss"/>
         </cxf:outInterceptors>
     </cxf:bus>
-   
+    
+    <http:conduit name="http://localhost.*">
+      <http:client Connection="close"/>
+    </http:conduit>
+    
 </beans>
\ No newline at end of file