You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2016/05/03 14:24:52 UTC
cxf git commit: [CXF-6886] CXF 3.x WSRM attachments are not
retransmitted (modified patch based on kai's patches)
Repository: cxf
Updated Branches:
refs/heads/master 95d185201 -> cd0e74b89
[CXF-6886] CXF 3.x WSRM attachments are not retransmitted (modified patch based on kai's patches)
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/cd0e74b8
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/cd0e74b8
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/cd0e74b8
Branch: refs/heads/master
Commit: cd0e74b89aaf113374ce74262308620dcad263d4
Parents: 95d1852
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Mon May 2 13:35:15 2016 +0200
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Tue May 3 14:24:12 2016 +0200
----------------------------------------------------------------------
.../cxf/ws/rm/RMCaptureOutInterceptor.java | 5 +-
.../java/org/apache/cxf/ws/rm/RMManager.java | 6 +-
.../cxf/ws/rm/persistence/PersistenceUtils.java | 61 ++++++-
.../apache/cxf/ws/rm/persistence/RMMessage.java | 23 +++
.../cxf/ws/rm/persistence/jdbc/RMTxStore.java | 159 ++++---------------
.../org/apache/cxf/ws/rm/RMManagerTest.java | 141 +++++++++++++++-
.../ws/rm/persistence/PersistenceUtilsTest.java | 109 +++++++++++++
.../ws/rm/persistence/SerializedRMMessage.txt | 14 ++
.../rm/persistence/jdbc/RMTxStoreTestBase.java | 11 +-
9 files changed, 386 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
index 6890201..c0ca125 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
@@ -54,6 +54,7 @@ import org.apache.cxf.service.model.OperationInfo;
import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.addressing.ContextUtils;
+import org.apache.cxf.ws.rm.persistence.PersistenceUtils;
import org.apache.cxf.ws.rm.persistence.RMMessage;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.v200702.Identifier;
@@ -273,7 +274,9 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message> {
msg.setTo(maps.getTo().getValue());
}
}
- msg.setContent(bis);
+ // serializes the message content and the attachments into
+ // the RMMessage content
+ PersistenceUtils.encodeRMContent(msg, message, bis);
store.persistOutgoing(ss, msg);
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
index 7e6e0ff..9639cfe 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
@@ -66,6 +66,7 @@ import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
import org.apache.cxf.ws.rm.manager.RM10AddressingNamespaceType;
import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.persistence.PersistenceUtils;
import org.apache.cxf.ws.rm.persistence.RMMessage;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.policy.RMPolicyUtilities;
@@ -610,7 +611,10 @@ public class RMManager {
}
try {
- message.put(RMMessageConstants.SAVED_CONTENT, RewindableInputStream.makeRewindable(m.getContent()));
+ // RMMessage is stored in a serialized way, therefore
+ // RMMessage content must be split into soap root message
+ // and attachments
+ PersistenceUtils.decodeRMContent(m, message);
RMContextUtils.setProtocolVariation(message, ss.getProtocol());
retransmissionQueue.addUnacknowledged(message);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
index c4e8e7a..0981f8e 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
@@ -19,16 +19,26 @@
package org.apache.cxf.ws.rm.persistence;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.stream.XMLStreamReader;
+import org.apache.cxf.attachment.AttachmentDeserializer;
+import org.apache.cxf.attachment.AttachmentSerializer;
import org.apache.cxf.common.util.PackageUtils;
+import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.staxutils.StaxUtils;
+import org.apache.cxf.ws.rm.RMMessageConstants;
+import org.apache.cxf.ws.rm.RewindableInputStream;
import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
/**
@@ -51,7 +61,7 @@ public final class PersistenceUtils {
}
return instance;
}
-
+
public SequenceAcknowledgement deserialiseAcknowledgment(InputStream is) {
Object obj = null;
XMLStreamReader reader = StaxUtils.createXMLStreamReader(is);
@@ -68,14 +78,14 @@ public final class PersistenceUtils {
StaxUtils.close(reader);
is.close();
} catch (Throwable t) {
- //ignore, just cleaning up
+ // ignore, just cleaning up
}
}
return (SequenceAcknowledgement)obj;
}
-
+
public InputStream serialiseAcknowledgment(SequenceAcknowledgement ack) {
- LoadingByteArrayOutputStream bos = new LoadingByteArrayOutputStream();
+ LoadingByteArrayOutputStream bos = new LoadingByteArrayOutputStream();
try {
getContext().createMarshaller().marshal(ack, bos);
} catch (JAXBException ex) {
@@ -83,7 +93,7 @@ public final class PersistenceUtils {
}
return bos.createInputStream();
}
-
+
private JAXBContext getContext() throws JAXBException {
if (null == context) {
context = JAXBContext.newInstance(PackageUtils
@@ -92,4 +102,45 @@ public final class PersistenceUtils {
}
return context;
}
+
+ public static void encodeRMContent(RMMessage rmmsg, Message msg, InputStream msgContent)
+ throws IOException {
+ if (msg.getAttachments() == null) {
+ rmmsg.setContentType((String)msg.get(Message.CONTENT_TYPE));
+ rmmsg.setContent(msgContent);
+ } else {
+ MessageImpl msgImpl1 = new MessageImpl();
+ // using cached output stream to handle large files
+ CachedOutputStream cos = new CachedOutputStream();
+ msgImpl1.setContent(OutputStream.class, cos);
+ msgImpl1.setAttachments(msg.getAttachments());
+ msgImpl1.put(Message.CONTENT_TYPE, (String) msg.get(Message.CONTENT_TYPE));
+ msgImpl1.setContent(InputStream.class, msgContent);
+ AttachmentSerializer serializer = new AttachmentSerializer(msgImpl1);
+ serializer.setXop(false);
+ serializer.writeProlog();
+ // write soap root message into cached output stream
+ IOUtils.copyAndCloseInput(msgContent, cos);
+ serializer.writeAttachments();
+ rmmsg.setContentType((String) msgImpl1.get(Message.CONTENT_TYPE));
+
+ //TODO will pass the cos instance to rmmessage in the future
+ rmmsg.setContent(cos.getInputStream());
+ }
+ }
+
+ public static void decodeRMContent(RMMessage rmmsg, Message msg) throws IOException {
+ String contentType = rmmsg.getContentType();
+ if ((null != contentType) && contentType.startsWith("multipart/related")) {
+ msg.put(Message.CONTENT_TYPE, contentType);
+ msg.setContent(InputStream.class, rmmsg.getContent());
+ AttachmentDeserializer ad = new AttachmentDeserializer(msg);
+ ad.initializeAttachments();
+ } else {
+ msg.setContent(InputStream.class, rmmsg.getContent());
+ }
+ InputStream is = RewindableInputStream.makeRewindable(msg.getContent(InputStream.class));
+ msg.setContent(InputStream.class, is);
+ msg.put(RMMessageConstants.SAVED_CONTENT, is);
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
index 4e91208..abab221 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
@@ -25,7 +25,9 @@ import java.util.List;
public class RMMessage {
private InputStream content;
+ //TODO remove attachments when we remove the deprecated attachments related methods
private List<InputStream> attachments = Collections.emptyList();
+ private String contentType;
private long messageNumber;
private String to;
@@ -82,7 +84,9 @@ public class RMMessage {
/**
* Returns the list of attachments.
* @return list (non-null)
+ * @deprecated not used as the optional attachments are stored in the content
*/
+ @Deprecated
public List<InputStream> getAttachments() {
return attachments;
}
@@ -90,9 +94,28 @@ public class RMMessage {
/**
* Set the list of attachments.
* @param attaches (non-null)
+ * @deprecated not used as the optional attachments are stored in the content
*/
+ @Deprecated
public void setAttachments(List<InputStream> attaches) {
assert attaches != null;
attachments = attaches;
}
+
+ /**
+ * Returns the content type of the message content
+ * @return
+ */
+ public String getContentType() {
+ return contentType;
+ }
+
+ /**
+ * Set the content type of the RMMessage
+ * @param contentType
+ */
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
index 4c2954a..7e626a5 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
@@ -88,22 +88,13 @@ public class RMTxStore implements RMStore {
= {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
{"MSG_NO", "DECIMAL(19, 0) NOT NULL"},
{"SEND_TO", "VARCHAR(256)"},
- {"CONTENT", "BLOB"}};
+ {"CONTENT", "BLOB"},
+ {"CONTENT_TYPE", "VARCHAR(1024)"}};
private static final String[] MESSAGES_TABLE_KEYS = {"SEQ_ID", "MSG_NO"};
- private static final String[][] ATTACHMENTS_TABLE_COLS
- = {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
- {"MSG_NO", "DECIMAL(19, 0) NOT NULL"},
- {"ATTACHMENT_NO", "DECIMAL(19, 0) NOT NULL"},
- {"DATA", "BLOB"}};
- private static final String[] ATTACHMENTS_TABLE_KEYS = {"SEQ_ID", "MSG_NO", "ATTACHMENT_NO"};
-
private static final String DEST_SEQUENCES_TABLE_NAME = "CXF_RM_DEST_SEQUENCES";
private static final String SRC_SEQUENCES_TABLE_NAME = "CXF_RM_SRC_SEQUENCES";
private static final String INBOUND_MSGS_TABLE_NAME = "CXF_RM_INBOUND_MESSAGES";
private static final String OUTBOUND_MSGS_TABLE_NAME = "CXF_RM_OUTBOUND_MESSAGES";
- private static final String INBOUND_ATTS_TABLE_NAME = "CXF_RM_INBOUND_ATTACHMENTS";
- private static final String OUTBOUND_ATTS_TABLE_NAME = "CXF_RM_OUTBOUND_ATTACHMENTS";
-
private static final String CREATE_DEST_SEQUENCES_TABLE_STMT =
buildCreateTableStatement(DEST_SEQUENCES_TABLE_NAME,
DEST_SEQUENCES_TABLE_COLS, DEST_SEQUENCES_TABLE_KEYS);
@@ -113,9 +104,6 @@ public class RMTxStore implements RMStore {
SRC_SEQUENCES_TABLE_COLS, SRC_SEQUENCES_TABLE_KEYS);
private static final String CREATE_MESSAGES_TABLE_STMT =
buildCreateTableStatement("{0}", MESSAGES_TABLE_COLS, MESSAGES_TABLE_KEYS);
- private static final String CREATE_ATTACHMENTS_TABLE_STMT =
- buildCreateTableStatement("{0}", ATTACHMENTS_TABLE_COLS, ATTACHMENTS_TABLE_KEYS);
-
private static final String CREATE_DEST_SEQUENCE_STMT_STR
= "INSERT INTO CXF_RM_DEST_SEQUENCES "
+ "(SEQ_ID, ACKS_TO, ENDPOINT_ID, PROTOCOL_VERSION) "
@@ -133,13 +121,9 @@ public class RMTxStore implements RMStore {
private static final String UPDATE_SRC_SEQUENCE_STMT_STR =
"UPDATE CXF_RM_SRC_SEQUENCES SET CUR_MSG_NO = ?, LAST_MSG = ? WHERE SEQ_ID = ?";
private static final String CREATE_MESSAGE_STMT_STR
- = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CONTENT) VALUES(?, ?, ?, ?)";
+ = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE) VALUES(?, ?, ?, ?, ?)";
private static final String DELETE_MESSAGE_STMT_STR =
"DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
- private static final String CREATE_ATTACHMENT_STMT_STR
- = "INSERT INTO {0} (SEQ_ID, MSG_NO, ATTACHMENT_NO, DATA) VALUES(?, ?, ?, ?)";
- private static final String DELETE_ATTACHMENTS_STMT_STR =
- "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
private static final String SELECT_DEST_SEQUENCE_STMT_STR =
"SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ "WHERE SEQ_ID = ?";
@@ -153,9 +137,7 @@ public class RMTxStore implements RMStore {
"SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION "
+ "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?";
private static final String SELECT_MESSAGES_STMT_STR =
- "SELECT MSG_NO, SEND_TO, CONTENT FROM {0} WHERE SEQ_ID = ?";
- private static final String SELECT_ATTACHMENTS_STMT_STR =
- "SELECT ATTACHMENT_NO, DATA FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
+ "SELECT MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE FROM {0} WHERE SEQ_ID = ?";
private static final String ALTER_TABLE_STMT_STR =
"ALTER TABLE {0} ADD {1} {2}";
private static final String CREATE_INBOUND_MESSAGE_STMT_STR =
@@ -170,19 +152,6 @@ public class RMTxStore implements RMStore {
MessageFormat.format(SELECT_MESSAGES_STMT_STR, INBOUND_MSGS_TABLE_NAME);
private static final String SELECT_OUTBOUND_MESSAGES_STMT_STR =
MessageFormat.format(SELECT_MESSAGES_STMT_STR, OUTBOUND_MSGS_TABLE_NAME);
- private static final String CREATE_INBOUND_ATTACHMENT_STMT_STR =
- MessageFormat.format(CREATE_ATTACHMENT_STMT_STR, INBOUND_ATTS_TABLE_NAME);
- private static final String CREATE_OUTBOUND_ATTACHMENT_STMT_STR =
- MessageFormat.format(CREATE_ATTACHMENT_STMT_STR, OUTBOUND_ATTS_TABLE_NAME);
- private static final String DELETE_INBOUND_ATTACHMENTS_STMT_STR =
- MessageFormat.format(DELETE_ATTACHMENTS_STMT_STR, INBOUND_ATTS_TABLE_NAME);
- private static final String DELETE_OUTBOUND_ATTACHMENTS_STMT_STR =
- MessageFormat.format(DELETE_ATTACHMENTS_STMT_STR, OUTBOUND_ATTS_TABLE_NAME);
- private static final String SELECT_INBOUND_ATTACHMENTS_STMT_STR =
- MessageFormat.format(SELECT_ATTACHMENTS_STMT_STR, INBOUND_ATTS_TABLE_NAME);
- private static final String SELECT_OUTBOUND_ATTACHMENTS_STMT_STR =
- MessageFormat.format(SELECT_ATTACHMENTS_STMT_STR, OUTBOUND_ATTS_TABLE_NAME);
-
// create_schema may not work for several reasons, if so, create one manually
private static final String CREATE_SCHEMA_STMT_STR = "CREATE SCHEMA {0}";
// given the schema, try these standard statements to switch to the schema
@@ -613,44 +582,33 @@ public class RMTxStore implements RMStore {
public Collection<RMMessage> getMessages(Identifier sid, boolean outbound) {
Connection con = verifyConnection();
- PreparedStatement stmt1 = null;
- PreparedStatement stmt2 = null;
+ PreparedStatement stmt = null;
SQLException conex = null;
Collection<RMMessage> msgs = new ArrayList<RMMessage>();
- ResultSet res1 = null;
- ResultSet res2 = null;
+ ResultSet res = null;
try {
- stmt1 = getStatement(con, outbound ? SELECT_OUTBOUND_MESSAGES_STMT_STR : SELECT_INBOUND_MESSAGES_STMT_STR);
+ stmt = getStatement(con, outbound ? SELECT_OUTBOUND_MESSAGES_STMT_STR : SELECT_INBOUND_MESSAGES_STMT_STR);
- stmt1.setString(1, sid.getValue());
- res1 = stmt1.executeQuery();
- while (res1.next()) {
- long mn = res1.getLong(1);
- String to = res1.getString(2);
- Blob blob = res1.getBlob(3);
+ stmt.setString(1, sid.getValue());
+ res = stmt.executeQuery();
+ while (res.next()) {
+ long mn = res.getLong(1);
+ String to = res.getString(2);
+ Blob blob = res.getBlob(3);
+ String contentType = res.getString(4);
RMMessage msg = new RMMessage();
msg.setMessageNumber(mn);
msg.setTo(to);
msg.setContent(blob.getBinaryStream());
+ msg.setContentType(contentType);
msgs.add(msg);
- stmt2 = getStatement(con, outbound
- ? SELECT_OUTBOUND_ATTACHMENTS_STMT_STR : SELECT_INBOUND_ATTACHMENTS_STMT_STR);
- stmt2.setString(1, sid.getValue());
- stmt2.setLong(2, mn);
- res2 = stmt2.executeQuery();
- List<InputStream> attaches = new ArrayList<InputStream>();
- while (res2.next()) {
- attaches.add(res2.getBinaryStream(1));
- }
- msg.setAttachments(attaches);
}
} catch (SQLException ex) {
conex = ex;
LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
: "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
} finally {
- releaseResources(stmt2, res2);
- releaseResources(stmt1, res1);
+ releaseResources(stmt, res);
updateConnectionState(con, conex);
}
return msgs;
@@ -709,24 +667,17 @@ public class RMTxStore implements RMStore {
public void removeMessages(Identifier sid, Collection<Long> messageNrs, boolean outbound) {
Connection con = verifyConnection();
- PreparedStatement stmt1 = null;
- PreparedStatement stmt2 = null;
+ PreparedStatement stmt = null;
SQLException conex = null;
try {
- stmt1 = getStatement(con, outbound ? DELETE_OUTBOUND_MESSAGE_STMT_STR : DELETE_INBOUND_MESSAGE_STMT_STR);
- stmt2 = getStatement(con, outbound
- ? DELETE_OUTBOUND_ATTACHMENTS_STMT_STR : DELETE_INBOUND_ATTACHMENTS_STMT_STR);
-
+ stmt = getStatement(con, outbound ? DELETE_OUTBOUND_MESSAGE_STMT_STR : DELETE_INBOUND_MESSAGE_STMT_STR);
beginTransaction();
- stmt1.setString(1, sid.getValue());
- stmt2.setString(1, sid.getValue());
-
+ stmt.setString(1, sid.getValue());
+
for (Long messageNr : messageNrs) {
- stmt2.setLong(2, messageNr);
- stmt2.execute();
- stmt1.setLong(2, messageNr);
- stmt1.execute();
+ stmt.setLong(2, messageNr);
+ stmt.execute();
}
commit(con);
@@ -736,8 +687,7 @@ public class RMTxStore implements RMStore {
abort(con);
throw new RMStoreException(ex);
} finally {
- releaseResources(stmt2, null);
- releaseResources(stmt1, null);
+ releaseResources(stmt, null);
updateConnectionState(con, conex);
}
}
@@ -779,47 +729,28 @@ public class RMTxStore implements RMStore {
String id = sid.getValue();
long nr = msg.getMessageNumber();
String to = msg.getTo();
+ String contentType = msg.getContentType();
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "Storing {0} message number {1} for sequence {2}, to = {3}",
new Object[] {outbound ? "outbound" : "inbound", nr, id, to});
}
- PreparedStatement stmt1 = null;
- PreparedStatement stmt2 = null;
+ PreparedStatement stmt = null;
try {
InputStream msgin = msg.getContent();
- stmt1 = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR);
+ stmt = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR);
- stmt1.setString(1, id);
- stmt1.setLong(2, nr);
- stmt1.setString(3, to);
- stmt1.setBinaryStream(4, msgin);
- stmt1.execute();
-
- List<InputStream> attachments = msg.getAttachments();
- if (attachments.size() > 0) {
- stmt2 = getStatement(con, outbound
- ? CREATE_OUTBOUND_ATTACHMENT_STMT_STR : CREATE_INBOUND_ATTACHMENT_STMT_STR);
- stmt2.setString(1, id);
- stmt2.setLong(2, nr);
- for (int i = 0; i < attachments.size(); i++) {
- stmt2.setLong(3, i);
- stmt2.setBinaryStream(4, attachments.get(i));
- stmt2.execute();
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE,
- "Successfully stored {0} attachment {1} for message number {2} in sequence {3}",
- new Object[] {outbound ? "outbound" : "inbound", i, nr, id});
- }
- }
- }
-
+ stmt.setString(1, id);
+ stmt.setLong(2, nr);
+ stmt.setString(3, to);
+ stmt.setBinaryStream(4, msgin);
+ stmt.setString(5, contentType);
+ stmt.execute();
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "Successfully stored {0} message number {1} for sequence {2}",
new Object[] {outbound ? "outbound" : "inbound", nr, id});
}
} finally {
- releaseResources(stmt1, null);
- releaseResources(stmt2, null);
+ releaseResources(stmt, null);
}
}
@@ -930,24 +861,6 @@ public class RMTxStore implements RMStore {
stmt.close();
}
}
-
- for (String tableName : new String[] {OUTBOUND_ATTS_TABLE_NAME, INBOUND_ATTS_TABLE_NAME}) {
- stmt = con.createStatement();
- try {
- stmt.executeUpdate(MessageFormat.format(CREATE_ATTACHMENTS_TABLE_STMT, tableName));
- } catch (SQLException ex) {
- if (!isTableExistsError(ex)) {
- throw ex;
- } else {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Table " + tableName + " already exists.");
- }
- verifyTable(con, tableName, ATTACHMENTS_TABLE_COLS);
- }
- } finally {
- stmt.close();
- }
- }
} finally {
con.setAutoCommit(false);
if (connection == null && con != null) {
@@ -1137,12 +1050,6 @@ public class RMTxStore implements RMStore {
cacheStatement(connection, DELETE_OUTBOUND_MESSAGE_STMT_STR);
cacheStatement(connection, SELECT_INBOUND_MESSAGES_STMT_STR);
cacheStatement(connection, SELECT_OUTBOUND_MESSAGES_STMT_STR);
- cacheStatement(connection, CREATE_INBOUND_ATTACHMENT_STMT_STR);
- cacheStatement(connection, CREATE_OUTBOUND_ATTACHMENT_STMT_STR);
- cacheStatement(connection, DELETE_INBOUND_ATTACHMENTS_STMT_STR);
- cacheStatement(connection, DELETE_OUTBOUND_ATTACHMENTS_STMT_STR);
- cacheStatement(connection, SELECT_INBOUND_ATTACHMENTS_STMT_STR);
- cacheStatement(connection, SELECT_OUTBOUND_ATTACHMENTS_STMT_STR);
}
public synchronized void init() {
http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
index d89356d..ea082b2 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
@@ -20,6 +20,7 @@
package org.apache.cxf.ws.rm;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -69,7 +70,10 @@ import org.junit.Before;
import org.junit.Test;
public class RMManagerTest extends Assert {
-
+
+ private static final String MULTIPART_TYPE = "multipart/related; type=\"text/xml\";"
+ + " boundary=\"uuid:74b6a245-2e17-40eb-a86c-308664e18460\"; start=\"<root."
+ + "message@cxf.apache.org>\"; start-info=\"application/soap+xml\"";
private MyControl control;
private RMManager manager;
@@ -541,6 +545,117 @@ public class RMManagerTest extends Assert {
assertNotNull(msg.getExchange());
assertSame(msg, msg.getExchange().getOutMessage());
}
+
+ @Test
+ public void testRecoverReliableClientEndpointWithAttachment() throws NoSuchMethodException, IOException {
+ Method method = RMManager.class.getDeclaredMethod("createReliableEndpoint",
+ new Class[] {Endpoint.class});
+ manager = control.createMock(RMManager.class, new Method[] {method});
+ manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
+ Endpoint endpoint = control.createMock(Endpoint.class);
+ EndpointInfo ei = control.createMock(EndpointInfo.class);
+ ServiceInfo si = control.createMock(ServiceInfo.class);
+ BindingInfo bi = control.createMock(BindingInfo.class);
+ InterfaceInfo ii = control.createMock(InterfaceInfo.class);
+ setUpEndpointForRecovery(endpoint, ei, si, bi, ii);
+ Conduit conduit = control.createMock(Conduit.class);
+ SourceSequence ss = control.createMock(SourceSequence.class);
+ DestinationSequence ds = control.createMock(DestinationSequence.class);
+ RMMessage m1 = new RMMessage();
+ InputStream fis = getClass().getResourceAsStream("persistence/SerializedRMMessage.txt");
+ m1.setContent(fis);
+ m1.setTo("toAddress");
+ m1.setMessageNumber(new Long(10));
+ m1.setContentType(MULTIPART_TYPE);
+ Capture<Message> mc = Capture.newInstance();
+
+ setUpRecoverReliableEndpointWithAttachment(endpoint, conduit, ss, ds, m1, mc);
+ control.replay();
+ manager.recoverReliableEndpoint(endpoint, conduit);
+ control.verify();
+
+ Message msg = mc.getValue();
+ assertNotNull(msg);
+ assertNotNull(msg.getExchange());
+ assertSame(msg, msg.getExchange().getOutMessage());
+
+ InputStream is = (InputStream) msg.get(RMMessageConstants.SAVED_CONTENT);
+ assertStartsWith(is, "<soap:Envelope");
+ assertEquals(1, msg.getAttachments().size());
+ }
+
+ void setUpRecoverReliableEndpointWithAttachment(Endpoint endpoint,
+ Conduit conduit,
+ SourceSequence ss,
+ DestinationSequence ds, RMMessage m,
+ Capture<Message> mc)
+ throws IOException {
+ RMStore store = control.createMock(RMStore.class);
+ RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
+ manager.setStore(store);
+ manager.setRetransmissionQueue(queue);
+
+ Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
+ if (null != ss) {
+ sss.add(ss);
+ }
+ EasyMock.expect(store.getSourceSequences("{S}s.{P}p@cxf"))
+ .andReturn(sss);
+ if (null == ss) {
+ return;
+ }
+
+ Collection<DestinationSequence> dss = new ArrayList<DestinationSequence>();
+ if (null != ds) {
+ dss.add(ds);
+ }
+ EasyMock.expect(store.getDestinationSequences("{S}s.{P}p@cxf"))
+ .andReturn(dss);
+ if (null == ds) {
+ return;
+ }
+
+ Collection<RMMessage> ms = new ArrayList<RMMessage>();
+ if (null != m) {
+ ms.add(m);
+ }
+ Identifier id = new Identifier();
+ id.setValue("S1");
+ EasyMock.expect(ss.getIdentifier()).andReturn(id).times(null == m ? 1 : 2);
+ EasyMock.expect(ss.getProtocol()).andReturn(ProtocolVariation.RM10WSA200408).anyTimes();
+ EasyMock.expect(store.getMessages(id, true)).andReturn(ms);
+
+
+ RMEndpoint rme = control.createMock(RMEndpoint.class);
+ EasyMock.expect(manager.createReliableEndpoint(endpoint))
+ .andReturn(rme);
+ Source source = control.createMock(Source.class);
+ EasyMock.expect(rme.getSource()).andReturn(source).anyTimes();
+
+ Destination destination = control.createMock(Destination.class);
+ EasyMock.expect(rme.getDestination()).andReturn(destination);
+ destination.addSequence(ds, false);
+ EasyMock.expectLastCall();
+
+ Service service = control.createMock(Service.class);
+ EasyMock.expect(endpoint.getService()).andReturn(service).anyTimes();
+ Binding binding = control.createMock(Binding.class);
+ EasyMock.expect(endpoint.getBinding()).andReturn(binding).anyTimes();
+
+ EasyMock.expect(ss.isLastMessage()).andReturn(true).anyTimes();
+ EasyMock.expect(ss.getCurrentMessageNr()).andReturn(new Long(10)).anyTimes();
+ if (null == m) {
+ return;
+ }
+
+ queue.addUnacknowledged(EasyMock.capture(mc));
+
+ EasyMock.expectLastCall();
+ queue.start();
+ EasyMock.expectLastCall();
+ }
+
+
Endpoint setUpEndpointForRecovery(Endpoint endpoint,
EndpointInfo ei,
@@ -552,6 +667,7 @@ public class RMManagerTest extends Assert {
EasyMock.expect(si.getName()).andReturn(new QName("S", "s")).anyTimes();
EasyMock.expect(ei.getName()).andReturn(new QName("P", "p")).anyTimes();
EasyMock.expect(si.getInterface()).andReturn(ii).anyTimes();
+ EasyMock.expect(ei.getBinding()).andReturn(bi).anyTimes();
return endpoint;
}
@@ -681,6 +797,25 @@ public class RMManagerTest extends Assert {
return mock;
}
-
+
+ }
+ // just read the beginning of the input and compare it against the specified string
+ private static boolean assertStartsWith(InputStream in, String starting) {
+ assertNotNull(in);
+ byte[] buf = new byte[starting.length()];
+ try {
+ in.read(buf, 0, buf.length);
+ assertEquals(starting, new String(buf, "utf-8"));
+ return true;
+ } catch (IOException e) {
+ // ignore
+ } finally {
+ try {
+ in.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ return false;
}
-}
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
index 47e197b..c0667fb 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
@@ -19,8 +19,19 @@
package org.apache.cxf.ws.rm.persistence;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import javax.activation.DataHandler;
+import javax.mail.util.ByteArrayDataSource;
+
+import org.apache.cxf.attachment.AttachmentImpl;
+import org.apache.cxf.message.Attachment;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement.AcknowledgementRange;
@@ -31,6 +42,13 @@ import org.junit.Test;
*
*/
public class PersistenceUtilsTest extends Assert {
+
+ private static final String MULTIPART_TYPE = "multipart/related; type=\"text/xml\";"
+ + " boundary=\"uuid:74b6a245-2e17-40eb-a86c-308664e18460\"; start=\"<root."
+ + "message@cxf.apache.org>\"; start-info=\"application/soap+xml\"";
+
+ private static final String SOAP_PART = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ + "<data/></soap:Envelope>";
@Test
public void testSerialiseDeserialiseAcknowledgement() {
@@ -47,4 +65,95 @@ public class PersistenceUtilsTest extends Assert {
assertEquals(range.getLower(), refRange.getLower());
assertEquals(range.getUpper(), refRange.getUpper());
}
+
+ /**
+  * Encodes a SOAP message without attachments and checks that the stored content
+  * begins with the SOAP prefix and that a text/xml content type is recorded.
+  */
+ @Test
+ public void testEncodeRMContent() throws Exception {
+     // encode explicitly as UTF-8 rather than the platform default, matching the charset
+     // assertStartsWith uses when decoding the stored bytes
+     ByteArrayInputStream bis = new ByteArrayInputStream(SOAP_PART.getBytes("UTF-8"));
+
+     RMMessage rmmsg = new RMMessage();
+     Message messageImpl = new MessageImpl();
+     messageImpl.put(Message.CONTENT_TYPE, "text/xml");
+     // update rmmessage
+     PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis);
+
+     assertStartsWith(rmmsg.getContent(), "<soap:");
+     assertNotNull(rmmsg.getContentType());
+     assertTrue(rmmsg.getContentType().startsWith("text/xml"));
+ }
+
+ /**
+  * Encodes a SOAP message that carries an attachment and checks that the stored content
+  * begins with a MIME boundary and that a multipart/related content type is recorded.
+  */
+ @Test
+ public void testEncodeRMContentWithAttachments() throws Exception {
+     // encode explicitly as UTF-8 rather than the platform default, matching the charset
+     // assertStartsWith uses when decoding the stored bytes
+     ByteArrayInputStream bis = new ByteArrayInputStream(SOAP_PART.getBytes("UTF-8"));
+
+     RMMessage rmmsg = new RMMessage();
+     Message messageImpl = new MessageImpl();
+     messageImpl.put(Message.CONTENT_TYPE, "text/xml");
+     // add attachments
+     addAttachment(messageImpl);
+     // update rmmessage
+     PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis);
+
+     assertStartsWith(rmmsg.getContent(), "--uuid:");
+     assertNotNull(rmmsg.getContentType());
+     assertTrue(rmmsg.getContentType().startsWith("multipart/related"));
+ }
+
+ /**
+  * Round-trips a SOAP message with one attachment through encodeRMContent and
+  * decodeRMContent, then checks that the restored message has one attachment and
+  * that its content stream begins with the original SOAP part.
+  */
+ @Test
+ public void testEncodeDecodeRMContent() throws Exception {
+     // encode explicitly as UTF-8 rather than the platform default, matching the charset
+     // assertStartsWith uses when decoding the restored bytes
+     ByteArrayInputStream bis = new ByteArrayInputStream(SOAP_PART.getBytes("UTF-8"));
+     RMMessage rmmsg = new RMMessage();
+     Message messageImpl = new MessageImpl();
+     messageImpl.put(Message.CONTENT_TYPE, "text/xml");
+     // add attachments
+     addAttachment(messageImpl);
+     // serialize
+     PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis);
+
+     // deserialize into a fresh message
+     Message messageImplRestored = new MessageImpl();
+     PersistenceUtils.decodeRMContent(rmmsg, messageImplRestored);
+     assertEquals(1, messageImplRestored.getAttachments().size());
+
+     assertStartsWith(messageImplRestored.getContent(InputStream.class), SOAP_PART);
+ }
+
+ /**
+  * Decodes the multipart message kept in SerializedRMMessage.txt and checks that the
+  * restored message has one attachment and a content stream beginning with a SOAP envelope.
+  */
+ @Test
+ public void testDecodeRMContentWithAttachment() throws Exception {
+     // wrap the serialized fixture in an RMMessage declared as multipart content
+     RMMessage stored = new RMMessage();
+     stored.setContent(getClass().getResourceAsStream("SerializedRMMessage.txt"));
+     stored.setContentType(MULTIPART_TYPE);
+
+     Message restored = new MessageImpl();
+     PersistenceUtils.decodeRMContent(stored, restored);
+
+     assertEquals(1, restored.getAttachments().size());
+     assertStartsWith(restored.getContent(InputStream.class), "<soap:Envelope");
+ }
+
+ /**
+  * Sets a collection holding a single attachment ("test.xml", with the text/plain
+  * payload "hello world!") on the given message.
+  */
+ private static void addAttachment(Message msg) throws IOException {
+     Collection<Attachment> attachmentList = new ArrayList<Attachment>();
+     ByteArrayDataSource payload = new ByteArrayDataSource("hello world!", "text/plain");
+     attachmentList.add(new AttachmentImpl("test.xml", new DataHandler(payload)));
+     msg.setAttachments(attachmentList);
+ }
+
+ /**
+  * Reads the beginning of the input, compares it against the specified string and
+  * closes the stream.
+  *
+  * @param in the stream to check; must not be null and is closed before returning
+  * @param starting the text the stream content is expected to begin with (compared as UTF-8)
+  * @return true when the comparison was carried out; a mismatch or a read error fails
+  *         the test instead of returning
+  */
+ private static boolean assertStartsWith(InputStream in, String starting) {
+     assertNotNull(in);
+     try {
+         // size the buffer by encoded bytes, not chars, so non-ASCII prefixes are fully read
+         byte[] buf = new byte[starting.getBytes("utf-8").length];
+         int filled = 0;
+         // a single read() may deliver fewer bytes than requested; keep going until full or EOF
+         while (filled < buf.length) {
+             int n = in.read(buf, filled, buf.length - filled);
+             if (n < 0) {
+                 break;
+             }
+             filled += n;
+         }
+         assertEquals(starting, new String(buf, 0, filled, "utf-8"));
+         return true;
+     } catch (IOException e) {
+         // callers do not check the return value, so an unreadable stream must fail the test
+         fail("Unable to read the beginning of the stream: " + e);
+     } finally {
+         try {
+             in.close();
+         } catch (IOException e) {
+             // ignore: a failure on close does not change what was already verified
+         }
+     }
+     // only reached if fail() returns normally, which it does not; needed for compilation
+     return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt
new file mode 100644
index 0000000..66e33a4
--- /dev/null
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/SerializedRMMessage.txt
@@ -0,0 +1,14 @@
+
+--uuid:74b6a245-2e17-40eb-a86c-308664e18460
+Content-Type: text/xml; charset=UTF-8
+Content-Transfer-Encoding: binary
+Content-ID: <root.message@cxf.apache.org>
+
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"><soap:Header><Action xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing">http://cxf.apache.org/hello_world_soap_http/Greeter/greetMeOneWayRequest</Action><MessageID xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing">urn:uuid:9a29d476-d1c6-4cc2-b8cf-76de0cf1d4c7</MessageID><To xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing">http://localhost:8999/SoapContext/GreeterPort</To></soap:Header><soap:Body><greetMeOneWay xmlns="http://cxf.apache.org/hello_world_soap_http/types"><requestType>Chris</requestType></greetMeOneWay></soap:Body></soap:Envelope>
+--uuid:74b6a245-2e17-40eb-a86c-308664e18460
+Content-Type: text/plain
+Content-Transfer-Encoding: binary
+Content-ID: <test.xml>
+
+Hello World!
+--uuid:74b6a245-2e17-40eb-a86c-308664e18460--
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/cd0e74b8/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
index 306e3b0..fb62f34 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
@@ -224,9 +224,7 @@ public abstract class RMTxStoreTestBase extends Assert {
byte[] bytes = new byte[89];
EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
- EasyMock.expect(msg1.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes();
- EasyMock.expect(msg2.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes();
-
+ EasyMock.expect(msg1.getContentType()).andReturn("text/xml").times(1);
control.replay();
Connection con = getConnection();
@@ -264,9 +262,7 @@ public abstract class RMTxStoreTestBase extends Assert {
EasyMock.expect(msg2.getMessageNumber()).andReturn(TEN).anyTimes();
EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
- EasyMock.expect(msg1.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes();
- EasyMock.expect(msg2.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes();
-
+
control.replay();
con = getConnection();
try {
@@ -863,7 +859,8 @@ public abstract class RMTxStoreTestBase extends Assert {
RMMessage msg = control.createMock(RMMessage.class);
EasyMock.expect(msg.getMessageNumber()).andReturn(mn).anyTimes();
EasyMock.expect(msg.getTo()).andReturn(to).anyTimes();
- EasyMock.expect(msg.getAttachments()).andReturn(new ArrayList<InputStream>()).anyTimes();
+
+ EasyMock.expect(msg.getContentType()).andReturn("text/xml").anyTimes();
byte[] value = ("Message " + mn.longValue()).getBytes();
EasyMock.expect(msg.getContent()).andReturn(new ByteArrayInputStream(value)).anyTimes();
return msg;