You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2016/05/25 15:29:38 UTC
cxf git commit: [CXF-6646] CXF 3.x WSRM Replace RewindableInputStream
with CachedOutputStream (adjusted for 3.0.x)
Repository: cxf
Updated Branches:
refs/heads/3.0.x-fixes d2d10aa94 -> 9a6895f45
[CXF-6646] CXF 3.x WSRM Replace RewindableInputStream with CachedOutputStream (adjusted for 3.0.x)
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/9a6895f4
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/9a6895f4
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/9a6895f4
Branch: refs/heads/3.0.x-fixes
Commit: 9a6895f45529b0c119abd1b20bd75ed07249f191
Parents: d2d10aa
Author: Kai Rommel <ka...@sap.com>
Authored: Wed May 4 02:45:38 2016 +0200
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Wed May 25 17:29:08 2016 +0200
----------------------------------------------------------------------
.../apache/cxf/ws/rm/DestinationSequence.java | 7 +--
.../cxf/ws/rm/RMCaptureInInterceptor.java | 6 +--
.../cxf/ws/rm/RMCaptureOutInterceptor.java | 13 ++++--
.../apache/cxf/ws/rm/RMMessageConstants.java | 6 ++-
.../cxf/ws/rm/persistence/PersistenceUtils.java | 48 +++++++++++++++-----
.../apache/cxf/ws/rm/persistence/RMMessage.java | 14 +++---
.../cxf/ws/rm/persistence/jdbc/RMTxStore.java | 18 +++++++-
.../cxf/ws/rm/soap/RetransmissionQueueImpl.java | 45 ++++++++++++++----
.../org/apache/cxf/ws/rm/RMManagerTest.java | 22 ++++++---
.../ws/rm/persistence/PersistenceUtilsTest.java | 22 ++++++---
.../ws/rm/persistence/RMLargeMessageTest.java | 4 --
.../cxf/ws/rm/persistence/RMMessageTest.java | 16 +------
.../rm/persistence/jdbc/RMTxStoreTestBase.java | 25 +++++++---
13 files changed, 168 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index 2e1a54b..58b7906 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -33,6 +33,7 @@ import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.continuations.SuspendedInvocationException;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
@@ -167,9 +168,9 @@ public class DestinationSequence extends AbstractSequence {
RMMessage msg = null;
if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) {
msg = new RMMessage();
- RewindableInputStream in = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT);
- in.rewind();
- msg.setContent(in);
+ CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+ msg.setContent(cos);
+ msg.setContentType((String) message.get(Message.CONTENT_TYPE));
msg.setMessageNumber(st.getMessageNumber());
}
store.persistIncoming(this, msg);
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
index 40b4fab..9d48cbc 100755
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
@@ -57,9 +57,9 @@ public class RMCaptureInInterceptor extends AbstractRMInterceptor<Message> {
saved.lockOutputStream();
LOG.fine("Capturing the original RM message");
- RewindableInputStream ris = RewindableInputStream.makeRewindable(saved.getInputStream());
- message.setContent(InputStream.class, ris);
- message.put(RMMessageConstants.SAVED_CONTENT, ris);
+ //RewindableInputStream ris = RewindableInputStream.makeRewindable(saved.getInputStream());
+ message.setContent(InputStream.class, saved.getInputStream());
+ message.put(RMMessageConstants.SAVED_CONTENT, saved);
} catch (Exception e) {
throw new Fault(e);
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
index a580071..a875e1d 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
@@ -19,8 +19,8 @@
package org.apache.cxf.ws.rm;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -35,10 +35,12 @@ import org.apache.cxf.binding.Binding;
import org.apache.cxf.binding.soap.interceptor.SoapOutInterceptor;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor;
import org.apache.cxf.interceptor.AttachmentOutInterceptor;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.LoggingOutInterceptor;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.FaultMode;
@@ -254,8 +256,11 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message> {
}
// save message for potential retransmission
- ByteArrayInputStream bis = cw.getOutputStream().createInputStream();
- message.put(RMMessageConstants.SAVED_CONTENT, RewindableInputStream.makeRewindable(bis));
+ CachedOutputStream cos = new CachedOutputStream();
+ IOUtils.copyAndCloseInput(cw.getOutputStream().createInputStream(), cos);
+ cos.flush();
+ InputStream is = cos.getInputStream();
+ message.put(RMMessageConstants.SAVED_CONTENT, cos);
RMManager manager = getManager();
manager.getRetransmissionQueue().start();
manager.getRetransmissionQueue().addUnacknowledged(message);
@@ -275,7 +280,7 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message> {
}
// serializes the message content and the attachments into
// the RMMessage content
- PersistenceUtils.encodeRMContent(msg, message, bis);
+ PersistenceUtils.encodeRMContent(msg, message, is);
store.persistOutgoing(ss, msg);
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
index eb6789c..15999e9 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
@@ -36,8 +36,12 @@ public final class RMMessageConstants {
public static final String ORIGINAL_REQUESTOR_ROLE = "org.apache.cxf.client.original";
- /** Message content (must be an instance of {@link RewindableInputStream}. */
+ /** Message content must be an instance of {@link CachedOutputStream}. */
public static final String SAVED_CONTENT = "org.apache.cxf.ws.rm.content";
+
+ /** Variable holds reference to source streams of the attachments.
+ * It must be an instance of {@link Closeable}. */
+ public static final String ATTACHMENTS_CLOSEABLE = "org.apache.cxf.ws.rm.attachment.closeable";
/** Retransmission in progress flag (Boolean.TRUE if in progress). */
public static final String RM_RETRANSMISSION = "org.apache.cxf.ws.rm.retransmitting";
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
index 0981f8e..43f01d9 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
@@ -19,6 +19,7 @@
package org.apache.cxf.ws.rm.persistence;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -38,7 +39,6 @@ import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.staxutils.StaxUtils;
import org.apache.cxf.ws.rm.RMMessageConstants;
-import org.apache.cxf.ws.rm.RewindableInputStream;
import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
/**
@@ -105,13 +105,14 @@ public final class PersistenceUtils {
public static void encodeRMContent(RMMessage rmmsg, Message msg, InputStream msgContent)
throws IOException {
+ CachedOutputStream cos = new CachedOutputStream();
if (msg.getAttachments() == null) {
rmmsg.setContentType((String)msg.get(Message.CONTENT_TYPE));
- rmmsg.setContent(msgContent);
+ IOUtils.copyAndCloseInput(msgContent, cos);
+ cos.flush();
+ rmmsg.setContent(cos);
} else {
MessageImpl msgImpl1 = new MessageImpl();
- // using cached output stream to handle large files
- CachedOutputStream cos = new CachedOutputStream();
msgImpl1.setContent(OutputStream.class, cos);
msgImpl1.setAttachments(msg.getAttachments());
msgImpl1.put(Message.CONTENT_TYPE, (String) msg.get(Message.CONTENT_TYPE));
@@ -121,26 +122,49 @@ public final class PersistenceUtils {
serializer.writeProlog();
// write soap root message into cached output stream
IOUtils.copyAndCloseInput(msgContent, cos);
+ cos.flush();
serializer.writeAttachments();
rmmsg.setContentType((String) msgImpl1.get(Message.CONTENT_TYPE));
-
- //TODO will pass the cos instance to rmmessage in the future
- rmmsg.setContent(cos.getInputStream());
+ rmmsg.setContent(cos);
}
}
public static void decodeRMContent(RMMessage rmmsg, Message msg) throws IOException {
String contentType = rmmsg.getContentType();
+ final CachedOutputStream cos = rmmsg.getContent();
if ((null != contentType) && contentType.startsWith("multipart/related")) {
+ final InputStream is = cos.getInputStream();
msg.put(Message.CONTENT_TYPE, contentType);
- msg.setContent(InputStream.class, rmmsg.getContent());
+ msg.setContent(InputStream.class, is);
AttachmentDeserializer ad = new AttachmentDeserializer(msg);
ad.initializeAttachments();
+ // create new cos with soap envelope only
+ CachedOutputStream cosSoap = new CachedOutputStream();
+ IOUtils.copy(msg.getContent(InputStream.class), cosSoap);
+ cosSoap.flush();
+ msg.put(RMMessageConstants.SAVED_CONTENT, cosSoap);
+ // REVISIT -- At the moment references must be held for retransmission
+ // and the final cleanup of the CachedOutputStream.
+ msg.put(RMMessageConstants.ATTACHMENTS_CLOSEABLE, new Closeable() {
+
+ @Override
+ public void close() throws IOException {
+ try {
+ is.close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ try {
+ cos.close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ }
+
+ });
} else {
- msg.setContent(InputStream.class, rmmsg.getContent());
+ msg.put(RMMessageConstants.SAVED_CONTENT, cos);
}
- InputStream is = RewindableInputStream.makeRewindable(msg.getContent(InputStream.class));
- msg.setContent(InputStream.class, is);
- msg.put(RMMessageConstants.SAVED_CONTENT, is);
}
+
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
index abab221..348117c 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
@@ -22,9 +22,11 @@ import java.io.InputStream;
import java.util.Collections;
import java.util.List;
+import org.apache.cxf.io.CachedOutputStream;
+
public class RMMessage {
- private InputStream content;
+ private CachedOutputStream content;
//TODO remove attachments when we remove the deprecated attachments related methods
private List<InputStream> attachments = Collections.emptyList();
private String contentType;
@@ -48,11 +50,11 @@ public class RMMessage {
}
/**
- * Sets the message content using the input stream.
+ * Sets the message content using the CachedOutputStream.class.
* @param in
*/
- public void setContent(InputStream in) {
- content = in;
+ public void setContent(CachedOutputStream cos) {
+ content = cos;
}
/**
@@ -73,11 +75,11 @@ public class RMMessage {
}
/**
- * Returns the input stream of this message content.
+ * Returns the CachedOutputStream of this message content.
* @return
* @throws IOException
*/
- public InputStream getContent() {
+ public CachedOutputStream getContent() {
return content;
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
index cd19d7b..be0b960 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
@@ -51,6 +51,8 @@ import org.apache.cxf.common.i18n.Message;
import org.apache.cxf.common.injection.NoJSR250Annotations;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.SystemPropertyAction;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.rm.DestinationSequence;
import org.apache.cxf.ws.rm.ProtocolVariation;
@@ -599,7 +601,10 @@ public class RMTxStore implements RMStore {
RMMessage msg = new RMMessage();
msg.setMessageNumber(mn);
msg.setTo(to);
- msg.setContent(blob.getBinaryStream());
+ CachedOutputStream cos = new CachedOutputStream();
+ IOUtils.copyAndCloseInput(blob.getBinaryStream(), cos);
+ cos.flush();
+ msg.setContent(cos);
msg.setContentType(contentType);
msgs.add(msg);
}
@@ -607,6 +612,9 @@ public class RMTxStore implements RMStore {
conex = ex;
LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
: "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
+ } catch (IOException e) {
+ abort(con);
+ throw new RMStoreException(e);
} finally {
releaseResources(stmt, res);
updateConnectionState(con, conex);
@@ -735,8 +743,10 @@ public class RMTxStore implements RMStore {
new Object[] {outbound ? "outbound" : "inbound", nr, id, to});
}
PreparedStatement stmt = null;
+ CachedOutputStream cos = msg.getContent();
+ InputStream msgin = null;
try {
- InputStream msgin = msg.getContent();
+ msgin = cos.getInputStream();
stmt = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR);
stmt.setString(1, id);
@@ -751,6 +761,10 @@ public class RMTxStore implements RMStore {
}
} finally {
releaseResources(stmt, null);
+ if (null != msgin) {
+ msgin.close();
+ }
+ cos.close(); // needed to clean-up tmp file folder
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
index a3a9899..3a1ec17 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
@@ -19,6 +19,9 @@
package org.apache.cxf.ws.rm.soap;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
@@ -57,6 +60,7 @@ import org.apache.cxf.helpers.DOMUtils;
import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.Interceptor;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.io.CachedOutputStreamCallback;
import org.apache.cxf.io.WriteOnCloseOutputStream;
import org.apache.cxf.message.Message;
@@ -91,7 +95,6 @@ import org.apache.cxf.ws.rm.RMProperties;
import org.apache.cxf.ws.rm.RMUtils;
import org.apache.cxf.ws.rm.RetransmissionQueue;
import org.apache.cxf.ws.rm.RetryStatus;
-import org.apache.cxf.ws.rm.RewindableInputStream;
import org.apache.cxf.ws.rm.SourceSequence;
import org.apache.cxf.ws.rm.manager.RetryPolicyType;
import org.apache.cxf.ws.rm.persistence.RMStore;
@@ -585,10 +588,24 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
}
private void releaseSavedMessage() {
- RewindableInputStream is = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT);
- if (is != null) {
- is.release();
+ CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+ if (cos != null) {
+ cos.releaseTempFileHold();
+ try {
+ cos.close();
+ } catch (IOException e) {
+ // ignore
+ }
}
+ // REVISIT -- When reference holder is not needed anymore, code can be removed.
+ Closeable closeable = (Closeable)message.get(RMMessageConstants.ATTACHMENTS_CLOSEABLE);
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
}
/**
@@ -760,8 +777,9 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
}
// read SOAP headers from saved input stream
- RewindableInputStream is = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT);
- is.rewind();
+ CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+ cos.holdTempFile(); // CachedOutputStream is hold until delivering was successful
+ InputStream is = cos.getInputStream(); // instance is needed to close input stream later on
XMLStreamReader reader = StaxUtils.createXMLStreamReader(is, "UTF-8");
message.getHeaders().clear();
if (reader.getEventType() != XMLStreamConstants.START_ELEMENT
@@ -814,7 +832,7 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
retransmitChain.remove(incept);
}
}
- retransmitChain.add(new CopyOutInterceptor(reader));
+ retransmitChain.add(new CopyOutInterceptor(reader, is));
// restore callbacks on output stream
if (callbacks != null) {
@@ -921,10 +939,12 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
public static class CopyOutInterceptor extends AbstractOutDatabindingInterceptor {
private final XMLStreamReader reader;
+ private InputStream is;
- public CopyOutInterceptor(XMLStreamReader rdr) {
+ public CopyOutInterceptor(XMLStreamReader rdr, InputStream is) {
super(Phase.MARSHAL);
reader = rdr;
+ this.is = is;
}
@Override
@@ -932,9 +952,16 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
try {
XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
StaxUtils.copy(reader, writer);
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
} catch (XMLStreamException e) {
throw new Fault("COULD_NOT_READ_XML_STREAM", LOG, e);
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
index 4bb59f1..e56c9dc 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
@@ -41,6 +41,8 @@ import org.apache.cxf.endpoint.Client;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.service.Service;
@@ -505,7 +507,7 @@ public class RMManagerTest extends Assert {
}
@Test
- public void testRecoverReliableClientEndpoint() throws NoSuchMethodException {
+ public void testRecoverReliableClientEndpoint() throws NoSuchMethodException, IOException {
Method method = RMManager.class.getDeclaredMethod("createReliableEndpoint",
new Class[] {Endpoint.class});
manager = control.createMock(RMManager.class, new Method[] {method});
@@ -563,7 +565,10 @@ public class RMManagerTest extends Assert {
DestinationSequence ds = control.createMock(DestinationSequence.class);
RMMessage m1 = new RMMessage();
InputStream fis = getClass().getResourceAsStream("persistence/SerializedRMMessage.txt");
- m1.setContent(fis);
+ CachedOutputStream cos = new CachedOutputStream();
+ IOUtils.copyAndCloseInput(fis, cos);
+ cos.flush();
+ m1.setContent(cos);
m1.setTo("toAddress");
m1.setMessageNumber(new Long(10));
m1.setContentType(MULTIPART_TYPE);
@@ -579,8 +584,8 @@ public class RMManagerTest extends Assert {
assertNotNull(msg.getExchange());
assertSame(msg, msg.getExchange().getOutMessage());
- InputStream is = (InputStream) msg.get(RMMessageConstants.SAVED_CONTENT);
- assertStartsWith(is, "<soap:Envelope");
+ CachedOutputStream cos1 = (CachedOutputStream) msg.get(RMMessageConstants.SAVED_CONTENT);
+ assertStartsWith(cos1.getInputStream(), "<soap:Envelope");
assertEquals(1, msg.getAttachments().size());
}
@@ -673,7 +678,8 @@ public class RMManagerTest extends Assert {
void setUpRecoverReliableEndpoint(Endpoint endpoint,
Conduit conduit,
SourceSequence ss,
- DestinationSequence ds, RMMessage m, Capture<Message> mc) {
+ DestinationSequence ds, RMMessage m, Capture<Message> mc)
+ throws IOException {
RMStore store = control.createMock(RMStore.class);
RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
manager.setStore(store);
@@ -735,7 +741,11 @@ public class RMManagerTest extends Assert {
EasyMock.expect(m.getTo()).andReturn("toAddress");
}
InputStream is = new ByteArrayInputStream(new byte[0]);
- EasyMock.expect(m.getContent()).andReturn(is).anyTimes();
+ CachedOutputStream cos = new CachedOutputStream();
+ IOUtils.copy(is, cos);
+ cos.flush();
+ is.close();
+ EasyMock.expect(m.getContent()).andReturn(cos).anyTimes();
if (mc != null) {
queue.addUnacknowledged(EasyMock.capture(mc));
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
index c0667fb..9cccd2a 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java
@@ -29,9 +29,12 @@ import javax.activation.DataHandler;
import javax.mail.util.ByteArrayDataSource;
import org.apache.cxf.attachment.AttachmentImpl;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Attachment;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.ws.rm.RMMessageConstants;
import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement.AcknowledgementRange;
@@ -76,7 +79,7 @@ public class PersistenceUtilsTest extends Assert {
// update rmmessage
PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis);
- assertStartsWith(rmmsg.getContent(), "<soap:");
+ assertStartsWith(rmmsg.getContent().getInputStream(), "<soap:");
assertNotNull(rmmsg.getContentType());
assertTrue(rmmsg.getContentType().startsWith("text/xml"));
}
@@ -93,7 +96,7 @@ public class PersistenceUtilsTest extends Assert {
// update rmmessage
PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis);
- assertStartsWith(rmmsg.getContent(), "--uuid:");
+ assertStartsWith(rmmsg.getContent().getInputStream(), "--uuid:");
assertNotNull(rmmsg.getContentType());
assertTrue(rmmsg.getContentType().startsWith("multipart/related"));
}
@@ -112,21 +115,26 @@ public class PersistenceUtilsTest extends Assert {
Message messageImplRestored = new MessageImpl();
PersistenceUtils.decodeRMContent(rmmsg, messageImplRestored);
assertEquals(1, messageImplRestored.getAttachments().size());
-
- assertStartsWith(messageImplRestored.getContent(InputStream.class), SOAP_PART);
+ CachedOutputStream cos = (CachedOutputStream)messageImplRestored.get(RMMessageConstants.SAVED_CONTENT);
+ assertStartsWith(cos.getInputStream(), SOAP_PART);
}
@Test
public void testDecodeRMContentWithAttachment() throws Exception {
InputStream is = getClass().getResourceAsStream("SerializedRMMessage.txt");
+ CachedOutputStream cos = new CachedOutputStream();
+ IOUtils.copyAndCloseInput(is, cos);
+ cos.flush();
RMMessage msg = new RMMessage();
- msg.setContent(is);
+ msg.setContent(cos);
msg.setContentType(MULTIPART_TYPE);
Message messageImpl = new MessageImpl();
PersistenceUtils.decodeRMContent(msg, messageImpl);
assertEquals(1, messageImpl.getAttachments().size());
- assertStartsWith(messageImpl.getContent(InputStream.class), "<soap:Envelope");
+ CachedOutputStream cos1 = (CachedOutputStream)messageImpl
+ .get(RMMessageConstants.SAVED_CONTENT);
+ assertStartsWith(cos1.getInputStream(), "<soap:Envelope");
}
private static void addAttachment(Message msg) throws IOException {
@@ -137,7 +145,7 @@ public class PersistenceUtilsTest extends Assert {
msg.setAttachments(attachments);
}
- // just read the begining of the input and compare it against the specified string
+ // just read the beginning of the input and compare it against the specified string
private static boolean assertStartsWith(InputStream in, String starting) {
assertNotNull(in);
byte[] buf = new byte[starting.length()];
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
index 5badbe6..bf5c246 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
@@ -46,10 +46,6 @@ public class RMLargeMessageTest extends RMMessageTest {
}
}
- @Test
- public void testContentInputStream() throws Exception {
- super.testContentInputStream();
- }
@Test
public void testContentCachedOutputStream() throws Exception {
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
index 41322ff..1247f80 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
@@ -19,8 +19,6 @@
package org.apache.cxf.ws.rm.persistence;
-import java.io.ByteArrayInputStream;
-
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.io.CachedOutputStream;
@@ -49,23 +47,13 @@ public class RMMessageTest extends Assert {
}
@Test
- public void testContentInputStream() throws Exception {
- RMMessage msg = new RMMessage();
- msg.setContent(new ByteArrayInputStream(DATA));
-
- byte[] msgbytes = IOUtils.readBytesFromStream(msg.getContent());
-
- assertArrayEquals(DATA, msgbytes);
- }
-
- @Test
public void testContentCachedOutputStream() throws Exception {
RMMessage msg = new RMMessage();
CachedOutputStream co = new CachedOutputStream();
co.write(DATA);
- msg.setContent(co.getInputStream());
+ msg.setContent(co);
- byte[] msgbytes = IOUtils.readBytesFromStream(msg.getContent());
+ byte[] msgbytes = IOUtils.readBytesFromStream(msg.getContent().getInputStream());
assertArrayEquals(DATA, msgbytes);
co.close();
http://git-wip-us.apache.org/repos/asf/cxf/blob/9a6895f4/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
index fb62f34..a5f9723 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
@@ -30,6 +30,7 @@ import java.util.Date;
import java.util.List;
import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.Names;
import org.apache.cxf.ws.rm.DestinationSequence;
@@ -222,8 +223,13 @@ public abstract class RMTxStoreTestBase extends Assert {
EasyMock.expect(msg1.getMessageNumber()).andReturn(ONE).anyTimes();
EasyMock.expect(msg2.getMessageNumber()).andReturn(ONE).anyTimes();
byte[] bytes = new byte[89];
- EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
- EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ CachedOutputStream cos = new CachedOutputStream();
+ IOUtils.copy(bais, cos);
+ cos.flush();
+ bais.close();
+ EasyMock.expect(msg1.getContent()).andReturn(cos).anyTimes();
+ EasyMock.expect(msg2.getContent()).andReturn(cos).anyTimes();
EasyMock.expect(msg1.getContentType()).andReturn("text/xml").times(1);
control.replay();
@@ -241,7 +247,7 @@ public abstract class RMTxStoreTestBase extends Assert {
control.reset();
EasyMock.expect(msg1.getMessageNumber()).andReturn(ONE);
- EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes));
+ EasyMock.expect(msg1.getContent()).andReturn(cos);
control.replay();
con = getConnection();
@@ -260,8 +266,8 @@ public abstract class RMTxStoreTestBase extends Assert {
control.reset();
EasyMock.expect(msg1.getMessageNumber()).andReturn(TEN).anyTimes();
EasyMock.expect(msg2.getMessageNumber()).andReturn(TEN).anyTimes();
- EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
- EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes();
+ EasyMock.expect(msg1.getContent()).andReturn(cos).anyTimes();
+ EasyMock.expect(msg2.getContent()).andReturn(cos).anyTimes();
control.replay();
con = getConnection();
@@ -862,7 +868,12 @@ public abstract class RMTxStoreTestBase extends Assert {
EasyMock.expect(msg.getContentType()).andReturn("text/xml").anyTimes();
byte[] value = ("Message " + mn.longValue()).getBytes();
- EasyMock.expect(msg.getContent()).andReturn(new ByteArrayInputStream(value)).anyTimes();
+ ByteArrayInputStream bais = new ByteArrayInputStream(value);
+ CachedOutputStream cos = new CachedOutputStream();
+ IOUtils.copy(bais, cos);
+ cos.flush();
+ bais.close();
+ EasyMock.expect(msg.getContent()).andReturn(cos).anyTimes();
return msg;
}
@@ -926,7 +937,7 @@ public abstract class RMTxStoreTestBase extends Assert {
assertNull(msg.getTo());
}
try {
- InputStream actual = msg.getContent();
+ InputStream actual = msg.getContent().getInputStream();
assertEquals(new String("Message " + mn), IOUtils.readStringFromStream(actual));
} catch (IOException e) {
fail("failed to get the input stream");