You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2007/03/13 15:53:27 UTC
svn commit: r517714 - in /incubator/cxf/trunk:
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/
systests/src/test/java/org/apache/cxf/systest/ws/rm/
Author: andreasmyth
Date: Tue Mar 13 07:53:26 2007
New Revision: 517714
URL: http://svn.apache.org/viewvc?view=rev&rev=517714
Log:
[JIRA CXF-280] Message specific retransmission behaviour.
Removed:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-deferred.xml
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-no-offer-test.xml
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties?view=diff&rev=517714&r1=517713&r2=517714
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties Tue Mar 13 07:53:26 2007
@@ -22,5 +22,6 @@
RESEND_MSG = WS-RM retransmission of message {0}.
RESEND_CANDIDATES_CONCURRENT_MODIFICATION_MSG = Candidates were acknowledged while iterating for resend.
RESEND_FAILED_MSG = WS-RM retransmission failed.
+SCHEDULE_RESEND_FAILED_MSG = Scheduling of WS-RM retransmission failed.
RESEND_INITIATION_FAILED_MSG = Failed to initiate retransmission.
NO_TRANSPORT_FOR_RESEND_MSG = No transport available for WS-RM retransmission.
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=517714&r1=517713&r2=517714
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Tue Mar 13 07:53:26 2007
@@ -26,12 +26,10 @@
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.ConcurrentModificationException;
+import java.util.Date;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
@@ -44,7 +42,11 @@
import org.apache.cxf.io.CachedOutputStreamCallback;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.policy.AssertionInfo;
+import org.apache.cxf.ws.policy.AssertionInfoMap;
+import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.RMConstants;
import org.apache.cxf.ws.rm.RMContextUtils;
import org.apache.cxf.ws.rm.RMManager;
import org.apache.cxf.ws.rm.RMMessageConstants;
@@ -65,8 +67,6 @@
private Map<String, List<ResendCandidate>> candidates = new HashMap<String, List<ResendCandidate>>();
private Resender resender;
- private Runnable resendInitiator;
- private Timer timer;
private RMManager manager;
public RetransmissionQueueImpl(RMManager m) {
@@ -80,14 +80,84 @@
public void setManager(RMManager m) {
manager = m;
}
+
+ /**
+ * Returns the base retransmission interval for the specified message.
+ * This is obtained as the minimum base retransmission interval in all RMAssertions pertaining
+ * to the message, or the default configured for the RMManager if there are no such policy
+ * assertions.
+ * @param message the message
+ * @return the base retransmission interval for the message
+ */
+ public long getBaseRetransmissionInterval(Message message) {
+ AssertionInfoMap amap = message.get(AssertionInfoMap.class);
+ boolean initialised = false;
+ long baseRetransmissionInterval = 0;
+ if (null != amap) {
+ Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
+ if (null != ais) {
+ for (AssertionInfo ai : ais) {
+ JaxbAssertion<RMAssertion> ja = getAssertion(ai);
+ RMAssertion rma = ja.getData();
+ RMAssertion.BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
+ if (null == bri) {
+ continue;
+ }
+ BigInteger bival = bri.getMilliseconds();
+ if (null == bival) {
+ continue;
+ }
+ long lval = bival.longValue();
+ if (initialised && lval < baseRetransmissionInterval) {
+ baseRetransmissionInterval = lval;
+ } else {
+ baseRetransmissionInterval = lval;
+ }
+ initialised = true;
- public long getBaseRetransmissionInterval() {
- RMAssertion rma = null == manager ? null : manager.getRMAssertion();
- if (null != rma && null != rma.getBaseRetransmissionInterval()
- && null != rma.getBaseRetransmissionInterval().getMilliseconds()) {
- return rma.getBaseRetransmissionInterval().getMilliseconds().longValue();
+ }
+ }
+ }
+ if (!initialised) {
+ RMAssertion rma = manager.getRMAssertion();
+ RMAssertion.BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
+ if (null != bri) {
+ BigInteger bival = bri.getMilliseconds();
+ if (null != bival) {
+ baseRetransmissionInterval = bival.longValue();
+ }
+ }
}
- return new BigInteger(DEFAULT_BASE_RETRANSMISSION_INTERVAL).longValue();
+ return baseRetransmissionInterval;
+ }
+
+ /**
+ * Determines if exponential backoff should be used in repeated attempts to resend
+ * the specified message.
+ * Returns false if there is at least one RMAssertion for this message indicating that no
+ * exponential backoff algorithm should be used, or true otherwise.
+ * @param message the message
+ * @return true iff the exponential backoff algorithm should be used for the message
+ */
+ public boolean useExponentialBackoff(Message message) {
+ AssertionInfoMap amap = message.get(AssertionInfoMap.class);
+ if (null != amap) {
+ Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
+ if (null != ais) {
+ for (AssertionInfo ai : ais) {
+ JaxbAssertion<RMAssertion> ja = getAssertion(ai);
+ RMAssertion rma = ja.getData();
+ if (null == rma.getExponentialBackoff()) {
+ return false;
+ }
+ }
+ }
+ }
+ RMAssertion rma = manager.getRMAssertion();
+ if (null == rma.getExponentialBackoff()) {
+ return false;
+ }
+ return true;
}
public void addUnacknowledged(Message message) {
@@ -112,7 +182,6 @@
public void populate(Collection<SourceSequence> sss) {
// TODO Auto-generated method stub
-
}
/**
@@ -151,41 +220,23 @@
/**
* Initiate resends.
- *
- * @param queue the work queue providing async execution
*/
public void start() {
- if (null != timer) {
+ if (null != resender) {
return;
}
LOG.fine("Starting retransmission queue");
- // setup resender
- if (null == resender) {
- resender = getDefaultResender();
- }
- // start resend initiator
- TimerTask task = new TimerTask() {
- public void run() {
- getResendInitiator().run();
- }
- };
- timer = new Timer();
- // TODO
- // delay starting the queue to give the first request a chance to be sent before
- // waiting for another period.
- timer.schedule(task, getBaseRetransmissionInterval() / 2, getBaseRetransmissionInterval());
+ // setup resender
+
+ resender = getDefaultResender();
}
/**
* Stops retransmission queue.
*/
public void stop() {
- if (null != timer) {
- LOG.fine("Stopping retransmission queue");
- timer.cancel();
- timer = null;
- }
+ // no-op
}
/**
@@ -196,16 +247,6 @@
}
/**
- * @return the ResendInitiator
- */
- protected Runnable getResendInitiator() {
- if (resendInitiator == null) {
- resendInitiator = new ResendInitiator();
- }
- return resendInitiator;
- }
-
- /**
* @param message the message context
* @return a ResendCandidate
*/
@@ -305,50 +346,15 @@
}
/**
- * Manages scheduling of resend attempts. A single task runs every base
- * transmission interval, determining which resend candidates are due a
- * resend attempt.
- */
- protected class ResendInitiator implements Runnable {
- public void run() {
- // iterate over resend candidates, resending any that are due
- synchronized (RetransmissionQueueImpl.this) {
- LOG.fine("Starting ResendInitiator on thread " + Thread.currentThread());
- Iterator<Map.Entry<String, List<ResendCandidate>>> sequences = candidates.entrySet()
- .iterator();
- while (sequences.hasNext()) {
- Iterator<ResendCandidate> sequenceCandidates = sequences.next().getValue().iterator();
- boolean requestAck = true;
- try {
- while (sequenceCandidates.hasNext()) {
- ResendCandidate candidate = sequenceCandidates.next();
- if (candidate.isDue()) {
- candidate.initiate(requestAck);
- requestAck = false;
- }
- }
- } catch (ConcurrentModificationException ex) {
- // TODO:
- // can happen if resend occurs on same thread as resend initiation
- // i.e. when endpoint's executor executes on current thread
- LOG.log(Level.WARNING, "RESEND_CANDIDATES_CONCURRENT_MODIFICATION_MSG");
- }
- }
- LOG.fine("Completed ResendInitiator");
- }
-
- }
- }
-
- /**
- * Represents a candidate for resend, i.e. an unacked outgoing message. When
- * this is determined as due another resend attempt, an asynchronous task is
- * scheduled for this purpose.
+ * Represents a candidate for resend, i.e. an unacked outgoing message.
*/
protected class ResendCandidate implements Runnable {
private Message message;
- private int skips;
- private int skipped;
+ private Date next;
+ private TimerTask nextTask;
+ private int resends;
+ private long nextInterval;
+ private long backoff;
private boolean pending;
private boolean includeAckRequested;
@@ -357,13 +363,39 @@
*/
protected ResendCandidate(Message m) {
message = m;
- skipped = -1;
- skips = 1;
+ resends = 0;
+ long baseRetransmissionInterval = getBaseRetransmissionInterval(m);
+ backoff = useExponentialBackoff(m) ? RetransmissionQueue.DEFAULT_EXPONENTIAL_BACKOFF : 1;
+ next = new Date(System.currentTimeMillis() + baseRetransmissionInterval);
+ nextInterval = baseRetransmissionInterval * backoff;
+ if (null != manager.getTimer()) {
+ schedule();
+ }
}
+
/**
- * Async resend logic.
+ * Initiate resend asynchronously.
+ *
+ * @param requestAcknowledge true if an AckRequest header is to be sent
+ * with resend
*/
+ protected void initiate(boolean requestAcknowledge) {
+ includeAckRequested = requestAcknowledge;
+ pending = true;
+ Endpoint ep = message.getExchange().get(Endpoint.class);
+ Executor executor = ep.getExecutor();
+ if (null == executor) {
+ executor = ep.getService().getExecutor();
+ }
+ LOG.log(Level.FINE, "Using executor {0}", executor.getClass().getName());
+ try {
+ executor.execute(this);
+ } catch (RejectedExecutionException ex) {
+ LOG.log(Level.SEVERE, "RESEND_INITIATION_FAILED_MSG", ex);
+ }
+ }
+
public void run() {
try {
// ensure ACK wasn't received while this task was enqueued
@@ -377,48 +409,26 @@
}
}
+
/**
- * @return true if candidate is due a resend REVISIT should bound the
- * max number of resend attampts
+ * @return number of resend attempts
*/
- protected synchronized boolean isDue() {
- boolean due = false;
- // skip count is used to model exponential backoff
- // to avoid gratuitous time evaluation
- if (!pending && ++skipped == skips) {
- skips *= getExponentialBackoff();
- skipped = 0;
- due = true;
- }
- return due;
+ protected int getResends() {
+ return resends;
}
-
+
/**
- * @return if resend attempt is pending
+ * @return date of next resend
*/
- protected synchronized boolean isPending() {
- return pending;
+ protected Date getNext() {
+ return next;
}
/**
- * Initiate resend asynchronsly.
- *
- * @param requestAcknowledge true if a AckRequest header is to be sent
- * with resend
+ * @return if resend attempt is pending
*/
- protected synchronized void initiate(boolean requestAcknowledge) {
- includeAckRequested = requestAcknowledge;
- pending = true;
- Endpoint ep = message.getExchange().get(Endpoint.class);
- Executor executor = ep.getExecutor();
- if (null == executor) {
- executor = ep.getService().getExecutor();
- }
- try {
- executor.execute(this);
- } catch (RejectedExecutionException ex) {
- LOG.log(Level.SEVERE, "RESEND_INITIATION_FAILED_MSG", ex);
- }
+ protected synchronized boolean isPending() {
+ return pending;
}
/**
@@ -426,7 +436,10 @@
*/
protected synchronized void resolved() {
pending = false;
- skips = Integer.MAX_VALUE;
+ next = null;
+ if (null != nextTask) {
+ nextTask.cancel();
+ }
}
/**
@@ -437,12 +450,43 @@
}
/**
- * A resend has been attempted.
+ * A resend has been attempted. Schedule the next attempt.
*/
- private synchronized void attempted() {
+ protected synchronized void attempted() {
pending = false;
+ resends++;
+ if (null != next) {
+ next = new Date(next.getTime() + nextInterval);
+ nextInterval *= backoff;
+ schedule();
+ }
+ }
+
+ protected final synchronized void schedule() {
+ if (null == manager.getTimer()) {
+ return;
+ }
+ class ResendTask extends TimerTask {
+ ResendCandidate candidate;
+ ResendTask(ResendCandidate c) {
+ candidate = c;
+ }
+ @Override
+ public void run() {
+ if (!candidate.isPending()) {
+ candidate.initiate(includeAckRequested);
+ }
+ }
+ }
+ nextTask = new ResendTask(this);
+ try {
+ manager.getTimer().schedule(nextTask, next);
+ } catch (IllegalStateException ex) {
+ LOG.log(Level.WARNING, "SCHEDULE_RESEND_FAILED_MSG", ex);
+ }
}
}
+
/**
* Encapsulates actual resend logic (pluggable to facilitate unit testing)
@@ -464,7 +508,7 @@
*/
protected final Resender getDefaultResender() {
return new Resender() {
- public void resend(Message message, boolean requestAcknowledge) {
+ public void resend(Message message, boolean requestAcknowledge) {
RMProperties properties = RMContextUtils.retrieveRMProperties(message, true);
SequenceType st = properties.getSequence();
if (st != null) {
@@ -484,7 +528,7 @@
}
}
};
- };
+ }
/**
* Plug in replacement resend logic (facilitates unit testing).
@@ -493,6 +537,11 @@
*/
protected void replaceResender(Resender replacement) {
resender = replacement;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected JaxbAssertion<RMAssertion> getAssertion(AssertionInfo ai) {
+ return (JaxbAssertion<RMAssertion>)ai.getAssertion();
}
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?view=diff&rev=517714&r1=517713&r2=517714
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Tue Mar 13 07:53:26 2007
@@ -20,26 +20,28 @@
package org.apache.cxf.ws.rm.soap;
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStream;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
import java.util.List;
import java.util.concurrent.Executor;
import junit.framework.TestCase;
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
-import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.policy.AssertionInfo;
+import org.apache.cxf.ws.policy.AssertionInfoMap;
+import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.RMConstants;
import org.apache.cxf.ws.rm.RMManager;
import org.apache.cxf.ws.rm.RMMessageConstants;
import org.apache.cxf.ws.rm.RMProperties;
import org.apache.cxf.ws.rm.SequenceType;
import org.apache.cxf.ws.rm.SourceSequence;
import org.apache.cxf.ws.rm.persistence.RMStore;
+import org.apache.cxf.ws.rm.policy.RMAssertion;
import org.easymock.IMocksControl;
import org.easymock.classextension.EasyMock;
@@ -64,6 +66,7 @@
new ArrayList<Identifier>();
private List<Object> mocks =
new ArrayList<Object>();
+ private RMAssertion rma;
public void setUp() {
control = EasyMock.createNiceControl();
@@ -72,7 +75,8 @@
resender = new TestResender();
queue.replaceResender(resender);
executor = createMock(Executor.class);
-
+ rma = createMock(RMAssertion.class);
+ assertNotNull(executor);
}
public void tearDown() {
@@ -85,18 +89,157 @@
control.reset();
}
+
public void testCtor() {
- ready();
+ ready(false);
assertNotNull("expected unacked map", queue.getUnacknowledged());
assertEquals("expected empty unacked map",
0,
queue.getUnacknowledged().size());
- assertEquals("unexpected base retransmission interval",
- 3000L,
- queue.getBaseRetransmissionInterval());
- assertEquals("unexpected exponential backoff",
- 2,
- queue.getExponentialBackoff());
+
+ queue = new RetransmissionQueueImpl(null);
+ assertNull(queue.getManager());
+ queue.setManager(manager);
+ assertSame("Unexpected RMManager", manager, queue.getManager());
+ }
+
+ public void testGetBaseRetranmissionIntervalFromPolicies() {
+ Message message = createMock(Message.class);
+ AssertionInfoMap aim = createMock(AssertionInfoMap.class);
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
+ AssertionInfo ai1 = createMock(AssertionInfo.class);
+ AssertionInfo ai2 = createMock(AssertionInfo.class);
+ AssertionInfo ai3 = createMock(AssertionInfo.class);
+ AssertionInfo ai4 = createMock(AssertionInfo.class);
+ Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
+ ais.add(ai1);
+ ais.add(ai2);
+ ais.add(ai3);
+ ais.add(ai4);
+ EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
+ JaxbAssertion ja1 = createMock(JaxbAssertion.class);
+ EasyMock.expect(ai1.getAssertion()).andReturn(ja1);
+ RMAssertion rma1 = createMock(RMAssertion.class);
+ EasyMock.expect(ja1.getData()).andReturn(rma1);
+ EasyMock.expect(rma1.getBaseRetransmissionInterval()).andReturn(null);
+ JaxbAssertion ja2 = createMock(JaxbAssertion.class);
+ EasyMock.expect(ai2.getAssertion()).andReturn(ja2);
+ RMAssertion rma2 = createMock(RMAssertion.class);
+ EasyMock.expect(ja2.getData()).andReturn(rma2);
+ RMAssertion.BaseRetransmissionInterval bri2 =
+ createMock(RMAssertion.BaseRetransmissionInterval.class);
+ EasyMock.expect(rma2.getBaseRetransmissionInterval()).andReturn(bri2);
+ EasyMock.expect(bri2.getMilliseconds()).andReturn(null);
+ JaxbAssertion ja3 = createMock(JaxbAssertion.class);
+ EasyMock.expect(ai3.getAssertion()).andReturn(ja3);
+ RMAssertion rma3 = createMock(RMAssertion.class);
+ EasyMock.expect(ja3.getData()).andReturn(rma3);
+ RMAssertion.BaseRetransmissionInterval bri3 =
+ createMock(RMAssertion.BaseRetransmissionInterval.class);
+ EasyMock.expect(rma3.getBaseRetransmissionInterval()).andReturn(bri3);
+ EasyMock.expect(bri3.getMilliseconds()).andReturn(new BigInteger("10000"));
+ JaxbAssertion ja4 = createMock(JaxbAssertion.class);
+ EasyMock.expect(ai4.getAssertion()).andReturn(ja4);
+ RMAssertion rma4 = createMock(RMAssertion.class);
+ EasyMock.expect(ja4.getData()).andReturn(rma4);
+ RMAssertion.BaseRetransmissionInterval bri4 =
+ createMock(RMAssertion.BaseRetransmissionInterval.class);
+ EasyMock.expect(rma4.getBaseRetransmissionInterval()).andReturn(bri4);
+ EasyMock.expect(bri4.getMilliseconds()).andReturn(new BigInteger("5000"));
+
+ control.replay();
+ assertEquals("Unexpected value for base retransmission interval",
+ 5000, queue.getBaseRetransmissionInterval(message));
+ }
+
+ public void testGetBaseRetransmissionIntervalFromManager() {
+ Message message = createMock(Message.class);
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+ EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
+ EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(null);
+ control.replay();
+ assertEquals("Unexpected value for base retransmission interval",
+ 0, queue.getBaseRetransmissionInterval(message));
+ control.verify();
+ control.reset();
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+ EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
+ RMAssertion.BaseRetransmissionInterval bri = createMock(RMAssertion.BaseRetransmissionInterval.class);
+ EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);
+ EasyMock.expect(bri.getMilliseconds()).andReturn(null);
+ control.replay();
+ assertEquals("Unexpected value for base retransmission interval",
+ 0, queue.getBaseRetransmissionInterval(message));
+ control.verify();
+ control.reset();
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+ EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
+ EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);
+ EasyMock.expect(bri.getMilliseconds()).andReturn(new BigInteger("7000"));
+ control.replay();
+ assertEquals("Unexpected value for base retransmission interval",
+ 7000, queue.getBaseRetransmissionInterval(message));
+ }
+
+ public void testUseExponentialBackoff() {
+ Message message = createMock(Message.class);
+ AssertionInfoMap aim = createMock(AssertionInfoMap.class);
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
+ AssertionInfo ai = createMock(AssertionInfo.class);
+ Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
+ EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
+ ais.add(ai);
+ JaxbAssertion ja = createMock(JaxbAssertion.class);
+ EasyMock.expect(ai.getAssertion()).andReturn(ja);
+ EasyMock.expect(ja.getData()).andReturn(rma);
+ EasyMock.expect(rma.getExponentialBackoff()).andReturn(null);
+ control.replay();
+ assertTrue("Should not use exponential backoff", !queue.useExponentialBackoff(message));
+ control.verify();
+ control.reset();
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+ EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
+ EasyMock.expect(rma.getExponentialBackoff()).andReturn(null);
+ control.replay();
+ assertTrue("Should not use exponential backoff", !queue.useExponentialBackoff(message));
+ control.verify();
+ control.reset();
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+ EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
+ RMAssertion.ExponentialBackoff eb = createMock(RMAssertion.ExponentialBackoff.class);
+ EasyMock.expect(rma.getExponentialBackoff()).andReturn(eb);
+ control.replay();
+ assertTrue("Should use exponential backoff", queue.useExponentialBackoff(message));
+ }
+
+ public void testResendCandidateCtor() {
+ Message message = createMock(Message.class);
+ setupMessagePolicies(message);
+ control.replay();
+ long now = System.currentTimeMillis();
+ RetransmissionQueueImpl.ResendCandidate candidate = queue.createResendCandidate(message);
+ assertSame(message, candidate.getMessage());
+ assertEquals(0, candidate.getResends());
+ Date refDate = new Date(now + 5000);
+ assertTrue(!candidate.getNext().before(refDate));
+ refDate = new Date(now + 7000);
+ assertTrue(!candidate.getNext().after(refDate));
+ assertTrue(!candidate.isPending());
+ }
+
+ public void testResendCandidateAttempted() {
+ Message message = createMock(Message.class);
+ setupMessagePolicies(message);
+ ready(true);
+ long now = System.currentTimeMillis();
+ RetransmissionQueueImpl.ResendCandidate candidate = queue.createResendCandidate(message);
+ candidate.attempted();
+ assertEquals(1, candidate.getResends());
+ Date refDate = new Date(now + 15000);
+ assertTrue(!candidate.getNext().before(refDate));
+ refDate = new Date(now + 17000);
+ assertTrue(!candidate.getNext().after(refDate));
+ assertTrue(!candidate.isPending());
}
public void testCacheUnacknowledged() {
@@ -104,7 +247,11 @@
Message message2 = setUpMessage("sequence2");
Message message3 = setUpMessage("sequence1");
- ready();
+ setupMessagePolicies(message1);
+ setupMessagePolicies(message2);
+ setupMessagePolicies(message3);
+
+ ready(false);
assertNotNull("expected resend candidate",
queue.cacheUnacknowledged(message1));
@@ -153,11 +300,14 @@
queue.getUnacknowledged().put("sequence1", sequenceList);
Message message1 =
setUpMessage("sequence1", messageNumbers[0]);
- sequenceList.add(queue.createResendCandidate(message1));
+ setupMessagePolicies(message1);
Message message2 =
setUpMessage("sequence1", messageNumbers[1]);
+ setupMessagePolicies(message2);
+ ready(false);
+
+ sequenceList.add(queue.createResendCandidate(message1));
sequenceList.add(queue.createResendCandidate(message2));
- ready();
queue.purgeAcknowledged(sequence);
assertEquals("unexpected unacked map size",
@@ -178,11 +328,14 @@
queue.getUnacknowledged().put("sequence1", sequenceList);
Message message1 =
setUpMessage("sequence1", messageNumbers[0]);
- sequenceList.add(queue.createResendCandidate(message1));
+ setupMessagePolicies(message1);
Message message2 =
setUpMessage("sequence1", messageNumbers[1]);
+ setupMessagePolicies(message2);
+ ready(false);
+
+ sequenceList.add(queue.createResendCandidate(message1));
sequenceList.add(queue.createResendCandidate(message2));
- ready();
queue.purgeAcknowledged(sequence);
assertEquals("unexpected unacked map size",
@@ -194,7 +347,7 @@
}
public void testIsEmpty() {
- ready();
+ ready(false);
assertTrue("queue is not empty" , queue.isEmpty());
}
@@ -209,11 +362,14 @@
queue.getUnacknowledged().put("sequence1", sequenceList);
Message message1 =
setUpMessage("sequence1", messageNumbers[0], false);
- sequenceList.add(queue.createResendCandidate(message1));
+ setupMessagePolicies(message1);
Message message2 =
setUpMessage("sequence1", messageNumbers[1], false);
+ setupMessagePolicies(message1);
+ ready(false);
+
+ sequenceList.add(queue.createResendCandidate(message1));
sequenceList.add(queue.createResendCandidate(message2));
- ready();
assertEquals("unexpected unacked count",
2,
@@ -226,234 +382,19 @@
SourceSequence sequence = setUpSequence("sequence1",
messageNumbers,
null);
- ready();
+ ready(false);
assertEquals("unexpected unacked count",
0,
queue.countUnacknowledged(sequence));
}
- public void xtestPopulate() {
-
- /*
- Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
- Collection<RMMessage> msgs = new ArrayList<RMMessage>();
- // List<Handler> handlerChain = new ArrayList<Handler>();
-
- RMStore store = createMock(RMStore.class);
- handler.getStore();
- EasyMock.expectLastCall().andReturn(store);
- SourceSequence ss = control.createMock(SourceSequence.class);
- sss.add(ss);
- Identifier id = control.createMock(Identifier.class);
- ss.getIdentifier();
- EasyMock.expectLastCall().andReturn(id);
- RMMessage msg = control.createMock(RMMessage.class);
- msgs.add(msg);
- store.getMessages(id, true);
- EasyMock.expectLastCall().andReturn(msgs);
- MessageContext context = control.createMock(MessageContext.class);
- msg.getContext();
- EasyMock.expectLastCall().andReturn(context);
-
- RMSoapHandler rmh = control.createMock(RMSoapHandler.class);
- MAPCodec wsah = control.createMock(MAPCodec.class);
-
- handler.getWsaSOAPHandler();
- EasyMock.expectLastCall().andReturn(wsah);
- handler.getRMSoapHandler();
- EasyMock.expectLastCall().andReturn(rmh);
- RMProperties rmps = control.createMock(RMProperties.class);
- rmh.unmarshalRMProperties(null);
- EasyMock.expectLastCall().andReturn(rmps);
- AddressingProperties maps = control.createMock(AddressingProperties.class);
- wsah.unmarshalMAPs(null);
- EasyMock.expectLastCall().andReturn(maps);
- SequenceType st = control.createMock(SequenceType.class);
- rmps.getSequence();
- EasyMock.expectLastCall().andReturn(st);
- st.getIdentifier();
- EasyMock.expectLastCall().andReturn(id);
- id.getValue();
- EasyMock.expectLastCall().andReturn("sequence1");
- ready();
-
- queue.populate(sss);
-
- assertTrue("queue is empty", !queue.isEmpty());
- */
- }
-
- public void testResendInitiatorBackoffLogic() {
- Message message1 = setUpMessage("sequence1");
- Message message2 = setUpMessage("sequence2");
- Message message3 = setUpMessage("sequence1");
-
- ready();
- RetransmissionQueueImpl.ResendCandidate candidate1 =
- queue.cacheUnacknowledged(message1);
- RetransmissionQueueImpl.ResendCandidate candidate2 =
- queue.cacheUnacknowledged(message2);
- RetransmissionQueueImpl.ResendCandidate candidate3 =
- queue.cacheUnacknowledged(message3);
- RetransmissionQueueImpl.ResendCandidate[] allCandidates =
- {candidate1, candidate2, candidate3};
- boolean [] expectAckRequested = {true, true, false};
-
- // initial run => none due
- runInitiator();
-
- // all 3 candidates due
- runInitiator(allCandidates);
- runCandidates(allCandidates, expectAckRequested);
-
- // exponential backoff => none due
- runInitiator();
-
- // all 3 candidates due
- runInitiator(allCandidates);
- runCandidates(allCandidates, expectAckRequested);
-
- for (int i = 0; i < 3; i++) {
- // exponential backoff => none due
- runInitiator();
- }
-
- // all 3 candidates due
- runInitiator(allCandidates);
- runCandidates(allCandidates, expectAckRequested);
-
- for (int i = 0; i < 7; i++) {
- // exponential backoff => none due
- runInitiator();
- }
-
- // all 3 candidates due
- runInitiator(allCandidates);
- runCandidates(allCandidates, expectAckRequested);
- }
-
-
- public void testResendInitiatorDueLogic() {
- Message message1 = setUpMessage("sequence1");
- Message message2 = setUpMessage("sequence2");
- Message message3 = setUpMessage("sequence1");
- ready();
- RetransmissionQueueImpl.ResendCandidate candidate1 =
- queue.cacheUnacknowledged(message1);
- RetransmissionQueueImpl.ResendCandidate candidate2 =
- queue.cacheUnacknowledged(message2);
- RetransmissionQueueImpl.ResendCandidate candidate3 =
- queue.cacheUnacknowledged(message3);
- RetransmissionQueueImpl.ResendCandidate[] allCandidates =
- {candidate1, candidate2, candidate3};
- boolean [] expectAckRequested = {true, true, false};
-
- // initial run => none due
- runInitiator();
-
- // all 3 candidates due
- runInitiator(allCandidates);
-
- // all still pending => none due
- runInitiator();
-
- candidate1.run();
- candidate2.run();
-
- // exponential backoff => none due
- runInitiator();
-
- // candidates 1 & 2 run => only these due
- runInitiator(new RetransmissionQueueImpl.ResendCandidate[] {candidate1, candidate2});
-
- runCandidates(allCandidates, expectAckRequested);
-
- // exponential backoff => none due
- runInitiator();
-
- // candidates 3 run belatedly => now due
- runInitiator(new RetransmissionQueueImpl.ResendCandidate[] {candidate3});
-
- // exponential backoff => none due
- runInitiator();
-
- // candidates 1 & 2 now due
- runInitiator(new RetransmissionQueueImpl.ResendCandidate[] {candidate1, candidate2});
- }
-
- public void testResendInitiatorResolvedLogic() {
- Message message1 = setUpMessage("sequence1");
- Message message2 = setUpMessage("sequence2");
- Message message3 = setUpMessage("sequence1");
- ready();
- RetransmissionQueueImpl.ResendCandidate candidate1 =
- queue.cacheUnacknowledged(message1);
- RetransmissionQueueImpl.ResendCandidate candidate2 =
- queue.cacheUnacknowledged(message2);
- RetransmissionQueueImpl.ResendCandidate candidate3 =
- queue.cacheUnacknowledged(message3);
- RetransmissionQueueImpl.ResendCandidate[] allCandidates =
- {candidate1, candidate2, candidate3};
- boolean [] expectAckRequested = {true, true, false};
-
- // initial run => none due
- runInitiator();
-
- // all 3 candidates due
- runInitiator(allCandidates);
- runCandidates(allCandidates, expectAckRequested);
-
- // exponential backoff => none due
- runInitiator();
-
- candidate1.resolved();
- candidate3.resolved();
-
- // candidates 1 & 3 resolved => only candidate2 due
- runInitiator(new RetransmissionQueueImpl.ResendCandidate[] {candidate2});
- }
-
- public void testResenderInitiatorReschedule() {
- ready();
-
- runInitiator();
- }
-
- public void xtestResenderInitiatorNoRescheduleOnShutdown() {
- /*
- ready();
-
- queue.shutdown();
- queue.getResendInitiator().run();
- */
- }
-
- public void testDefaultResenderClient() throws Exception {
- doTestDefaultResender(true);
+ public void testStartStop() {
+ control.replay();
+ queue.start();
+ queue.stop();
}
- public void xtestDefaultResenderServer() throws Exception {
- doTestDefaultResender(false);
- }
-
- private void doTestDefaultResender(boolean isRequestor) throws Exception {
- Message message1 = setUpMessage("sequence1");
- queue.replaceResender(queue.getDefaultResender());
- ready();
- RetransmissionQueueImpl.ResendCandidate candidate1 =
- queue.cacheUnacknowledged(message1);
- RetransmissionQueueImpl.ResendCandidate[] allCandidates = {candidate1};
-
- // initial run => none due
- runInitiator();
-
- // single candidate due
- runInitiator(allCandidates);
- setUpDefaultResender(0, isRequestor, message1);
- allCandidates[0].run();
- }
-
private Message setUpMessage(String sid) {
return setUpMessage(sid, null);
}
@@ -475,191 +416,23 @@
return message;
}
-
- /*
- private void setupContextMessage(ObjectMessageContext context) throws Exception {
- SOAPMessage message = createMock(SOAPMessage.class);
- context.get("org.apache.cxf.bindings.soap.message");
- EasyMock.expectLastCall().andReturn(message);
- SOAPPart part = createMock(SOAPPart.class);
- message.getSOAPPart();
- EasyMock.expectLastCall().andReturn(part);
- SOAPEnvelope env = createMock(SOAPEnvelope.class);
- part.getEnvelope();
- EasyMock.expectLastCall().andReturn(env);
- SOAPHeader header = createMock(SOAPHeader.class);
- env.getHeader();
- EasyMock.expectLastCall().andReturn(header).times(2);
- Iterator headerElements = createMock(Iterator.class);
- header.examineAllHeaderElements();
- EasyMock.expectLastCall().andReturn(headerElements);
-
- // RM header element
- headerElements.hasNext();
- EasyMock.expectLastCall().andReturn(true);
- SOAPHeaderElement headerElement = createMock(SOAPHeaderElement.class);
- headerElements.next();
- EasyMock.expectLastCall().andReturn(headerElement);
- Name headerName = createMock(Name.class);
- headerElement.getElementName();
- EasyMock.expectLastCall().andReturn(headerName);
- headerName.getURI();
- EasyMock.expectLastCall().andReturn(Names.WSRM_NAMESPACE_NAME);
- headerElement.detachNode();
- EasyMock.expectLastCall();
-
- // non-RM header element
- headerElements.hasNext();
- EasyMock.expectLastCall().andReturn(true);
- headerElements.next();
- EasyMock.expectLastCall().andReturn(headerElement);
- headerElement.getElementName();
- EasyMock.expectLastCall().andReturn(headerName);
- headerName.getURI();
- EasyMock.expectLastCall().andReturn(Names.WSA_NAMESPACE_NAME);
-
- headerElements.hasNext();
- EasyMock.expectLastCall().andReturn(false);
- }
- */
-
- private void ready() {
- control.replay();
- queue.start();
- }
- private void setUpDefaultResender(int i,
- boolean isRequestor,
- Message context)
- throws Exception {
- assertTrue("too few contexts", i < messages.size());
- assertTrue("too few properties", i < properties.size());
- assertTrue("too few sequences", i < sequences.size());
- control.verify();
- control.reset();
-
- messages.get(i).get(RMMessageConstants.RM_PROPERTIES_OUTBOUND);
- EasyMock.expectLastCall().andReturn(properties.get(i)).times(1);
- properties.get(i).getSequence();
- EasyMock.expectLastCall().andReturn(sequences.get(i)).times(1);
-
- messages.get(i).get(Message.REQUESTOR_ROLE);
- EasyMock.expectLastCall().andReturn(Boolean.valueOf(isRequestor));
-
- if (isRequestor) {
- Exchange ex = createMock(Exchange.class);
- messages.get(i).getExchange();
- EasyMock.expectLastCall().andReturn(ex);
- Conduit conduit = createMock(Conduit.class);
- ex.getConduit();
- EasyMock.expectLastCall().andReturn(conduit);
- conduit.send(messages.get(i));
- EasyMock.expectLastCall();
- OutputStream os = createMock(OutputStream.class);
- messages.get(i).getContent(OutputStream.class);
- EasyMock.expectLastCall().andReturn(os).times(2);
- ByteArrayOutputStream saved = createMock(ByteArrayOutputStream.class);
- messages.get(i).get(RMMessageConstants.SAVED_OUTPUT_STREAM);
- EasyMock.expectLastCall().andReturn(saved);
- byte[] content = "the saved message".getBytes();
- saved.toByteArray();
- EasyMock.expectLastCall().andReturn(content);
- os.write(EasyMock.isA(byte[].class), EasyMock.eq(0), EasyMock.eq(content.length));
- EasyMock.expectLastCall();
- os.flush();
- EasyMock.expectLastCall();
- os.close();
- EasyMock.expectLastCall();
- }
- control.replay();
- }
-
- /*
- private void setUpClientDispatch(
- HandlerInvoker handlerInvoker,
- AbstractBindingBase binding,
- OutputStreamMessageContext outputStreamContext,
- AbstractBindingImpl bindingImpl,
- Transport transport) throws Exception {
-
- InputStreamMessageContext inputStreamContext =
- createMock(InputStreamMessageContext.class);
- ((ClientTransport)transport).invoke(outputStreamContext);
- EasyMock.expectLastCall().andReturn(inputStreamContext);
- binding.getBindingImpl();
- EasyMock.expectLastCall().andReturn(bindingImpl);
- bindingImpl.createBindingMessageContext(inputStreamContext);
- MessageContext bindingContext =
- control.createMock(MessageContext.class);
- EasyMock.expectLastCall().andReturn(bindingContext);
- bindingImpl.read(inputStreamContext, bindingContext);
- EasyMock.expectLastCall();
- handlerInvoker.invokeProtocolHandlers(true, bindingContext);
- EasyMock.expectLastCall().andReturn(Boolean.TRUE);
- ObjectMessageContext objectContext = control.createMock(ObjectMessageContext.class);
- binding.createObjectContext();
- EasyMock.expectLastCall().andReturn(objectContext);
- bindingImpl.hasFault(bindingContext);
- EasyMock.expectLastCall().andReturn(false);
- bindingImpl.unmarshal(bindingContext, objectContext, null);
- EasyMock.expectLastCall();
- }
- */
-
- /*
- private void setUpServerDispatch(
- MessageContext bindingContext,
- OutputStreamMessageContext outputStreamContext) {
- DataBindingCallback callback =
- createMock(ServerDataBindingCallback.class);
- bindingContext.get(DATABINDING_CALLBACK_PROPERTY);
- EasyMock.expectLastCall().andReturn(callback);
- OutputStream outputStream = createMock(OutputStream.class);
- outputStreamContext.getOutputStream();
- EasyMock.expectLastCall().andReturn(outputStream);
- }
- */
-
- private void runInitiator() {
- runInitiator(null);
+ private void setupMessagePolicies(Message message) {
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+ EasyMock.expect(manager.getRMAssertion()).andReturn(rma).times(2);
+ RMAssertion.BaseRetransmissionInterval bri =
+ createMock(RMAssertion.BaseRetransmissionInterval.class);
+ EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);
+ EasyMock.expect(bri.getMilliseconds()).andReturn(new BigInteger("5000"));
+ RMAssertion.ExponentialBackoff eb = createMock(RMAssertion.ExponentialBackoff.class);
+ EasyMock.expect(rma.getExponentialBackoff()).andReturn(eb);
}
- private void runInitiator(
- RetransmissionQueueImpl.ResendCandidate[] dueCandidates) {
- control.verify();
- control.reset();
- for (int i = 0;
- dueCandidates != null && i < dueCandidates.length;
- i++) {
- Exchange ex = createMock(Exchange.class);
- dueCandidates[i].getMessage().getExchange();
- EasyMock.expectLastCall().andReturn(ex);
- Endpoint ep = createMock(Endpoint.class);
- ex.get(Endpoint.class);
- EasyMock.expectLastCall().andReturn(ep);
- ep.getExecutor();
- EasyMock.expectLastCall().andReturn(executor);
- executor.execute(dueCandidates[i]);
- EasyMock.expectLastCall();
- }
-
+ private void ready(boolean doStart) {
control.replay();
- queue.getResendInitiator().run();
- }
-
- private void runCandidates(
- RetransmissionQueueImpl.ResendCandidate[] candidates,
- boolean[] expectAckRequested) {
- for (int i = 0; i < candidates.length; i++) {
- candidates[i].run();
- assertEquals("unexpected request acknowledge",
- expectAckRequested[i],
- resender.includeAckRequested);
- assertSame("unexpected context",
- candidates[i].getMessage(),
- resender.message);
- resender.clear();
+ if (doStart) {
+ queue.start();
}
}
Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=517714&r1=517713&r2=517714
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Tue Mar 13 07:53:26 2007
@@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.logging.Logger;
import org.apache.cxf.Bus;
@@ -83,13 +84,16 @@
private boolean doTestOnewayDeferredAnonymousAcks = testAll;
private boolean doTestOnewayDeferredNonAnonymousAcks = testAll;
private boolean doTestOnewayAnonymousAcksSequenceLength1 = testAll;
- private boolean doTestOnewayAnonymousAcksSupressed = testAll;
+ private boolean doTestOnewayAnonymousAcksSuppressed = testAll;
+ private boolean doTestOnewayAnonymousAcksSuppressedAsyncExecutor = testAll;
private boolean doTestTwowayNonAnonymous = testAll;
private boolean doTestTwowayNonAnonymousEndpointSpecific = testAll;
private boolean doTestTwowayNonAnonymousDeferred = testAll;
private boolean doTestTwowayNonAnonymousMaximumSequenceLength2 = testAll;
private boolean doTestOnewayMessageLoss = testAll;
+ private boolean doTestOnewayMessageLossAsyncExecutor = testAll;
private boolean doTestTwowayMessageLoss = testAll;
+ private boolean doTestTwowayMessageLossAsyncExecutor = testAll;
private boolean doTestTwowayNonAnonymousNoOffer = testAll;
private boolean doTestConcurrency = testAll;
@@ -332,15 +336,27 @@
mf.verifyLastMessage(new boolean[] {false, false, false, false, false, false}, false);
mf.verifyAcknowledgements(new boolean[] {false, true, false, false, true, false}, false);
}
-
+
@Test
- public void testOnewayAnonymousAcksSupressed() throws Exception {
+ public void testOnewayAnonymousAcksSuppressed() throws Exception {
+ if (!doTestOnewayAnonymousAcksSuppressed) {
+ return;
+ }
+ testOnewayAnonymousAcksSuppressed(null);
+ }
- if (!doTestOnewayAnonymousAcksSupressed) {
+ @Test
+ public void testOnewayAnonymousAcksSuppressedAsyncExecutor() throws Exception {
+ if (!doTestOnewayAnonymousAcksSuppressedAsyncExecutor) {
return;
}
- setupGreeter("org/apache/cxf/systest/ws/rm/suppressed.xml");
+ testOnewayAnonymousAcksSuppressed(Executors.newSingleThreadExecutor());
+ }
+ private void testOnewayAnonymousAcksSuppressed(Executor executor) throws Exception {
+
+ setupGreeter("org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml", false, executor);
+
greeter.greetMeOneWay("once");
greeter.greetMeOneWay("twice");
greeter.greetMeOneWay("thrice");
@@ -604,12 +620,26 @@
expected[5] = true;
mf.verifyAcknowledgements(expected, false);
}
+
@Test
public void testOnewayMessageLoss() throws Exception {
if (!doTestOnewayMessageLoss) {
return;
}
- setupGreeter("org/apache/cxf/systest/ws/rm/message-loss.xml");
+ testOnewayMessageLoss(null);
+ }
+
+ @Test
+ public void testOnewayMessageLossAsyncExecutor() throws Exception {
+ if (!doTestOnewayMessageLossAsyncExecutor) {
+ return;
+ }
+ testOnewayMessageLoss(Executors.newSingleThreadExecutor());
+ }
+
+ private void testOnewayMessageLoss(Executor executor) throws Exception {
+
+ setupGreeter("org/apache/cxf/systest/ws/rm/message-loss.xml", false, executor);
greeterBus.getOutInterceptors().add(new MessageLossSimulator());
RMManager manager = greeterBus.getExtension(RMManager.class);
@@ -653,13 +683,26 @@
mf.verifyAcknowledgements(new boolean[] {false, true, true, true, true}, false);
}
-
+
@Test
public void testTwowayMessageLoss() throws Exception {
if (!doTestTwowayMessageLoss) {
return;
}
- setupGreeter("org/apache/cxf/systest/ws/rm/message-loss.xml", true);
+ testTwowayMessageLoss(null);
+ }
+
+ @Test
+ public void testTwowayMessageLossAsyncExecutor() throws Exception {
+ if (!doTestTwowayMessageLossAsyncExecutor) {
+ return;
+ }
+ testTwowayMessageLoss(Executors.newSingleThreadExecutor());
+ }
+
+ private void testTwowayMessageLoss(Executor executor) throws Exception {
+
+ setupGreeter("org/apache/cxf/systest/ws/rm/message-loss.xml", true, executor);
greeterBus.getOutInterceptors().add(new MessageLossSimulator());
RMManager manager = greeterBus.getExtension(RMManager.class);