You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2016/05/03 14:24:52 UTC

cxf git commit: [CXF-6886] CXF 3.x WSRM attachments are not retransmitted (modified patch based on kai's patches)

Repository: cxf
Updated Branches:
  refs/heads/master 95d185201 -> cd0e74b89


[CXF-6886] CXF 3.x WSRM attachments are not retransmitted (modified patch based on kai's patches)


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/cd0e74b8
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/cd0e74b8
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/cd0e74b8

Branch: refs/heads/master
Commit: cd0e74b89aaf113374ce74262308620dcad263d4
Parents: 95d1852
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Mon May 2 13:35:15 2016 +0200
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Tue May 3 14:24:12 2016 +0200

----------------------------------------------------------------------
 .../cxf/ws/rm/RMCaptureOutInterceptor.java      |   5 +-
 .../java/org/apache/cxf/ws/rm/RMManager.java    |   6 +-
 .../cxf/ws/rm/persistence/PersistenceUtils.java |  61 ++++++-
 .../apache/cxf/ws/rm/persistence/RMMessage.java |  23 +++
 .../cxf/ws/rm/persistence/jdbc/RMTxStore.java   | 159 ++++---------------
 .../org/apache/cxf/ws/rm/RMManagerTest.java     | 141 +++++++++++++++-
 .../ws/rm/persistence/PersistenceUtilsTest.java | 109 +++++++++++++
 .../ws/rm/persistence/SerializedRMMessage.txt   |  14 ++
 .../rm/persistence/jdbc/RMTxStoreTestBase.java  |  11 +-
 9 files changed, 386 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
index 6890201..c0ca125 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
@@ -54,6 +54,7 @@ import org.apache.cxf.service.model.OperationInfo;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.ContextUtils;
+import org.apache.cxf.ws.rm.persistence.PersistenceUtils;
 import org.apache.cxf.ws.rm.persistence.RMMessage;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.v200702.Identifier;
@@ -273,7 +274,9 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message>  {
                             msg.setTo(maps.getTo().getValue());
                         }
                     }
-                    msg.setContent(bis);
+                    // serializes the message content and the attachments into
+                    // the RMMessage content
+                    PersistenceUtils.encodeRMContent(msg, message, bis);
                     store.persistOutgoing(ss, msg);
                 }
                     

http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
index 7e6e0ff..9639cfe 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
@@ -66,6 +66,7 @@ import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
 import org.apache.cxf.ws.rm.manager.RM10AddressingNamespaceType;
 import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.persistence.PersistenceUtils;
 import org.apache.cxf.ws.rm.persistence.RMMessage;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.policy.RMPolicyUtilities;
