You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2011/03/23 01:03:19 UTC
svn commit: r1084417 - in /cxf/trunk: api/src/main/java/org/apache/cxf/io/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/
rt/ws/rm/src/...
Author: dkulp
Date: Wed Mar 23 00:03:19 2011
New Revision: 1084417
URL: http://svn.apache.org/viewvc?rev=1084417&view=rev
Log:
[CXF-1100] Change WS-RM to cache base on streams instead of byte[]
Patch from Aki Yoshida applied
Added:
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java (with props)
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java (with props)
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java (with props)
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java (with props)
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java (with props)
Modified:
cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/persistent-message-loss-server.xml
Modified: cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java Wed Mar 23 00:03:19 2011
@@ -189,6 +189,12 @@ public class CachedOutputStream extends
}
public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj instanceof CachedOutputStream) {
+ return currentStream.equals(((CachedOutputStream)obj).currentStream);
+ }
return currentStream.equals(obj);
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Wed Mar 23 00:03:19 2011
@@ -455,7 +455,7 @@ public class RMManager implements Server
RMContextUtils.storeMAPs(maps, message, true, false);
}
- message.setContent(byte[].class, m.getContent());
+ message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream());
retransmissionQueue.addUnacknowledged(message);
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java Wed Mar 23 00:03:19 2011
@@ -20,10 +20,7 @@
package org.apache.cxf.ws.rm;
import java.io.IOException;
-import java.util.logging.Logger;
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.io.CachedOutputStreamCallback;
import org.apache.cxf.message.Message;
@@ -37,8 +34,6 @@ import org.apache.cxf.ws.rm.persistence.
*/
public class RetransmissionCallback implements CachedOutputStreamCallback {
- private static final Logger LOG = LogUtils.getL7dLogger(RetransmissionCallback.class);
-
Message message;
RMManager manager;
@@ -47,18 +42,16 @@ public class RetransmissionCallback impl
manager = mgr;
}
public void onClose(CachedOutputStream cos) {
-
- //REVISIT - would be nice to keep the cache on disk instead of in-memory
- byte bytes[] = null;
+ // make a copy as the original gets affected when its resetOut is called
+ CachedOutputStream saved = new CachedOutputStream();
+ saved.holdTempFile();
try {
- bytes = cos.getBytes();
+ cos.writeCacheTo(saved);
} catch (IOException e) {
- throw new Fault(new org.apache.cxf.common.i18n.Message("NO_CACHED_STREAM",
- LOG,
- cos.getOut().getClass()));
+ // ignore
}
-
- message.put(RMMessageConstants.SAVED_CONTENT, bytes);
+
+ message.put(RMMessageConstants.SAVED_CONTENT, saved);
manager.getRetransmissionQueue().addUnacknowledged(message);
RMStore store = manager.getStore();
@@ -75,7 +68,7 @@ public class RetransmissionCallback impl
msg.setTo(maps.getTo().getValue());
}
}
- msg.setContent(bytes);
+ msg.setContent(saved);
store.persistOutgoing(ss, msg);
}
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java Wed Mar 23 00:03:19 2011
@@ -18,9 +18,15 @@
*/
package org.apache.cxf.ws.rm.persistence;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
+
public class RMMessage {
- private byte[] content;
+ private CachedOutputStream content;
private long messageNumber;
private String to;
@@ -44,17 +50,50 @@ public class RMMessage {
/**
* Returns the content of the message as an input stream.
* @return the content
+ * @deprecated
*/
public byte[] getContent() {
- return content;
+ byte[] bytes = null;
+ try {
+ bytes = content.getBytes();
+ } catch (IOException e) {
+ // ignore and treat it as null
+ }
+ return bytes;
}
/**
* Sets the message content as an input stream.
* @param content the message content
+ * @deprecated
*/
public void setContent(byte[] c) {
+ content = new CachedOutputStream();
+ content.holdTempFile();
+ try {
+ content.write(c);
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ /**
+ * Sets the message content using the input stream.
+ * @param in
+ * @throws IOException
+ */
+ public void setContent(InputStream in) throws IOException {
+ content = new CachedOutputStream();
+ content.holdTempFile();
+ IOUtils.copy(in, content);
+ }
+
+ /**
+ * Sets the message content using the cached output stream.
+ * @param c
+ */
+ public void setContent(CachedOutputStream c) {
content = c;
}
@@ -75,11 +114,29 @@ public class RMMessage {
to = t;
}
-
-
-
-
-
+ /**
+ * Returns the input stream of this message content.
+ * @return
+ * @throws IOException
+ */
+ public InputStream getInputStream() throws IOException {
+ return content.getInputStream();
+ }
+ /**
+ * Returns the associated cached output stream.
+ * @return
+ */
+ public CachedOutputStream getCachedOutputStream() {
+ return content;
+ }
+ /**
+ * Returns the length of the message content in bytes.
+ *
+ * @return
+ */
+ public int getSize() {
+ return content.size();
+ }
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java Wed Mar 23 00:03:19 2011
@@ -43,6 +43,20 @@ public interface RMStore {
void createDestinationSequence(DestinationSequence seq);
/**
+ * Retrieve the source sequence with the specified identifier from persistent store.
+ * @param seq the sequence
+ * @return the sequence if present; otherwise null
+ */
+ SourceSequence getSourceSequence(Identifier seq);
+
+ /**
+ * Retrieve the destination sequence with the specified identifier from persistent store.
+ * @param seq the sequence
+ * @return the sequence if present; otherwise null
+ */
+ DestinationSequence getDestinationSequence(Identifier seq);
+
+ /**
* Remove the source sequence with the specified identifier from persistent store.
* @param seq the sequence
*/
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Wed Mar 23 00:03:19 2011
@@ -20,7 +20,6 @@
package org.apache.cxf.ws.rm.persistence.jdbc;
-import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -45,7 +44,6 @@ import javax.annotation.PostConstruct;
import org.apache.cxf.common.i18n.Message;
import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
import org.apache.cxf.ws.rm.DestinationSequence;
import org.apache.cxf.ws.rm.Identifier;
@@ -106,6 +104,12 @@ public class RMTxStore implements RMStor
= "INSERT INTO {0} VALUES(?, ?, ?, ?)";
private static final String DELETE_MESSAGE_STMT_STR =
"DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
+ private static final String SELECT_DEST_SEQUENCE_STMT_STR =
+ "SELECT ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ + "WHERE SEQ_ID = ?";
+ private static final String SELECT_SRC_SEQUENCE_STMT_STR =
+ "SELECT CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID FROM CXF_RM_SRC_SEQUENCES "
+ + "WHERE SEQ_ID = ?";
private static final String SELECT_DEST_SEQUENCES_STMT_STR =
"SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ "WHERE ENDPOINT_ID = ?";
@@ -131,6 +135,8 @@ public class RMTxStore implements RMStor
private PreparedStatement updateSrcSequenceStmt;
private PreparedStatement selectDestSequencesStmt;
private PreparedStatement selectSrcSequencesStmt;
+ private PreparedStatement selectDestSequenceStmt;
+ private PreparedStatement selectSrcSequenceStmt;
private PreparedStatement createInboundMessageStmt;
private PreparedStatement createOutboundMessageStmt;
private PreparedStatement deleteInboundMessageStmt;
@@ -242,6 +248,68 @@ public class RMTxStore implements RMStor
}
}
+ public DestinationSequence getDestinationSequence(Identifier sid) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.info("Getting destination sequence for id: " + sid);
+ }
+ try {
+ if (null == selectDestSequenceStmt) {
+ selectDestSequenceStmt =
+ connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);
+ }
+ selectDestSequenceStmt.setString(1, sid.getValue());
+
+ ResultSet res = selectDestSequenceStmt.executeQuery();
+ if (res.next()) {
+ EndpointReferenceType acksTo = RMUtils.createReference2004(res.getString(1));
+ long lm = res.getLong(2);
+ InputStream is = res.getBinaryStream(3);
+ SequenceAcknowledgement ack = null;
+ if (null != is) {
+ ack = PersistenceUtils.getInstance()
+ .deserialiseAcknowledgment(is);
+ }
+ return new DestinationSequence(sid, acksTo, lm, ack);
+ }
+ } catch (SQLException ex) {
+ LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex);
+ }
+ return null;
+ }
+
+ public SourceSequence getSourceSequence(Identifier sid) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.info("Getting source sequences for id: " + sid);
+ }
+ try {
+ if (null == selectSrcSequenceStmt) {
+ selectSrcSequenceStmt =
+ connection.prepareStatement(SELECT_SRC_SEQUENCE_STMT_STR);
+ }
+ selectSrcSequenceStmt.setString(1, sid.getValue());
+ ResultSet res = selectSrcSequenceStmt.executeQuery();
+
+ if (res.next()) {
+ long cmn = res.getLong(1);
+ boolean lm = res.getBoolean(2);
+ long lval = res.getLong(3);
+ Date expiry = 0 == lval ? null : new Date(lval);
+ String oidValue = res.getString(4);
+ Identifier oi = null;
+ if (null != oidValue) {
+ oi = RMUtils.getWSRMFactory().createIdentifier();
+ oi.setValue(oidValue);
+ }
+ return new SourceSequence(sid, expiry, oi, cmn, lm);
+
+ }
+ } catch (SQLException ex) {
+ // ignore
+ LOG.log(Level.WARNING, new Message("SELECT_SRC_SEQ_FAILED_MSG", LOG).toString(), ex);
+ }
+ return null;
+ }
+
public void removeDestinationSequence(Identifier sid) {
try {
beginTransaction();
@@ -367,17 +435,16 @@ public class RMTxStore implements RMStor
long mn = res.getLong(1);
String to = res.getString(2);
Blob blob = res.getBlob(3);
- byte[] bytes = blob.getBytes(1, (int)blob.length());
RMMessage msg = new RMMessage();
msg.setMessageNumber(mn);
msg.setTo(to);
- msg.setContent(bytes);
+ msg.setContent(blob.getBinaryStream());
msgs.add(msg);
}
- } catch (SQLException ex) {
+ } catch (Exception ex) {
LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
: "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
- }
+ }
return msgs;
}
@@ -496,12 +563,7 @@ public class RMTxStore implements RMStor
stmt.setString(i++, id);
stmt.setBigDecimal(i++, new BigDecimal(nr));
stmt.setString(i++, to);
- byte[] bytes = msg.getContent();
- stmt.setBinaryStream(i++, new ByteArrayInputStream(bytes) {
- public String toString() {
- return IOUtils.newStringFromBytes(buf, 0, count);
- }
- }, bytes.length);
+ stmt.setBinaryStream(i++, msg.getInputStream(), msg.getSize());
stmt.execute();
LOG.log(Level.FINE, "Successfully stored {0} message number {1} for sequence {2}",
new Object[] {outbound ? "outbound" : "inbound", nr, id});
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Wed Mar 23 00:03:19 2011
@@ -21,6 +21,7 @@ package org.apache.cxf.ws.rm.soap;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.util.ArrayList;
@@ -336,19 +337,26 @@ public class RetransmissionQueueImpl imp
}
}
}
- byte[] content = (byte[])message
+ CachedOutputStream content = (CachedOutputStream)message
.get(RMMessageConstants.SAVED_CONTENT);
- if (null == content) {
- content = message.getContent(byte[].class);
+ InputStream bis = null;
+ if (null == content) {
+ byte[] savedbytes = message.getContent(byte[].class);
+ bis = new ByteArrayInputStream(savedbytes);
if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Using saved byte array: " + Arrays.toString(content));
+ LOG.fine("Using saved byte array: " + Arrays.toString(savedbytes));
}
} else {
+ bis = content.getInputStream();
if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Using saved output stream: " + IOUtils.newStringFromBytes(content));
+ if (content.size() < 65536) {
+ LOG.fine("Using saved output stream: "
+ + IOUtils.newStringFromBytes(content.getBytes()));
+ } else {
+ LOG.fine("Using saved output stream: ...");
+ }
}
}
- ByteArrayInputStream bis = new ByteArrayInputStream(content);
// copy saved output stream to new output stream in chunks of 1024
IOUtils.copyAndCloseInput(bis, os);
@@ -474,6 +482,7 @@ public class RetransmissionQueueImpl imp
next = null;
if (null != nextTask) {
nextTask.cancel();
+ releaseSavedMessage();
}
}
@@ -483,9 +492,17 @@ public class RetransmissionQueueImpl imp
protected void cancel() {
if (null != nextTask) {
nextTask.cancel();
+ releaseSavedMessage();
}
}
+ private void releaseSavedMessage() {
+ CachedOutputStream saved = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+ if (saved != null) {
+ saved.releaseTempFileHold();
+ }
+
+ }
/**
* @return associated message context
*/
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java Wed Mar 23 00:03:19 2011
@@ -153,6 +153,16 @@ public class RMManagerConfigurationTest
// TODO Auto-generated method stub
}
+
+ public SourceSequence getSourceSequence(Identifier seq) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public DestinationSequence getDestinationSequence(Identifier seq) {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
}
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Wed Mar 23 00:03:19 2011
@@ -19,6 +19,7 @@
package org.apache.cxf.ws.rm;
+import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
@@ -36,6 +37,7 @@ import org.apache.cxf.bus.spring.SpringB
import org.apache.cxf.endpoint.Client;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.service.Service;
@@ -587,7 +589,14 @@ public class RMManagerTest extends Asser
EasyMock.expect(m.getTo()).andReturn("toAddress");
}
byte[] content = new byte[] {'x', '9'};
- EasyMock.expect(m.getContent()).andReturn(content);
+ CachedOutputStream cos = new CachedOutputStream();
+ try {
+ cos.write(content);
+ } catch (IOException e) {
+ // ignore
+ }
+ EasyMock.expect(m.getCachedOutputStream()).andReturn(cos);
+
queue.addUnacknowledged(EasyMock.isA(Message.class));
EasyMock.expectLastCall();
queue.start();
Added: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java?rev=1084417&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java (added)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java Wed Mar 23 00:03:19 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.persistence;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * A version of RMMessageTest using a cached out set up.
+ */
+public class RMLargeMessageTest extends RMMessageTest {
+ private static String oldThreshold;
+
+ @BeforeClass
+ public static void setupThreshold() throws Exception {
+ oldThreshold = System.getProperty("org.apache.cxf.io.CachedOutputStream.Threshold");
+ // forces the CacheOutputStream to use temporary file caching
+ System.setProperty("org.apache.cxf.io.CachedOutputStream.Threshold", "16");
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ if (oldThreshold == null) {
+ System.clearProperty("org.apache.cxf.io.CachedOutputStream.Threshold");
+ } else {
+ System.setProperty("org.apache.cxf.io.CachedOutputStream.Threshold", oldThreshold);
+ }
+ }
+
+ @Test
+ public void testContentInputStream() throws Exception {
+ super.testContentInputStream();
+ }
+
+ @Test
+ public void testContentCachedOutputStream() throws Exception {
+ super.testContentCachedOutputStream();
+ }
+}
Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java?rev=1084417&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java (added)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java Wed Mar 23 00:03:19 2011
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.persistence;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CachedOutputStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ *
+ */
+public class RMMessageTest extends Assert {
+ private static final byte[] DATA =
+ ("<greetMe xmlns=\"http://cxf.apache.org/greeter_control/types\">"
+ + "<requestType>one</requestType></greetMe>").getBytes();
+ private static final String TO = "http://localhost:9999/decoupled_endpoint";
+
+ @Test
+ public void testAttributes() throws Exception {
+ RMMessage msg = new RMMessage();
+
+ msg.setTo(TO);
+ msg.setMessageNumber(1);
+
+ assertEquals(msg.getTo(), TO);
+ assertEquals(msg.getMessageNumber(), 1);
+ }
+
+ @Test
+ public void testContentInputStream() throws Exception {
+ RMMessage msg = new RMMessage();
+ msg.setContent(new ByteArrayInputStream(DATA));
+
+ assertEquals(DATA.length, msg.getSize());
+ byte[] msgbytes = IOUtils.readBytesFromStream(msg.getInputStream());
+
+ assertArrayEquals(DATA, msgbytes);
+ }
+
+ @Test
+ public void testContentCachedOutputStream() throws Exception {
+ RMMessage msg = new RMMessage();
+ CachedOutputStream co = new CachedOutputStream();
+ co.write(DATA);
+ msg.setContent(co);
+
+ assertEquals(DATA.length, msg.getSize());
+ byte[] msgbytes = IOUtils.readBytesFromStream(msg.getInputStream());
+
+ assertArrayEquals(DATA, msgbytes);
+
+ assertEquals(co, msg.getCachedOutputStream());
+ }
+}
Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java Wed Mar 23 00:03:19 2011
@@ -19,7 +19,9 @@
package org.apache.cxf.ws.rm.persistence.jdbc;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
@@ -227,7 +229,8 @@ public class RMTxStoreTest extends Asser
sid1.setValue("sequence1");
EasyMock.expect(msg.getMessageNumber()).andReturn(ONE).times(2);
byte[] bytes = new byte[89];
- EasyMock.expect(msg.getContent()).andReturn(bytes).times(2);
+ EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+ EasyMock.expect(msg.getSize()).andReturn(bytes.length);
control.replay();
store.beginTransaction();
@@ -238,7 +241,8 @@ public class RMTxStoreTest extends Asser
control.reset();
EasyMock.expect(msg.getMessageNumber()).andReturn(ONE);
- EasyMock.expect(msg.getContent()).andReturn(bytes);
+ EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+ EasyMock.expect(msg.getSize()).andReturn(bytes.length);
control.replay();
store.beginTransaction();
@@ -252,7 +256,8 @@ public class RMTxStoreTest extends Asser
control.reset();
EasyMock.expect(msg.getMessageNumber()).andReturn(TEN).times(2);
- EasyMock.expect(msg.getContent()).andReturn(bytes).times(2);
+ EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+ EasyMock.expect(msg.getSize()).andReturn(bytes.length);
control.replay();
store.beginTransaction();
@@ -411,6 +416,62 @@ public class RMTxStoreTest extends Asser
}
@Test
+ public void testGetDestinationSequence() throws SQLException, IOException {
+
+ Identifier sid1 = null;
+ Identifier sid2 = null;
+
+ DestinationSequence seq = store.getDestinationSequence(RMUtils.getWSRMFactory().createIdentifier());
+ assertNull(seq);
+
+ try {
+ sid1 = setupDestinationSequence("sequence1");
+
+ seq = store.getDestinationSequence(sid1);
+ assertNotNull(seq);
+
+ sid2 = setupDestinationSequence("sequence2");
+ seq = store.getDestinationSequence(sid2);
+ assertNotNull(seq);
+ } finally {
+ if (null != sid1) {
+ store.removeDestinationSequence(sid1);
+ }
+ if (null != sid2) {
+ store.removeDestinationSequence(sid2);
+ }
+ }
+ }
+
+ @Test
+ public void testGetSourceSequence() throws SQLException, IOException {
+
+ Identifier sid1 = null;
+ Identifier sid2 = null;
+
+ SourceSequence seq = store.getSourceSequence(RMUtils.getWSRMFactory().createIdentifier());
+ assertNull(seq);
+
+ try {
+ sid1 = setupSourceSequence("sequence1");
+
+ seq = store.getSourceSequence(sid1);
+ assertNotNull(seq);
+
+ sid2 = setupSourceSequence("sequence2");
+ seq = store.getSourceSequence(sid2);
+ assertNotNull(seq);
+ } finally {
+ if (null != sid1) {
+ store.removeSourceSequence(sid1);
+ }
+ if (null != sid2) {
+ store.removeSourceSequence(sid2);
+ }
+ }
+ }
+
+ @Test
public void testGetMessages() throws SQLException, IOException {
Identifier sid1 = RMUtils.getWSRMFactory().createIdentifier();
@@ -528,8 +589,9 @@ public class RMTxStoreTest extends Asser
RMMessage msg = control.createMock(RMMessage.class);
EasyMock.expect(msg.getMessageNumber()).andReturn(mn);
EasyMock.expect(msg.getTo()).andReturn(to);
- String value = "Message " + mn.longValue();
- EasyMock.expect(msg.getContent()).andReturn(value.getBytes());
+ byte[] value = ("Message " + mn.longValue()).getBytes();
+ EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(value));
+ EasyMock.expect(msg.getSize()).andReturn(value.length);
control.replay();
store.beginTransaction();
@@ -593,8 +655,12 @@ public class RMTxStoreTest extends Asser
} else {
assertNull(msg.getTo());
}
- byte[] actual = msg.getContent();
- assertEquals(new String("Message " + mn), IOUtils.newStringFromBytes(actual));
+ try {
+ InputStream actual = msg.getInputStream();
+ assertEquals(new String("Message " + mn), IOUtils.readStringFromStream(actual));
+ } catch (IOException e) {
+ fail("failed to get the input stream");
+ }
}
}
Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java?rev=1084417&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java Wed Mar 23 00:03:19 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * A simulated-large message version of ClientPersistenceTest.
+ */
+public class CachedOutClientPersistenceTest extends ClientPersistenceTest {
+
+ private static String oldThreshold;
+
+ @BeforeClass
+ public static void setProperties() throws Exception {
+ oldThreshold = System.getProperty("org.apache.cxf.io.CachedOutputStream.Threshold");
+ // forces the CacheOutputStream to use temporary file caching
+ System.setProperty("org.apache.cxf.io.CachedOutputStream.Threshold", "16");
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ if (oldThreshold == null) {
+ System.clearProperty("org.apache.cxf.io.CachedOutputStream.Threshold");
+ } else {
+ System.setProperty("org.apache.cxf.io.CachedOutputStream.Threshold", oldThreshold);
+ }
+ }
+
+ @Test
+ public void testRecovery() throws Exception {
+ super.testRecovery();
+ }
+
+}
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutClientPersistenceTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java?rev=1084417&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java Wed Mar 23 00:03:19 2011
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.util.logging.Logger;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.interceptor.LoggingInInterceptor;
+import org.apache.cxf.interceptor.LoggingOutInterceptor;
+import org.apache.cxf.systest.ws.rm.RetransmissionQueueTest.Server;
+import org.apache.cxf.systest.ws.util.ConnectionHelper;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.cxf.ws.rm.RMManager;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the WS-RM processing with the cached out message (using temporary files).
+ */
+public class CachedOutMessageTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = allocatePort(Server.class);
+ public static final String DECOUPLE_PORT = allocatePort("decoupled.port");
+
+ private static String oldThreshold;
+
+ private static final Logger LOG = LogUtils.getLogger(RetransmissionQueueTest.class);
+ private Bus bus;
+
+
+
+ public static class Server extends AbstractBusTestServerBase {
+
+ protected void run() {
+ SpringBusFactory bf = new SpringBusFactory();
+ Bus bus = bf.createBus("/org/apache/cxf/systest/ws/rm/message-loss.xml");
+ BusFactory.setDefaultBus(bus);
+ LoggingInInterceptor in = new LoggingInInterceptor();
+ bus.getInInterceptors().add(in);
+ bus.getInFaultInterceptors().add(in);
+ LoggingOutInterceptor out = new LoggingOutInterceptor();
+ bus.getOutInterceptors().add(out);
+ bus.getOutFaultInterceptors().add(out);
+
+ GreeterImpl implementor = new GreeterImpl();
+ String address = "http://localhost:" + PORT + "/SoapContext/GreeterPort";
+
+ Endpoint ep = Endpoint.create(implementor);
+ ep.publish(address);
+
+ LOG.info("Published greeter endpoint.");
+ }
+
+ public static void main(String[] args) {
+ try {
+ Server s = new Server();
+ s.start();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ System.exit(-1);
+ } finally {
+ System.out.println("done!");
+ }
+ }
+ }
+
+ @BeforeClass
+ public static void startServers() throws Exception {
+ oldThreshold = System.getProperty("org.apache.cxf.io.CachedOutputStream.Threshold");
+ // forces the CacheOutputStream to use temporary file caching
+ System.setProperty("org.apache.cxf.io.CachedOutputStream.Threshold", "16");
+
+ assertTrue("server did not launch correctly",
+ launchServer(Server.class));
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ if (oldThreshold == null) {
+ System.clearProperty("org.apache.cxf.io.CachedOutputStream.Threshold");
+ } else {
+ System.setProperty("org.apache.cxf.io.CachedOutputStream.Threshold", oldThreshold);
+ }
+ }
+
+ @Test
+ public void testCachedOutMessage() throws Exception {
+ SpringBusFactory bf = new SpringBusFactory();
+ bus = bf.createBus("/org/apache/cxf/systest/ws/rm/message-loss.xml");
+ BusFactory.setDefaultBus(bus);
+ LoggingInInterceptor in = new LoggingInInterceptor();
+ bus.getInInterceptors().add(in);
+ bus.getInFaultInterceptors().add(in);
+ LoggingOutInterceptor out = new LoggingOutInterceptor();
+ bus.getOutInterceptors().add(out);
+ // an interceptor to simulate a message loss
+ MessageLossSimulator mls = new MessageLossSimulator();
+ bus.getOutInterceptors().add(mls);
+ RMManager manager = bus.getExtension(RMManager.class);
+ manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new Long(2000));
+
+ bus.getOutFaultInterceptors().add(out);
+
+ GreeterService gs = new GreeterService();
+ final Greeter greeter = gs.getGreeterPort();
+ updateAddressPort(greeter, DecoupledClientServerTest.PORT);
+ LOG.fine("Created greeter client.");
+
+ ConnectionHelper.setKeepAliveConnection(greeter, true);
+
+ greeter.greetMeOneWay("one");
+ greeter.greetMeOneWay("two");
+ greeter.greetMeOneWay("three");
+
+ long wait = 4000;
+ while (wait > 0) {
+ long start = System.currentTimeMillis();
+ try {
+ Thread.sleep(wait);
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+ wait -= System.currentTimeMillis() - start;
+ }
+
+ boolean empty = manager.getRetransmissionQueue().isEmpty();
+ assertTrue("Some messages are not acknowledged", empty);
+ }
+
+}
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutMessageTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java?rev=1084417&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java Wed Mar 23 00:03:19 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * A simulated-large message version of ServerPersistenceTest.
+ */
+public class CachedOutServerPersistenceTest extends ServerPersistenceTest {
+
+ private static String oldThreshold;
+
+ @BeforeClass
+ public static void setProperties() throws Exception {
+ oldThreshold = System.getProperty("org.apache.cxf.io.CachedOutputStream.Threshold");
+ // forces the CacheOutputStream to use temporary file caching
+ System.setProperty("org.apache.cxf.io.CachedOutputStream.Threshold", "16");
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ if (oldThreshold == null) {
+ System.clearProperty("org.apache.cxf.io.CachedOutputStream.Threshold");
+ } else {
+ System.setProperty("org.apache.cxf.io.CachedOutputStream.Threshold", oldThreshold);
+ }
+ }
+
+ @Test
+ public void testRecovery() throws Exception {
+ super.testRecovery();
+ }
+
+}
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/CachedOutServerPersistenceTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java Wed Mar 23 00:03:19 2011
@@ -101,6 +101,7 @@ public class ServerPersistenceTest exten
LOG.fine("Created bus " + bus + " with default cfg");
ControlService cs = new ControlService();
Control control = cs.getControlPort();
+ ConnectionHelper.setKeepAliveConnection(control, false, true);
updateAddressPort(control, PORT);
assertTrue("Failed to start greeter", control.startGreeter(SERVER_LOSS_CFG));
@@ -119,7 +120,7 @@ public class ServerPersistenceTest exten
LOG.fine("Created greeter client.");
- ConnectionHelper.setKeepAliveConnection(greeter, true);
+ ConnectionHelper.setKeepAliveConnection(greeter, false, true);
Client c = ClientProxy.getClient(greeter);
HTTPConduit hc = (HTTPConduit)(c.getConduit());
@@ -166,12 +167,16 @@ public class ServerPersistenceTest exten
}
void verifyMissingResponse(Response<GreetMeResponse> responses[]) throws Exception {
- awaitMessages(5, 8, 60000);
+ awaitMessages(5, 7, 30000);
- // wait another while to prove that response to second request is indeed lost
- Thread.sleep(4000);
int nDone = 0;
for (int i = 0; i < 3; i++) {
+ // wait another while to prove that response to second request is indeed lost
+ if (!responses[i].isDone()) {
+ Thread.sleep(4000);
+ }
+ }
+ for (int i = 0; i < 3; i++) {
if (responses[i].isDone()) {
nDone++;
}
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/persistent-message-loss-server.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/persistent-message-loss-server.xml?rev=1084417&r1=1084416&r2=1084417&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/persistent-message-loss-server.xml (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/persistent-message-loss-server.xml Wed Mar 23 00:03:19 2011
@@ -23,8 +23,10 @@
xmlns:wsa="http://cxf.apache.org/ws/addressing"
xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xmlns:http="http://cxf.apache.org/transports/http/configuration"
xsi:schemaLocation="
http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
+http://cxf.apache.org/transports/http/configuration http://cxf.apache.org/schemas/configuration/http-conf.xsd
http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
http://cxf.apache.org/configuration/beans http://cxf.apache.org/schemas/configuration/cxf-beans.xsd
@@ -48,5 +50,9 @@ http://www.springframework.org/schema/be
<ref bean="messageLoss"/>
</cxf:outInterceptors>
</cxf:bus>
-
+
+ <http:conduit name="http://localhost.*">
+ <http:client Connection="close"/>
+ </http:conduit>
+
</beans>
\ No newline at end of file