@@ -610,7 +611,10 @@ public class RMManager {
             }
                                     
             try {
-                message.put(RMMessageConstants.SAVED_CONTENT, RewindableInputStream.makeRewindable(m.getContent()));
+                // RMMessage is stored in a serialized form, therefore the
+                // RMMessage content must be split into the soap root message
+                // and the attachments
+                PersistenceUtils.decodeRMContent(m, message);
                 RMContextUtils.setProtocolVariation(message, ss.getProtocol());
                 retransmissionQueue.addUnacknowledged(message);
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
index c4e8e7a..0981f8e 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
@@ -19,16 +19,26 @@
 
 package org.apache.cxf.ws.rm.persistence;
 
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBElement;
 import javax.xml.bind.JAXBException;
 import javax.xml.stream.XMLStreamReader;
 
+import org.apache.cxf.attachment.AttachmentDeserializer;
+import org.apache.cxf.attachment.AttachmentSerializer;
 import org.apache.cxf.common.util.PackageUtils;
+import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.staxutils.StaxUtils;
+import org.apache.cxf.ws.rm.RMMessageConstants;
+import org.apache.cxf.ws.rm.RewindableInputStream;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
 
 /**
@@ -51,7 +61,7 @@ public final class PersistenceUtils {
         }
         return instance;
     }
-    
+
     public SequenceAcknowledgement deserialiseAcknowledgment(InputStream is) {
         Object obj = null;
         XMLStreamReader reader = StaxUtils.createXMLStreamReader(is);
@@ -68,14 +78,14 @@ public final class PersistenceUtils {
                 StaxUtils.close(reader);
                 is.close();
             } catch (Throwable t) {
-                //ignore, just cleaning up
+                // ignore, just cleaning up
             }
         }
         return (SequenceAcknowledgement)obj;
     }
-    
+
     public InputStream serialiseAcknowledgment(SequenceAcknowledgement ack) {
-        LoadingByteArrayOutputStream bos = new LoadingByteArrayOutputStream(); 
+        LoadingByteArrayOutputStream bos = new LoadingByteArrayOutputStream();
         try {
             getContext().createMarshaller().marshal(ack, bos);
         } catch (JAXBException ex) {
@@ -83,7 +93,7 @@ public final class PersistenceUtils {
         }
         return bos.createInputStream();
     }
-    
+
     private JAXBContext getContext() throws JAXBException {
         if (null == context) {
             context = JAXBContext.newInstance(PackageUtils
@@ -92,4 +102,45 @@ public final class PersistenceUtils {
         }
         return context;
     }
+
+    public static void encodeRMContent(RMMessage rmmsg, Message msg, InputStream msgContent)
+        throws IOException {
+        if (msg.getAttachments() == null) {
+            rmmsg.setContentType((String)msg.get(Message.CONTENT_TYPE));
+            rmmsg.setContent(msgContent);
+        } else {
+            MessageImpl msgImpl1 = new MessageImpl();
+            // using cached output stream to handle large files
+            CachedOutputStream cos = new CachedOutputStream();
+            msgImpl1.setContent(OutputStream.class, cos);
+            msgImpl1.setAttachments(msg.getAttachments());
+            msgImpl1.put(Message.CONTENT_TYPE, (String) msg.get(Message.CONTENT_TYPE));
+            msgImpl1.setContent(InputStream.class, msgContent);
+            AttachmentSerializer serializer = new AttachmentSerializer(msgImpl1);
+            serializer.setXop(false);
+            serializer.writeProlog();
+            // write soap root message into cached output stream
+            IOUtils.copyAndCloseInput(msgContent, cos);
+            serializer.writeAttachments();
+            rmmsg.setContentType((String) msgImpl1.get(Message.CONTENT_TYPE));
+
+            //TODO will pass the cos instance to rmmessage in the future
+            rmmsg.setContent(cos.getInputStream());
+        }
+    }
+
+    public static void decodeRMContent(RMMessage rmmsg, Message msg) throws IOException {
+        String contentType = rmmsg.getContentType();
+        if ((null != contentType) && contentType.startsWith("multipart/related")) {
+            msg.put(Message.CONTENT_TYPE, contentType);
+            msg.setContent(InputStream.class, rmmsg.getContent());
+            AttachmentDeserializer ad = new AttachmentDeserializer(msg);
+            ad.initializeAttachments();
+        } else {
+            msg.setContent(InputStream.class, rmmsg.getContent());
+        }
+        InputStream is = RewindableInputStream.makeRewindable(msg.getContent(InputStream.class));
+        msg.setContent(InputStream.class, is);
+        msg.put(RMMessageConstants.SAVED_CONTENT, is);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
index 4e91208..abab221 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
@@ -25,7 +25,9 @@ import java.util.List;
 public class RMMessage {
     
     private InputStream content;
+    //TODO remove attachments when we remove the deprecated attachments related methods
     private List<InputStream> attachments = Collections.emptyList();
+    private String contentType;
     private long messageNumber;
     private String to;
     
@@ -82,7 +84,9 @@ public class RMMessage {
     /**
      * Returns the list of attachments.
      * @return list (non-null)
+     * @deprecated not used as the optional attachments are stored in the content
      */
+    @Deprecated
     public List<InputStream> getAttachments() {
         return attachments;
     }
@@ -90,9 +94,28 @@ public class RMMessage {
     /**
      * Set the list of attachments.
      * @param attaches (non-null)
+     * @deprecated not used as the optional attachments are stored in the content
      */
+    @Deprecated
     public void setAttachments(List<InputStream> attaches) {
         assert attaches != null;
         attachments = attaches;
     }
+
+    /**
+     * Returns the content type of the message content
+     * @return the content type, or null if none has been set
+     */
+    public String getContentType() {
+        return contentType;
+    }
+
+    /**
+     * Set the content type of the RMMessage
+     * @param contentType the content type of the message content
+     */
+    public void setContentType(String contentType) {
+        this.contentType = contentType;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
index 4c2954a..7e626a5 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
@@ -88,22 +88,13 @@ public class RMTxStore implements RMStore {
         = {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
            {"MSG_NO", "DECIMAL(19, 0) NOT NULL"},
            {"SEND_TO", "VARCHAR(256)"},
-           {"CONTENT", "BLOB"}};
+           {"CONTENT", "BLOB"},
+           {"CONTENT_TYPE", "VARCHAR(1024)"}};
     private static final String[] MESSAGES_TABLE_KEYS = {"SEQ_ID", "MSG_NO"};
-    private static final String[][] ATTACHMENTS_TABLE_COLS
-        = {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
-           {"MSG_NO", "DECIMAL(19, 0) NOT NULL"},
-           {"ATTACHMENT_NO", "DECIMAL(19, 0) NOT NULL"},
-           {"DATA", "BLOB"}};
-    private static final String[] ATTACHMENTS_TABLE_KEYS = {"SEQ_ID", "MSG_NO", "ATTACHMENT_NO"};
-
     private static final String DEST_SEQUENCES_TABLE_NAME = "CXF_RM_DEST_SEQUENCES"; 
     private static final String SRC_SEQUENCES_TABLE_NAME = "CXF_RM_SRC_SEQUENCES";
     private static final String INBOUND_MSGS_TABLE_NAME = "CXF_RM_INBOUND_MESSAGES";
     private static final String OUTBOUND_MSGS_TABLE_NAME = "CXF_RM_OUTBOUND_MESSAGES";    
-    private static final String INBOUND_ATTS_TABLE_NAME = "CXF_RM_INBOUND_ATTACHMENTS";
-    private static final String OUTBOUND_ATTS_TABLE_NAME = "CXF_RM_OUTBOUND_ATTACHMENTS";    
-    
     private static final String CREATE_DEST_SEQUENCES_TABLE_STMT = 
         buildCreateTableStatement(DEST_SEQUENCES_TABLE_NAME, 
                                   DEST_SEQUENCES_TABLE_COLS, DEST_SEQUENCES_TABLE_KEYS);
@@ -113,9 +104,6 @@ public class RMTxStore implements RMStore {
                                   SRC_SEQUENCES_TABLE_COLS, SRC_SEQUENCES_TABLE_KEYS);
     private static final String CREATE_MESSAGES_TABLE_STMT =
         buildCreateTableStatement("{0}", MESSAGES_TABLE_COLS, MESSAGES_TABLE_KEYS);
-    private static final String CREATE_ATTACHMENTS_TABLE_STMT =
-        buildCreateTableStatement("{0}", ATTACHMENTS_TABLE_COLS, ATTACHMENTS_TABLE_KEYS);
-
     private static final String CREATE_DEST_SEQUENCE_STMT_STR 
         = "INSERT INTO CXF_RM_DEST_SEQUENCES "
             + "(SEQ_ID, ACKS_TO, ENDPOINT_ID, PROTOCOL_VERSION) " 
@@ -133,13 +121,9 @@ public class RMTxStore implements RMStore {
     private static final String UPDATE_SRC_SEQUENCE_STMT_STR =
         "UPDATE CXF_RM_SRC_SEQUENCES SET CUR_MSG_NO = ?, LAST_MSG = ? WHERE SEQ_ID = ?";
     private static final String CREATE_MESSAGE_STMT_STR 
-        = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CONTENT) VALUES(?, ?, ?, ?)";
+        = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE) VALUES(?, ?, ?, ?, ?)";
     private static final String DELETE_MESSAGE_STMT_STR =
         "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
-    private static final String CREATE_ATTACHMENT_STMT_STR 
-        = "INSERT INTO {0} (SEQ_ID, MSG_NO, ATTACHMENT_NO, DATA) VALUES(?, ?, ?, ?)";
-    private static final String DELETE_ATTACHMENTS_STMT_STR =
-        "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
     private static final String SELECT_DEST_SEQUENCE_STMT_STR =
         "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
         + "WHERE SEQ_ID = ?";
@@ -153,9 +137,7 @@ public class RMTxStore implements RMStore {
         "SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION "
         + "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?";
     private static final String SELECT_MESSAGES_STMT_STR =
-        "SELECT MSG_NO, SEND_TO, CONTENT FROM {0} WHERE SEQ_ID = ?";
-    private static final String SELECT_ATTACHMENTS_STMT_STR =
-        "SELECT ATTACHMENT_NO, DATA FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
+        "SELECT MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE FROM {0} WHERE SEQ_ID = ?";
     private static final String ALTER_TABLE_STMT_STR =
         "ALTER TABLE {0} ADD {1} {2}";
     private static final String CREATE_INBOUND_MESSAGE_STMT_STR = 
@@ -170,19 +152,6 @@ public class RMTxStore implements RMStore {
         MessageFormat.format(SELECT_MESSAGES_STMT_STR, INBOUND_MSGS_TABLE_NAME);
     private static final String SELECT_OUTBOUND_MESSAGES_STMT_STR =
         MessageFormat.format(SELECT_MESSAGES_STMT_STR, OUTBOUND_MSGS_TABLE_NAME);
-    private static final String CREATE_INBOUND_ATTACHMENT_STMT_STR = 
-        MessageFormat.format(CREATE_ATTACHMENT_STMT_STR, INBOUND_ATTS_TABLE_NAME);
-    private static final String CREATE_OUTBOUND_ATTACHMENT_STMT_STR = 
-        MessageFormat.format(CREATE_ATTACHMENT_STMT_STR, OUTBOUND_ATTS_TABLE_NAME);
-    private static final String DELETE_INBOUND_ATTACHMENTS_STMT_STR =
-        MessageFormat.format(DELETE_ATTACHMENTS_STMT_STR, INBOUND_ATTS_TABLE_NAME);
-    private static final String DELETE_OUTBOUND_ATTACHMENTS_STMT_STR =
-        MessageFormat.format(DELETE_ATTACHMENTS_STMT_STR, OUTBOUND_ATTS_TABLE_NAME);
-    private static final String SELECT_INBOUND_ATTACHMENTS_STMT_STR =
-        MessageFormat.format(SELECT_ATTACHMENTS_STMT_STR, INBOUND_ATTS_TABLE_NAME);
-    private static final String SELECT_OUTBOUND_ATTACHMENTS_STMT_STR =
-        MessageFormat.format(SELECT_ATTACHMENTS_STMT_STR, OUTBOUND_ATTS_TABLE_NAME);
-    
     // create_schema may not work for several reasons, if so, create one manually
     private static final String CREATE_SCHEMA_STMT_STR = "CREATE SCHEMA {0}";
     // given the schema, try these standard statements to switch to the schema
@@ -613,44 +582,33 @@ public class RMTxStore implements RMStore {
     
     public Collection<RMMessage> getMessages(Identifier sid, boolean outbound) {
         Connection con = verifyConnection();
-        PreparedStatement stmt1 = null;
-        PreparedStatement stmt2 = null;
+        PreparedStatement stmt = null;
         SQLException conex = null;
         Collection<RMMessage> msgs = new ArrayList<RMMessage>();
-        ResultSet res1 = null;
-        ResultSet res2 = null;
+        ResultSet res = null;
         try {
-            stmt1 = getStatement(con, outbound ? SELECT_OUTBOUND_MESSAGES_STMT_STR : SELECT_INBOUND_MESSAGES_STMT_STR);
+            stmt = getStatement(con, outbound ? SELECT_OUTBOUND_MESSAGES_STMT_STR : SELECT_INBOUND_MESSAGES_STMT_STR);
 
-            stmt1.setString(1, sid.getValue());
-            res1 = stmt1.executeQuery();
-            while (res1.next()) {
-                long mn = res1.getLong(1);
-                String to = res1.getString(2);
-                Blob blob = res1.getBlob(3);
+            stmt.setString(1, sid.getValue());
+            res = stmt.executeQuery();
+            while (res.next()) {
+                long mn = res.getLong(1);
+                String to = res.getString(2);
+                Blob blob = res.getBlob(3);
+                String contentType = res.getString(4);
                 RMMessage msg = new RMMessage();
                 msg.setMessageNumber(mn);
                 msg.setTo(to);
                 msg.setContent(blob.getBinaryStream());
+                msg.setContentType(contentType);
                 msgs.add(msg);
-                stmt2 = getStatement(con, outbound
-                     ? SELECT_OUTBOUND_ATTACHMENTS_STMT_STR : SELECT_INBOUND_ATTACHMENTS_STMT_STR);
-                stmt2.setString(1, sid.getValue());
-                stmt2.setLong(2, mn);
-                res2 = stmt2.executeQuery();
-                List<InputStream> attaches = new ArrayList<InputStream>();
-                while (res2.next()) {
-                    attaches.add(res2.getBinaryStream(1));
-                }
-                msg.setAttachments(attaches);
             }
         } catch (SQLException ex) {
             conex = ex;
             LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
                 : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
         } finally {
-            releaseResources(stmt2, res2);
-            releaseResources(stmt1, res1);
+            releaseResources(stmt, res);
             updateConnectionState(con, conex);
         }
         return msgs;
@@ -709,24 +667,17 @@ public class RMTxStore implements RMStore {
     
     public void removeMessages(Identifier sid, Collection<Long> messageNrs, boolean outbound) {
         Connection con = verifyConnection();
-        PreparedStatement stmt1 = null;
-        PreparedStatement stmt2 = null;
+        PreparedStatement stmt = null;
         SQLException conex = null;
         try {
-            stmt1 = getStatement(con, outbound ? DELETE_OUTBOUND_MESSAGE_STMT_STR : DELETE_INBOUND_MESSAGE_STMT_STR);
-            stmt2 = getStatement(con, outbound
-                ? DELETE_OUTBOUND_ATTACHMENTS_STMT_STR : DELETE_INBOUND_ATTACHMENTS_STMT_STR);
-
+            stmt = getStatement(con, outbound ? DELETE_OUTBOUND_MESSAGE_STMT_STR : DELETE_INBOUND_MESSAGE_STMT_STR);
             beginTransaction();
 
-            stmt1.setString(1, sid.getValue());
-            stmt2.setString(1, sid.getValue());
-                        
+            stmt.setString(1, sid.getValue());
+
             for (Long messageNr : messageNrs) {
-                stmt2.setLong(2, messageNr);
-                stmt2.execute();
-                stmt1.setLong(2, messageNr);
-                stmt1.execute();
+                stmt.setLong(2, messageNr);
+                stmt.execute();
             }
             
             commit(con);
@@ -736,8 +687,7 @@ public class RMTxStore implements RMStore {
             abort(con);
             throw new RMStoreException(ex);
         } finally {
-            releaseResources(stmt2, null);
-            releaseResources(stmt1, null);
+            releaseResources(stmt, null);
             updateConnectionState(con, conex);
         }
     }
@@ -779,47 +729,28 @@ public class RMTxStore implements RMStore {
         String id = sid.getValue();
         long nr = msg.getMessageNumber();
         String to = msg.getTo();
+        String contentType = msg.getContentType();
         if (LOG.isLoggable(Level.FINE)) {
             LOG.log(Level.FINE, "Storing {0} message number {1} for sequence {2}, to = {3}",
                     new Object[] {outbound ? "outbound" : "inbound", nr, id, to});
         }
-        PreparedStatement stmt1 = null;
-        PreparedStatement stmt2 = null;
+        PreparedStatement stmt = null;
         try {
             InputStream msgin = msg.getContent();
-            stmt1 = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR);
+            stmt = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR);
 
-            stmt1.setString(1, id);  
-            stmt1.setLong(2, nr);
-            stmt1.setString(3, to); 
-            stmt1.setBinaryStream(4, msgin);
-            stmt1.execute();
-            
-            List<InputStream> attachments = msg.getAttachments();
-            if (attachments.size() > 0) {
-                stmt2 = getStatement(con, outbound
-                     ? CREATE_OUTBOUND_ATTACHMENT_STMT_STR : CREATE_INBOUND_ATTACHMENT_STMT_STR);
-                stmt2.setString(1, id);
-                stmt2.setLong(2, nr);
-                for (int i = 0; i < attachments.size(); i++) {
-                    stmt2.setLong(3, i);
-                    stmt2.setBinaryStream(4, attachments.get(i));
-                    stmt2.execute();
-                    if (LOG.isLoggable(Level.FINE)) {
-                        LOG.log(Level.FINE,
-                            "Successfully stored {0} attachment {1} for message number {2} in sequence {3}",
-                            new Object[] {outbound ? "outbound" : "inbound", i, nr, id});
-                    }
-                }
-            }
-            
+            stmt.setString(1, id);
+            stmt.setLong(2, nr);
+            stmt.setString(3, to);
+            stmt.setBinaryStream(4, msgin);
+            stmt.setString(5, contentType);
+            stmt.execute();
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.log(Level.FINE, "Successfully stored {0} message number {1} for sequence {2}",
                         new Object[] {outbound ? "outbound" : "inbound", nr, id});
             }
         } finally  {
-            releaseResources(stmt1, null);
-            releaseResources(stmt2, null);
+            releaseResources(stmt, null);
         }
     }
     
@@ -930,24 +861,6 @@ public class RMTxStore implements RMStore {
                     stmt.close();
                 }
             }
-
-            for (String tableName : new String[] {OUTBOUND_ATTS_TABLE_NAME, INBOUND_ATTS_TABLE_NAME}) {
-                stmt = con.createStatement();
-                try {
-                    stmt.executeUpdate(MessageFormat.format(CREATE_ATTACHMENTS_TABLE_STMT, tableName));
-                } catch (SQLException ex) {
-                    if (!isTableExistsError(ex)) {
-                        throw ex;
-                    } else {
-                        if (LOG.isLoggable(Level.FINE)) {
-                            LOG.fine("Table " + tableName + " already exists.");
-                        }
-                        verifyTable(con, tableName, ATTACHMENTS_TABLE_COLS);
-                    }
-                } finally {
-                    stmt.close();
-                }
-            }
         } finally {
             con.setAutoCommit(false);
             if (connection == null && con != null) {
@@ -1137,12 +1050,6 @@ public class RMTxStore implements RMStore {
         cacheStatement(connection, DELETE_OUTBOUND_MESSAGE_STMT_STR);
         cacheStatement(connection, SELECT_INBOUND_MESSAGES_STMT_STR);
         cacheStatement(connection, SELECT_OUTBOUND_MESSAGES_STMT_STR);
-        cacheStatement(connection, CREATE_INBOUND_ATTACHMENT_STMT_STR);
-        cacheStatement(connection, CREATE_OUTBOUND_ATTACHMENT_STMT_STR);
-        cacheStatement(connection, DELETE_INBOUND_ATTACHMENTS_STMT_STR);
-        cacheStatement(connection, DELETE_OUTBOUND_ATTACHMENTS_STMT_STR);
-        cacheStatement(connection, SELECT_INBOUND_ATTACHMENTS_STMT_STR);
-        cacheStatement(connection, SELECT_OUTBOUND_ATTACHMENTS_STMT_STR);
     }
 
     public synchronized void init() {

http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
index d89356d..ea082b2 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
@@ -20,6 +20,7 @@
 package org.apache.cxf.ws.rm;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -69,7 +70,10 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class RMManagerTest extends Assert {
-    
+
+    private static final String MULTIPART_TYPE = "multipart/related; type=\"text/xml\";"
+        + " boundary=\"uuid:74b6a245-2e17-40eb-a86c-308664e18460\"; start=\"<root."
+        + "message@cxf.apache.org>\"; start-info=\"application/soap+xml\"";
     private MyControl control;
     private RMManager manager;
     
@@ -541,6 +545,117 @@ public class RMManagerTest extends Assert {
         assertNotNull(msg.getExchange());
         assertSame(msg, msg.getExchange().getOutMessage());
     }
+ 
+    @Test
+    public void testRecoverReliableClientEndpointWithAttachment() throws NoSuchMethodException, IOException {
+        Method method = RMManager.class.getDeclaredMethod("createReliableEndpoint", 
+            new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {method});
+        manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
+        Endpoint endpoint = control.createMock(Endpoint.class);
+        EndpointInfo ei = control.createMock(EndpointInfo.class);
+        ServiceInfo si = control.createMock(ServiceInfo.class);  
+        BindingInfo bi = control.createMock(BindingInfo.class);
+        InterfaceInfo ii = control.createMock(InterfaceInfo.class);
+        setUpEndpointForRecovery(endpoint, ei, si, bi, ii);          
+        Conduit conduit = control.createMock(Conduit.class);        
+        SourceSequence ss = control.createMock(SourceSequence.class);
+        DestinationSequence ds = control.createMock(DestinationSequence.class);
+        RMMessage m1 = new RMMessage();
+        InputStream fis = getClass().getResourceAsStream("persistence/SerializedRMMessage.txt");
+        m1.setContent(fis);
+        m1.setTo("toAddress");
+        m1.setMessageNumber(new Long(10));
+        m1.setContentType(MULTIPART_TYPE);
+        Capture<Message> mc = Capture.newInstance();
+        
+        setUpRecoverReliableEndpointWithAttachment(endpoint, conduit, ss, ds, m1, mc);        
+        control.replay();
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        control.verify();
+        
+        Message msg = mc.getValue();
+        assertNotNull(msg);
+        assertNotNull(msg.getExchange());
+        assertSame(msg, msg.getExchange().getOutMessage());
+
+        InputStream is = (InputStream) msg.get(RMMessageConstants.SAVED_CONTENT);
+        assertStartsWith(is, "<soap:Envelope");
+        assertEquals(1, msg.getAttachments().size());
+    }
+    
+    void setUpRecoverReliableEndpointWithAttachment(Endpoint endpoint,
+                                      Conduit conduit, 
+                                      SourceSequence ss, 
+                                      DestinationSequence ds, RMMessage m,
+                                      Capture<Message> mc)
+                                          throws IOException  {                
+        RMStore store = control.createMock(RMStore.class);
+        RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
+        manager.setStore(store);
+        manager.setRetransmissionQueue(queue);
+        
+        Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
+        if (null != ss) {
+            sss.add(ss);            
+        }
+        EasyMock.expect(store.getSourceSequences("{S}s.{P}p@cxf"))
+            .andReturn(sss);
+        if (null == ss) {
+            return;
+        }         
+        
+        Collection<DestinationSequence> dss = new ArrayList<DestinationSequence>();
+        if (null != ds) {
+            dss.add(ds);            
+        }
+        EasyMock.expect(store.getDestinationSequences("{S}s.{P}p@cxf"))
+            .andReturn(dss);
+        if (null == ds) {
+            return;
+        }
+
+        Collection<RMMessage> ms = new ArrayList<RMMessage>();
+        if (null != m) {
+            ms.add(m);
+        }
+        Identifier id = new Identifier();
+        id.setValue("S1");
+        EasyMock.expect(ss.getIdentifier()).andReturn(id).times(null == m ? 1 : 2);
+        EasyMock.expect(ss.getProtocol()).andReturn(ProtocolVariation.RM10WSA200408).anyTimes();
+        EasyMock.expect(store.getMessages(id, true)).andReturn(ms);
+        
+        
+        RMEndpoint rme = control.createMock(RMEndpoint.class);
+        EasyMock.expect(manager.createReliableEndpoint(endpoint))
+            .andReturn(rme);
+        Source source = control.createMock(Source.class);
+        EasyMock.expect(rme.getSource()).andReturn(source).anyTimes();
+                
+        Destination destination = control.createMock(Destination.class);
+        EasyMock.expect(rme.getDestination()).andReturn(destination);
+        destination.addSequence(ds, false);
+        EasyMock.expectLastCall();
+        
+        Service service = control.createMock(Service.class);
+        EasyMock.expect(endpoint.getService()).andReturn(service).anyTimes();
+        Binding binding = control.createMock(Binding.class);
+        EasyMock.expect(endpoint.getBinding()).andReturn(binding).anyTimes();
+
+        EasyMock.expect(ss.isLastMessage()).andReturn(true).anyTimes();
+        EasyMock.expect(ss.getCurrentMessageNr()).andReturn(new Long(10)).anyTimes();
+        if (null == m) {
+            return;
+        }
+
+        queue.addUnacknowledged(EasyMock.capture(mc));
+        
+        EasyMock.expectLastCall();
+        queue.start();
+        EasyMock.expectLastCall();
+    }
+
+    
     
     Endpoint setUpEndpointForRecovery(Endpoint endpoint, 
                                       EndpointInfo ei, 
@@ -552,6 +667,7 @@ public class RMManagerTest extends Assert {
         EasyMock.expect(si.getName()).andReturn(new QName("S", "s")).anyTimes();
         EasyMock.expect(ei.getName()).andReturn(new QName("P", "p")).anyTimes();
         EasyMock.expect(si.getInterface()).andReturn(ii).anyTimes();
+        EasyMock.expect(ei.getBinding()).andReturn(bi).anyTimes();
         return endpoint;
     }
     
@@ -681,6 +797,25 @@ public class RMManagerTest extends Assert {
             return mock;
         }
         
-         
+
+    }
+    // just read the beginning of the input and compare it against the specified string
+    private static boolean assertStartsWith(InputStream in, String starting) {
+        assertNotNull(in);
+        byte[] buf = new byte[starting.length()];
+        try {
+            in.read(buf, 0, buf.length);
+            assertEquals(starting, new String(buf, "utf-8"));
+            return true;
+        } catch (IOException e) {
+            // ignore
+        } finally {
+            try {
+                in.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+        return false;
     }
-} 
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
index 47e197b..c0667fb 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
@@ -19,8 +19,19 @@
 
 package org.apache.cxf.ws.rm.persistence;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
 
+import javax.activation.DataHandler;
+import javax.mail.util.ByteArrayDataSource;
+
+import org.apache.cxf.attachment.AttachmentImpl;
+import org.apache.cxf.message.Attachment;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement.AcknowledgementRange;
 
@@ -31,6 +42,13 @@ import org.junit.Test;
  * 
  */
 public class PersistenceUtilsTest extends Assert {
+    
+    private static final String MULTIPART_TYPE = "multipart/related; type=\"text/xml\";"
+        + " boundary=\"uuid:74b6a245-2e17-40eb-a86c-308664e18460\"; start=\"<root."
+        + "message@cxf.apache.org>\"; start-info=\"application/soap+xml\"";
+
+    private static final String SOAP_PART = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+            + "<data/></soap:Envelope>";
 
     @Test
     public void testSerialiseDeserialiseAcknowledgement() {
@@ -47,4 +65,95 @@ public class PersistenceUtilsTest extends Assert {
         assertEquals(range.getLower(), refRange.getLower());
         assertEquals(range.getUpper(), refRange.getUpper());
     }
+    
+    @Test
+    public void testEncodeRMContent() throws Exception {
+        ByteArrayInputStream bis = new ByteArrayInputStream(SOAP_PART.getBytes());
+        
+        RMMessage rmmsg = new RMMessage();
+        Message messageImpl = new MessageImpl();
+        messageImpl.put(Message.CONTENT_TYPE, "text/xml");
+        // update rmmessage
+        PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis);
+
+        assertStartsWith(rmmsg.getContent(), "<soap:");
+        assertNotNull(rmmsg.getContentType());
+        assertTrue(rmmsg.getContentType().startsWith("text/xml"));
+    }
+
+    @Test
+    public void testEncodeRMContentWithAttachments() throws Exception {
+        ByteArrayInputStream bis = new ByteArrayInputStream(SOAP_PART.getBytes());
+
+        RMMessage rmmsg = new RMMessage();
+        Message messageImpl = new MessageImpl();
+        messageImpl.put(Message.CONTENT_TYPE, "text/xml");
+        // add attachments
+        addAttachment(messageImpl);
+        // update rmmessage
+        PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis);
+
+        assertStartsWith(rmmsg.getContent(), "--uuid:");
+        assertNotNull(rmmsg.getContentType());
+        assertTrue(rmmsg.getContentType().startsWith("multipart/related"));
+    }
+
+    @Test
+    public void testEncodeDecodeRMContent() throws Exception {
+        ByteArrayInputStream bis = new ByteArrayInputStream(SOAP_PART.getBytes());
+        RMMessage rmmsg = new RMMessage();
+        Message messageImpl = new MessageImpl();
+        messageImpl.put(Message.CONTENT_TYPE, "text/xml");
+        // add attachments
+        addAttachment(messageImpl);
+        // serialize
+        PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis);
+
+        Message messageImplRestored = new MessageImpl();
+        PersistenceUtils.decodeRMContent(rmmsg, messageImplRestored);
+        assertEquals(1, messageImplRestored.getAttachments().size());
+
+        assertStartsWith(messageImplRestored.getContent(InputStream.class), SOAP_PART);
+    }
+    
+    @Test
+    public void testDecodeRMContentWithAttachment() throws Exception {
+        InputStream is = getClass().getResourceAsStream("SerializedRMMessage.txt");
+        RMMessage msg = new RMMessage();
+        msg.setContent(is);      
+        msg.setContentType(MULTIPART_TYPE);
+        Message messageImpl = new MessageImpl();
+        PersistenceUtils.decodeRMContent(msg, messageImpl);
+
+        assertEquals(1, messageImpl.getAttachments().size());
+        assertStartsWith(messageImpl.getContent(InputStream.class), "<soap:Envelope");
+    }
+
+    private static void addAttachment(Message msg) throws IOException {
+        Collection<Attachment> attachments = new ArrayList<Attachment>();
+        DataHandler dh = new DataHandler(new ByteArrayDataSource("hello world!", "text/plain"));
+        Attachment a = new AttachmentImpl("test.xml", dh);
+        attachments.add(a);
+        msg.setAttachments(attachments);
+    }
+
+    // just read the beginning of the input and compare it against the specified string
+    private static boolean assertStartsWith(InputStream in, String starting) {
+        assertNotNull(in);
+        byte[] buf = new byte[starting.length()];
+        try {
+            in.read(buf, 0, buf.length);
+            assertEquals(starting, new String(buf, "utf-8"));
+            return true;
+        } catch (IOException e) {
+            // ignore
+        } finally {
+            try {
+                in.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt
new file mode 100644
index 0000000..66e33a4
--- /dev/null
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt
@@ -0,0 +1,14 @@
+
+--uuid:74b6a245-2e17-40eb-a86c-308664e18460
+Content-Type: text/xml; charset=UTF-8
+Content-Transfer-Encoding: binary
+Content-ID: <root.message@cxf.apache.org>
+
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"><soap:Header><Action xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing">http://cxf.apache.org/hello_world_soap_http/Greeter/greetMeOneWayRequest</Action><MessageID xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing">urn:uuid:9a29d476-d1c6-4cc2-b8cf-76de0cf1d4c7</MessageID><To xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing">http://localhost:8999/SoapContext/GreeterPort</To></soap:Header><soap:Body><greetMeOneWay xmlns="http://cxf.apache.org/hello_world_soap_http/types"><requestType>Chris</requestType></greetMeOneWay></soap:Body></soap:Envelope>
+--uuid:74b6a245-2e17-40eb-a86c-308664e18460
+Content-Type: text/plain
+Content-Transfer-Encoding: binary
+Content-ID: <test.xml>
+
+Hello World!
+--uuid:74b6a245-2e17-40eb-a86c-308664e18460--
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
index 306e3b0..fb62f34 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
@@ -224,9 +224,7 @@ public abstract class RMTxStoreTestBase extends Assert {
         byte[] bytes = new byte[89];
         EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
         EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
-        EasyMock.expect(msg1.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes();
-        EasyMock.expect(msg2.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes();
-        
+        EasyMock.expect(msg1.getContentType()).andReturn("text/xml").times(1);
         control.replay();
 
         Connection con = getConnection();
@@ -264,9 +262,7 @@ public abstract class RMTxStoreTestBase extends Assert {
         EasyMock.expect(msg2.getMessageNumber()).andReturn(TEN).anyTimes(); 
         EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); 
         EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); 
-        EasyMock.expect(msg1.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes();
-        EasyMock.expect(msg2.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes();
-        
+
         control.replay();
         con = getConnection();
         try {
@@ -863,7 +859,8 @@ public abstract class RMTxStoreTestBase extends Assert {
         RMMessage msg = control.createMock(RMMessage.class);
         EasyMock.expect(msg.getMessageNumber()).andReturn(mn).anyTimes();
         EasyMock.expect(msg.getTo()).andReturn(to).anyTimes();
-        EasyMock.expect(msg.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes();
+
+        EasyMock.expect(msg.getContentType()).andReturn("text/xml").anyTimes();
         byte[] value = ("Message " + mn.longValue()).getBytes();
         EasyMock.expect(msg.getContent()).andReturn(new ByteArrayInputStream(value)).anyTimes();
         return msg;