You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2007/03/13 15:53:27 UTC

svn commit: r517714 - in /incubator/cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/ systests/src/test/java/org/apache/cxf/systest/ws/rm/

Author: andreasmyth
Date: Tue Mar 13 07:53:26 2007
New Revision: 517714

URL: http://svn.apache.org/viewvc?view=rev&rev=517714
Log:
[JIRA CXF-280] Message specific retransmission behaviour.

Removed:
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-deferred.xml
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-no-offer-test.xml
Modified:
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties?view=diff&rev=517714&r1=517713&r2=517714
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties Tue Mar 13 07:53:26 2007
@@ -22,5 +22,6 @@
 RESEND_MSG = WS-RM retransmission of message {0}.
 RESEND_CANDIDATES_CONCURRENT_MODIFICATION_MSG = Candidates were acknowledged while iterating for resend.
 RESEND_FAILED_MSG = WS-RM retransmission failed.
+SCHEDULE_RESEND_FAILED_MSG = Scheduling of WS-RM retransmission failed.
 RESEND_INITIATION_FAILED_MSG = Failed to initiate retransmission.
 NO_TRANSPORT_FOR_RESEND_MSG = No transport available for WS-RM retransmission.

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=517714&r1=517713&r2=517714
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Tue Mar 13 07:53:26 2007
@@ -26,12 +26,10 @@
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.ConcurrentModificationException;
+import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
@@ -44,7 +42,11 @@
 import org.apache.cxf.io.CachedOutputStreamCallback;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.policy.AssertionInfo;
+import org.apache.cxf.ws.policy.AssertionInfoMap;
+import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
 import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.RMConstants;
 import org.apache.cxf.ws.rm.RMContextUtils;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMMessageConstants;
@@ -65,8 +67,6 @@
     
     private Map<String, List<ResendCandidate>> candidates = new HashMap<String, List<ResendCandidate>>();
     private Resender resender;
-    private Runnable resendInitiator;
-    private Timer timer;
     private RMManager manager;
     
     public RetransmissionQueueImpl(RMManager m) {
@@ -80,14 +80,84 @@
     public void setManager(RMManager m) {
         manager = m;
     }
+    
+    /**
+     * Returns the base retransmission interval for the specified message.
+     * This is obtained as the minimum base retransmission interval in all RMAssertions pertaining
+     * to the message, or the default configured for the RMManager if there are no such policy
+     * assertions. Returns 0 if no interval is specified in either place.
+     * @param message the message
+     * @return the base retransmission interval for the message, in milliseconds
+     */
+    public long getBaseRetransmissionInterval(Message message) {
+        AssertionInfoMap amap =  message.get(AssertionInfoMap.class);
+        boolean initialised = false;
+        long baseRetransmissionInterval = 0;
+        if (null != amap) {
+            Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
+            if (null != ais) {
+                for (AssertionInfo ai : ais) {
+                    JaxbAssertion<RMAssertion> ja = getAssertion(ai);
+                    RMAssertion rma = ja.getData();
+                    RMAssertion.BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
+                    if (null == bri) {
+                        continue;
+                    }
+                    BigInteger bival = bri.getMilliseconds();
+                    if (null == bival) {
+                        continue;
+                    }
+                    long lval = bival.longValue();
+                    // Keep the smallest interval seen so far: the first value found seeds
+                    // the minimum, and later, larger values must not overwrite it.
+                    if (!initialised || lval < baseRetransmissionInterval) {
+                        baseRetransmissionInterval = lval;
+                    }
+                    initialised = true;
 
-    public long getBaseRetransmissionInterval() {
-        RMAssertion rma = null == manager ? null : manager.getRMAssertion();
-        if (null != rma && null != rma.getBaseRetransmissionInterval()
-            && null != rma.getBaseRetransmissionInterval().getMilliseconds()) {
-            return rma.getBaseRetransmissionInterval().getMilliseconds().longValue();
+                }
+            }
+        }
+        if (!initialised) {
+            RMAssertion rma = manager.getRMAssertion();
+            RMAssertion.BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
+            if (null != bri) {
+                BigInteger bival = bri.getMilliseconds();
+                if (null != bival) {
+                    baseRetransmissionInterval = bival.longValue();
+                }
+            }
         }
-        return new BigInteger(DEFAULT_BASE_RETRANSMISSION_INTERVAL).longValue();
+        return baseRetransmissionInterval;
+    }
+    
+    /**
+     * Determines if exponential backoff should be used in repeated attempts to resend
+     * the specified message.
+     * Returns false if any RMAssertion for this message, or the RMAssertion configured for
+     * the RMManager, has no ExponentialBackoff element; returns true otherwise.
+     * @param message the message
+     * @return true iff the exponential backoff algorithm should be used for the message
+     */
+    public boolean useExponentialBackoff(Message message) {
+        AssertionInfoMap amap =  message.get(AssertionInfoMap.class);
+        if (null != amap) {
+            Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
+            if (null != ais) {
+                for (AssertionInfo ai : ais) {
+                    JaxbAssertion<RMAssertion> ja = getAssertion(ai);
+                    RMAssertion rma = ja.getData();
+                    if (null == rma.getExponentialBackoff()) {
+                        return false;
+                    }
+                }
+            }
+        }
+        RMAssertion rma = manager.getRMAssertion();
+        if (null == rma.getExponentialBackoff()) {
+            return false;
+        }
+        return true;
     }
 
     public void addUnacknowledged(Message message) {
@@ -112,7 +182,6 @@
 
     public void populate(Collection<SourceSequence> sss) {
         // TODO Auto-generated method stub
-        
     }
 
     /**
@@ -151,41 +220,23 @@
 
     /**
      * Initiate resends.
-     * 
-     * @param queue the work queue providing async execution
      */
     public void start() {
-        if (null != timer) {
+        if (null != resender) {
             return;
         }
         LOG.fine("Starting retransmission queue");
-        // setup resender
-        if (null == resender) {
-            resender = getDefaultResender();
-        }
         
-        // start resend initiator
-        TimerTask task = new TimerTask() {
-            public void run() {
-                getResendInitiator().run();
-            }
-        };
-        timer = new Timer();
-        // TODO
-        // delay starting the queue to give the first request a chance to be sent before 
-        // waiting for another period.
-        timer.schedule(task, getBaseRetransmissionInterval() / 2, getBaseRetransmissionInterval());  
+        // setup resender
+       
+        resender = getDefaultResender();
     }
 
     /**
      * Stops retransmission queue.
      */ 
     public void stop() {
-        if (null != timer) {
-            LOG.fine("Stopping retransmission queue");
-            timer.cancel();
-            timer = null;
-        }  
+        // no-op
     }
     
     /**
@@ -196,16 +247,6 @@
     }
     
     /**
-     * @return the ResendInitiator
-     */
-    protected Runnable getResendInitiator() {
-        if (resendInitiator == null) {
-            resendInitiator = new ResendInitiator();
-        }
-        return resendInitiator;
-    }
-    
-    /**
      * @param message the message context
      * @return a ResendCandidate
      */
@@ -305,50 +346,15 @@
     }
     
     /**
-     * Manages scheduling of resend attempts. A single task runs every base
-     * transmission interval, determining which resend candidates are due a
-     * resend attempt.
-     */
-    protected class ResendInitiator implements Runnable {
-        public void run() {            
-            // iterate over resend candidates, resending any that are due
-            synchronized (RetransmissionQueueImpl.this) {
-                LOG.fine("Starting ResendInitiator on thread " + Thread.currentThread());
-                Iterator<Map.Entry<String, List<ResendCandidate>>> sequences = candidates.entrySet()
-                    .iterator();
-                while (sequences.hasNext()) {
-                    Iterator<ResendCandidate> sequenceCandidates = sequences.next().getValue().iterator();
-                    boolean requestAck = true;
-                    try {
-                        while (sequenceCandidates.hasNext()) {
-                            ResendCandidate candidate = sequenceCandidates.next();
-                            if (candidate.isDue()) {
-                                candidate.initiate(requestAck);
-                                requestAck = false;
-                            }
-                        }
-                    } catch (ConcurrentModificationException ex) {
-                        // TODO: 
-                        // can happen if resend occurs on same thread as resend initiation
-                        // i.e. when endpoint's executor executes on current thread
-                        LOG.log(Level.WARNING, "RESEND_CANDIDATES_CONCURRENT_MODIFICATION_MSG");
-                    }
-                }
-                LOG.fine("Completed ResendInitiator");
-            }
-            
-        }
-    }
-    
-    /**
-     * Represents a candidate for resend, i.e. an unacked outgoing message. When
-     * this is determined as due another resend attempt, an asynchronous task is
-     * scheduled for this purpose.
+     * Represents a candidate for resend, i.e. an unacked outgoing message.
      */
     protected class ResendCandidate implements Runnable {
         private Message message;
-        private int skips;
-        private int skipped;
+        private Date next;
+        private TimerTask nextTask;
+        private int resends;
+        private long nextInterval;
+        private long backoff;
         private boolean pending;
         private boolean includeAckRequested;
 
@@ -357,13 +363,39 @@
          */
         protected ResendCandidate(Message m) {
             message = m;
-            skipped = -1;
-            skips = 1;
+            resends = 0;
+            long baseRetransmissionInterval = getBaseRetransmissionInterval(m);
+            backoff = useExponentialBackoff(m) ? RetransmissionQueue.DEFAULT_EXPONENTIAL_BACKOFF : 1;
+            next = new Date(System.currentTimeMillis() + baseRetransmissionInterval);            
+            nextInterval = baseRetransmissionInterval * backoff; 
+            if (null != manager.getTimer()) {
+                schedule();
+            }
         }
 
+        
         /**
-         * Async resend logic.
+         * Initiate resend asynchronously.
+         * 
+         * @param requestAcknowledge true if an AckRequest header is to be sent
+         *            with resend
          */
+        protected void initiate(boolean requestAcknowledge) {
+            includeAckRequested = requestAcknowledge;
+            pending = true;
+            Endpoint ep = message.getExchange().get(Endpoint.class);
+            Executor executor = ep.getExecutor();
+            if (null == executor) {
+                executor = ep.getService().getExecutor();
+            }
+            LOG.log(Level.FINE, "Using executor {0}", executor.getClass().getName());
+            try {
+                executor.execute(this);
+            } catch (RejectedExecutionException ex) {
+                LOG.log(Level.SEVERE, "RESEND_INITIATION_FAILED_MSG", ex);
+            }
+        }
+        
         public void run() {
             try {
                 // ensure ACK wasn't received while this task was enqueued
@@ -377,48 +409,26 @@
             }
         }
 
+        
         /**
-         * @return true if candidate is due a resend REVISIT should bound the
-         *         max number of resend attampts
+         * @return number of resend attempts
          */
-        protected synchronized boolean isDue() {
-            boolean due = false;
-            // skip count is used to model exponential backoff
-            // to avoid gratuitous time evaluation
-            if (!pending && ++skipped == skips) {
-                skips *= getExponentialBackoff();
-                skipped = 0;
-                due = true;
-            }
-            return due;
+        protected int getResends() {
+            return resends;
         }
-
+        
         /**
-         * @return if resend attempt is pending
+         * @return date of next resend
          */
-        protected synchronized boolean isPending() {
-            return pending;
+        protected Date getNext() {
+            return next;
         }
 
         /**
-         * Initiate resend asynchronsly.
-         * 
-         * @param requestAcknowledge true if a AckRequest header is to be sent
-         *            with resend
+         * @return if resend attempt is pending
          */
-        protected synchronized void initiate(boolean requestAcknowledge) {
-            includeAckRequested = requestAcknowledge;
-            pending = true;
-            Endpoint ep = message.getExchange().get(Endpoint.class);
-            Executor executor = ep.getExecutor();
-            if (null == executor) {
-                executor = ep.getService().getExecutor();
-            }
-            try {
-                executor.execute(this);
-            } catch (RejectedExecutionException ex) {
-                LOG.log(Level.SEVERE, "RESEND_INITIATION_FAILED_MSG", ex);
-            }
+        protected synchronized boolean isPending() {
+            return pending;
         }
 
         /**
@@ -426,7 +436,10 @@
          */
         protected synchronized void resolved() {
             pending = false;
-            skips = Integer.MAX_VALUE;
+            next = null;
+            if (null != nextTask) {
+                nextTask.cancel();
+            }
         }
 
         /**
@@ -437,12 +450,43 @@
         }
 
         /**
-         * A resend has been attempted.
+         * A resend has been attempted. Schedule the next attempt.
          */
-        private synchronized void attempted() {
+        protected synchronized void attempted() {
             pending = false;
+            resends++;
+            if (null != next) {
+                next = new Date(next.getTime() + nextInterval);
+                nextInterval *= backoff;
+                schedule();
+            }
+        }
+        
+        protected final synchronized void schedule() {
+            if (null == manager.getTimer()) {
+                return;
+            }
+            class ResendTask extends TimerTask {
+                ResendCandidate candidate;
+                ResendTask(ResendCandidate c) {
+                    candidate = c;
+                }      
+                @Override
+                public void run() {
+                    if (!candidate.isPending()) {
+                        candidate.initiate(includeAckRequested);  
+                    }
+                }
+            }
+            nextTask = new ResendTask(this);
+            try {
+                manager.getTimer().schedule(nextTask, next);
+            } catch (IllegalStateException ex) {
+                LOG.log(Level.WARNING, "SCHEDULE_RESEND_FAILED_MSG", ex); 
+            }
         }
     }
+      
     
     /**
      * Encapsulates actual resend logic (pluggable to facilitate unit testing)
@@ -464,7 +508,7 @@
      */
     protected final Resender getDefaultResender() {
         return new Resender() {
-            public void resend(Message message, boolean requestAcknowledge) {                
+            public void resend(Message message, boolean requestAcknowledge) {    
                 RMProperties properties = RMContextUtils.retrieveRMProperties(message, true);
                 SequenceType st = properties.getSequence();
                 if (st != null) {
@@ -484,7 +528,7 @@
                 }
             }
         };
-    };
+    }
     
     /**
      * Plug in replacement resend logic (facilitates unit testing).
@@ -493,6 +537,11 @@
      */
     protected void replaceResender(Resender replacement) {
         resender = replacement;
+    }
+    
+    @SuppressWarnings("unchecked")
+    protected JaxbAssertion<RMAssertion> getAssertion(AssertionInfo ai) {
+        return (JaxbAssertion<RMAssertion>)ai.getAssertion();
     }
 
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?view=diff&rev=517714&r1=517713&r2=517714
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Tue Mar 13 07:53:26 2007
@@ -20,26 +20,28 @@
 
 package org.apache.cxf.ws.rm.soap;
 
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStream;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
 import java.util.List;
 import java.util.concurrent.Executor;
 
 import junit.framework.TestCase;
 
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
-import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.policy.AssertionInfo;
+import org.apache.cxf.ws.policy.AssertionInfoMap;
+import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
 import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.RMConstants;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMMessageConstants;
 import org.apache.cxf.ws.rm.RMProperties;
 import org.apache.cxf.ws.rm.SequenceType;
 import org.apache.cxf.ws.rm.SourceSequence;
 import org.apache.cxf.ws.rm.persistence.RMStore;
+import org.apache.cxf.ws.rm.policy.RMAssertion;
 import org.easymock.IMocksControl;
 import org.easymock.classextension.EasyMock;
 
@@ -64,6 +66,7 @@
         new ArrayList<Identifier>();
     private List<Object> mocks =
         new ArrayList<Object>();
+    private RMAssertion rma;
     
     public void setUp() {
         control = EasyMock.createNiceControl();
@@ -72,7 +75,8 @@
         resender = new TestResender();
         queue.replaceResender(resender);
         executor = createMock(Executor.class);
-        
+        rma = createMock(RMAssertion.class);
+        assertNotNull(executor);
     }
     
     public void tearDown() {
@@ -85,18 +89,157 @@
         control.reset();
     }
     
+    
     public void testCtor() {
-        ready();
+        ready(false);        
         assertNotNull("expected unacked map", queue.getUnacknowledged());
         assertEquals("expected empty unacked map", 
                      0,
                      queue.getUnacknowledged().size());
-        assertEquals("unexpected base retransmission interval",
-                     3000L,
-                     queue.getBaseRetransmissionInterval());
-        assertEquals("unexpected exponential backoff",
-                     2,
-                     queue.getExponentialBackoff());
+        
+        queue = new RetransmissionQueueImpl(null);
+        assertNull(queue.getManager());
+        queue.setManager(manager);
+        assertSame("Unexpected RMManager", manager, queue.getManager());        
+    }
+    
+    public void testGetBaseRetranmissionIntervalFromPolicies() {
+        Message message = createMock(Message.class);
+        AssertionInfoMap aim = createMock(AssertionInfoMap.class);
+        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
+        AssertionInfo ai1 = createMock(AssertionInfo.class);
+        AssertionInfo ai2 = createMock(AssertionInfo.class);
+        AssertionInfo ai3 = createMock(AssertionInfo.class);
+        AssertionInfo ai4 = createMock(AssertionInfo.class);
+        Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
+        ais.add(ai1);
+        ais.add(ai2);
+        ais.add(ai3);
+        ais.add(ai4);
+        EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
+        JaxbAssertion ja1 = createMock(JaxbAssertion.class);
+        EasyMock.expect(ai1.getAssertion()).andReturn(ja1);
+        RMAssertion rma1 = createMock(RMAssertion.class);
+        EasyMock.expect(ja1.getData()).andReturn(rma1);
+        EasyMock.expect(rma1.getBaseRetransmissionInterval()).andReturn(null);
+        JaxbAssertion ja2 = createMock(JaxbAssertion.class);
+        EasyMock.expect(ai2.getAssertion()).andReturn(ja2);
+        RMAssertion rma2 = createMock(RMAssertion.class);
+        EasyMock.expect(ja2.getData()).andReturn(rma2);
+        RMAssertion.BaseRetransmissionInterval bri2 = 
+            createMock(RMAssertion.BaseRetransmissionInterval.class);
+        EasyMock.expect(rma2.getBaseRetransmissionInterval()).andReturn(bri2);
+        EasyMock.expect(bri2.getMilliseconds()).andReturn(null);
+        JaxbAssertion ja3 = createMock(JaxbAssertion.class);
+        EasyMock.expect(ai3.getAssertion()).andReturn(ja3);
+        RMAssertion rma3 = createMock(RMAssertion.class);
+        EasyMock.expect(ja3.getData()).andReturn(rma3);
+        RMAssertion.BaseRetransmissionInterval bri3 = 
+            createMock(RMAssertion.BaseRetransmissionInterval.class);
+        EasyMock.expect(rma3.getBaseRetransmissionInterval()).andReturn(bri3);
+        EasyMock.expect(bri3.getMilliseconds()).andReturn(new BigInteger("10000"));
+        JaxbAssertion ja4 = createMock(JaxbAssertion.class);
+        EasyMock.expect(ai4.getAssertion()).andReturn(ja4);
+        RMAssertion rma4 = createMock(RMAssertion.class);
+        EasyMock.expect(ja4.getData()).andReturn(rma4);
+        RMAssertion.BaseRetransmissionInterval bri4 = 
+            createMock(RMAssertion.BaseRetransmissionInterval.class);
+        EasyMock.expect(rma4.getBaseRetransmissionInterval()).andReturn(bri4);
+        EasyMock.expect(bri4.getMilliseconds()).andReturn(new BigInteger("5000"));
+        
+        control.replay();
+        assertEquals("Unexpected value for base retransmission interval", 
+                     5000, queue.getBaseRetransmissionInterval(message));
+    }
+    
+    public void testGetBaseRetransmissionIntervalFromManager() {
+        Message message = createMock(Message.class);
+        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+        EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
+        EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(null);
+        control.replay();
+        assertEquals("Unexpected value for base retransmission interval", 
+                     0, queue.getBaseRetransmissionInterval(message));
+        control.verify();
+        control.reset();
+        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+        EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
+        RMAssertion.BaseRetransmissionInterval bri = createMock(RMAssertion.BaseRetransmissionInterval.class);
+        EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);
+        EasyMock.expect(bri.getMilliseconds()).andReturn(null);
+        control.replay();
+        assertEquals("Unexpected value for base retransmission interval", 
+                     0, queue.getBaseRetransmissionInterval(message));
+        control.verify();
+        control.reset();
+        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+        EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
+        EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);
+        EasyMock.expect(bri.getMilliseconds()).andReturn(new BigInteger("7000"));
+        control.replay();
+        assertEquals("Unexpected value for base retransmission interval", 
+                     7000, queue.getBaseRetransmissionInterval(message));
+    }
+    
+    public void testUseExponentialBackoff() {
+        Message message = createMock(Message.class);
+        AssertionInfoMap aim = createMock(AssertionInfoMap.class);
+        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
+        AssertionInfo ai = createMock(AssertionInfo.class);
+        Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
+        EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
+        ais.add(ai);
+        JaxbAssertion ja = createMock(JaxbAssertion.class);
+        EasyMock.expect(ai.getAssertion()).andReturn(ja);
+        EasyMock.expect(ja.getData()).andReturn(rma);
+        EasyMock.expect(rma.getExponentialBackoff()).andReturn(null);
+        control.replay();
+        assertTrue("Should not use exponential backoff", !queue.useExponentialBackoff(message));
+        control.verify();
+        control.reset();
+        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+        EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
+        EasyMock.expect(rma.getExponentialBackoff()).andReturn(null);
+        control.replay();
+        assertTrue("Should not use exponential backoff", !queue.useExponentialBackoff(message));
+        control.verify();
+        control.reset();
+        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+        EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
+        RMAssertion.ExponentialBackoff eb = createMock(RMAssertion.ExponentialBackoff.class);
+        EasyMock.expect(rma.getExponentialBackoff()).andReturn(eb);
+        control.replay();
+        assertTrue("Should use exponential backoff", queue.useExponentialBackoff(message));        
+    }
+    
+    public void testResendCandidateCtor() {
+        Message message = createMock(Message.class);
+        setupMessagePolicies(message);
+        control.replay();
+        long now = System.currentTimeMillis();
+        RetransmissionQueueImpl.ResendCandidate candidate = queue.createResendCandidate(message);
+        assertSame(message, candidate.getMessage());
+        assertEquals(0, candidate.getResends());
+        Date refDate = new Date(now + 5000);
+        assertTrue(!candidate.getNext().before(refDate));
+        refDate = new Date(now + 7000);
+        assertTrue(!candidate.getNext().after(refDate));
+        assertTrue(!candidate.isPending());
+    }
+    
+    public void testResendCandidateAttempted() {
+        Message message = createMock(Message.class);
+        setupMessagePolicies(message);
+        ready(true);
+        long now = System.currentTimeMillis();
+        RetransmissionQueueImpl.ResendCandidate candidate = queue.createResendCandidate(message);
+        candidate.attempted();
+        assertEquals(1, candidate.getResends());
+        Date refDate = new Date(now + 15000);
+        assertTrue(!candidate.getNext().before(refDate));
+        refDate = new Date(now + 17000);
+        assertTrue(!candidate.getNext().after(refDate));
+        assertTrue(!candidate.isPending());        
     }
     
     public void testCacheUnacknowledged() {
@@ -104,7 +247,11 @@
         Message message2 = setUpMessage("sequence2");
         Message message3 = setUpMessage("sequence1");
         
-        ready();
+        setupMessagePolicies(message1);
+        setupMessagePolicies(message2);
+        setupMessagePolicies(message3);
+        
+        ready(false);
         
         assertNotNull("expected resend candidate",
                       queue.cacheUnacknowledged(message1));
@@ -153,11 +300,14 @@
         queue.getUnacknowledged().put("sequence1", sequenceList);
         Message message1 =
             setUpMessage("sequence1", messageNumbers[0]);
-        sequenceList.add(queue.createResendCandidate(message1));
+        setupMessagePolicies(message1);        
         Message message2 =
             setUpMessage("sequence1", messageNumbers[1]);
+        setupMessagePolicies(message2);
+        ready(false);
+        
+        sequenceList.add(queue.createResendCandidate(message1));
         sequenceList.add(queue.createResendCandidate(message2));
-        ready();
 
         queue.purgeAcknowledged(sequence);
         assertEquals("unexpected unacked map size", 
@@ -178,11 +328,14 @@
         queue.getUnacknowledged().put("sequence1", sequenceList);
         Message message1 =
             setUpMessage("sequence1", messageNumbers[0]);
-        sequenceList.add(queue.createResendCandidate(message1));
+        setupMessagePolicies(message1);        
         Message message2 =
             setUpMessage("sequence1", messageNumbers[1]);
+        setupMessagePolicies(message2);        
+        ready(false);
+        
+        sequenceList.add(queue.createResendCandidate(message1));
         sequenceList.add(queue.createResendCandidate(message2));
-        ready();
 
         queue.purgeAcknowledged(sequence);
         assertEquals("unexpected unacked map size", 
@@ -194,7 +347,7 @@
     }
     
     public void testIsEmpty() {
-        ready();
+        ready(false);
         assertTrue("queue is not empty" , queue.isEmpty());
     }
 
@@ -209,11 +362,14 @@
         queue.getUnacknowledged().put("sequence1", sequenceList);
         Message message1 =
             setUpMessage("sequence1", messageNumbers[0], false);
-        sequenceList.add(queue.createResendCandidate(message1));
+        setupMessagePolicies(message1);        
         Message message2 =
             setUpMessage("sequence1", messageNumbers[1], false);
+        setupMessagePolicies(message1);
+        ready(false);
+        
+        sequenceList.add(queue.createResendCandidate(message1));
         sequenceList.add(queue.createResendCandidate(message2));
-        ready();
 
         assertEquals("unexpected unacked count", 
                      2,
@@ -226,234 +382,19 @@
         SourceSequence sequence = setUpSequence("sequence1",
                                           messageNumbers, 
                                           null);
-        ready();
+        ready(false);
 
         assertEquals("unexpected unacked count", 
                      0,
                      queue.countUnacknowledged(sequence));
     }
     
-    public void xtestPopulate() {
-  
-        /*
-        Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
-        Collection<RMMessage> msgs = new ArrayList<RMMessage>();
-        // List<Handler> handlerChain = new ArrayList<Handler>();
-            
-        RMStore store = createMock(RMStore.class);
-        handler.getStore();
-        EasyMock.expectLastCall().andReturn(store);   
-        SourceSequence ss = control.createMock(SourceSequence.class);
-        sss.add(ss);
-        Identifier id = control.createMock(Identifier.class);
-        ss.getIdentifier();
-        EasyMock.expectLastCall().andReturn(id); 
-        RMMessage msg = control.createMock(RMMessage.class);
-        msgs.add(msg);
-        store.getMessages(id, true);
-        EasyMock.expectLastCall().andReturn(msgs); 
-        MessageContext context = control.createMock(MessageContext.class);
-        msg.getContext();
-        EasyMock.expectLastCall().andReturn(context);
-        
-        RMSoapHandler rmh = control.createMock(RMSoapHandler.class);
-        MAPCodec wsah = control.createMock(MAPCodec.class);
-
-        handler.getWsaSOAPHandler();
-        EasyMock.expectLastCall().andReturn(wsah);
-        handler.getRMSoapHandler();
-        EasyMock.expectLastCall().andReturn(rmh);
-        RMProperties rmps = control.createMock(RMProperties.class);
-        rmh.unmarshalRMProperties(null);
-        EasyMock.expectLastCall().andReturn(rmps);
-        AddressingProperties maps = control.createMock(AddressingProperties.class);
-        wsah.unmarshalMAPs(null);
-        EasyMock.expectLastCall().andReturn(maps);
-        SequenceType st = control.createMock(SequenceType.class);
-        rmps.getSequence();
-        EasyMock.expectLastCall().andReturn(st);
-        st.getIdentifier();
-        EasyMock.expectLastCall().andReturn(id);
-        id.getValue();
-        EasyMock.expectLastCall().andReturn("sequence1");
-        ready();
-        
-        queue.populate(sss);
-        
-        assertTrue("queue is empty", !queue.isEmpty()); 
-        */
-    }
-    
-    public void testResendInitiatorBackoffLogic() {
-        Message message1 = setUpMessage("sequence1");
-        Message message2 = setUpMessage("sequence2");
-        Message message3 = setUpMessage("sequence1");
-        
-        ready();
-        RetransmissionQueueImpl.ResendCandidate candidate1 =
-            queue.cacheUnacknowledged(message1);
-        RetransmissionQueueImpl.ResendCandidate candidate2 =
-            queue.cacheUnacknowledged(message2);
-        RetransmissionQueueImpl.ResendCandidate candidate3 =
-            queue.cacheUnacknowledged(message3);
-        RetransmissionQueueImpl.ResendCandidate[] allCandidates = 
-        {candidate1, candidate2, candidate3};
-        boolean [] expectAckRequested = {true, true, false};
-
-        // initial run => none due
-        runInitiator();
-
-        // all 3 candidates due
-        runInitiator(allCandidates);
-        runCandidates(allCandidates, expectAckRequested);  
-                        
-        // exponential backoff => none due
-        runInitiator();
-        
-        // all 3 candidates due
-        runInitiator(allCandidates);
-        runCandidates(allCandidates, expectAckRequested);
-
-        for (int i = 0; i < 3; i++) {
-            // exponential backoff => none due
-            runInitiator();
-        }
-
-        // all 3 candidates due
-        runInitiator(allCandidates);
-        runCandidates(allCandidates, expectAckRequested);
-        
-        for (int i = 0; i < 7; i++) {
-            // exponential backoff => none due
-            runInitiator();
-        }
-        
-        // all 3 candidates due
-        runInitiator(allCandidates);
-        runCandidates(allCandidates, expectAckRequested);
-    }
-
-
-    public void testResendInitiatorDueLogic() {
-        Message message1 = setUpMessage("sequence1");
-        Message message2 = setUpMessage("sequence2");
-        Message message3 = setUpMessage("sequence1");
-        ready();
-        RetransmissionQueueImpl.ResendCandidate candidate1 =
-            queue.cacheUnacknowledged(message1);
-        RetransmissionQueueImpl.ResendCandidate candidate2 =
-            queue.cacheUnacknowledged(message2);
-        RetransmissionQueueImpl.ResendCandidate candidate3 =
-            queue.cacheUnacknowledged(message3);
-        RetransmissionQueueImpl.ResendCandidate[] allCandidates = 
-        {candidate1, candidate2, candidate3};
-        boolean [] expectAckRequested = {true, true, false};
-
-        // initial run => none due
-        runInitiator();
-
-        // all 3 candidates due
-        runInitiator(allCandidates);
-                
-        // all still pending => none due
-        runInitiator();
-        
-        candidate1.run();
-        candidate2.run();
-        
-        // exponential backoff => none due
-        runInitiator();
-        
-        // candidates 1 & 2 run => only these due
-        runInitiator(new RetransmissionQueueImpl.ResendCandidate[] {candidate1, candidate2});
-
-        runCandidates(allCandidates, expectAckRequested);
-
-        // exponential backoff => none due
-        runInitiator();
-
-        // candidates 3 run belatedly => now due
-        runInitiator(new RetransmissionQueueImpl.ResendCandidate[] {candidate3});
-        
-        // exponential backoff => none due
-        runInitiator();
-
-        // candidates 1 & 2 now due
-        runInitiator(new RetransmissionQueueImpl.ResendCandidate[] {candidate1, candidate2});
-    }
-    
-    public void testResendInitiatorResolvedLogic() {
-        Message message1 = setUpMessage("sequence1");
-        Message message2 = setUpMessage("sequence2");
-        Message message3 = setUpMessage("sequence1");
-        ready();
-        RetransmissionQueueImpl.ResendCandidate candidate1 =
-            queue.cacheUnacknowledged(message1);
-        RetransmissionQueueImpl.ResendCandidate candidate2 =
-            queue.cacheUnacknowledged(message2);
-        RetransmissionQueueImpl.ResendCandidate candidate3 =
-            queue.cacheUnacknowledged(message3);
-        RetransmissionQueueImpl.ResendCandidate[] allCandidates = 
-        {candidate1, candidate2, candidate3};
-        boolean [] expectAckRequested = {true, true, false};
-        
-        // initial run => none due
-        runInitiator();
-
-        // all 3 candidates due
-        runInitiator(allCandidates);
-        runCandidates(allCandidates, expectAckRequested);
-
-        // exponential backoff => none due
-        runInitiator();
-        
-        candidate1.resolved();
-        candidate3.resolved();
-        
-        // candidates 1 & 3 resolved => only candidate2 due
-        runInitiator(new RetransmissionQueueImpl.ResendCandidate[] {candidate2});
-    }
-    
-    public void testResenderInitiatorReschedule() {
-        ready();
-        
-        runInitiator();
-    }
-
-    public void xtestResenderInitiatorNoRescheduleOnShutdown() {
-        /*
-        ready();
-        
-        queue.shutdown();
-        queue.getResendInitiator().run();
-        */
-    }
-    
-    public void testDefaultResenderClient() throws Exception {
-        doTestDefaultResender(true);
+    public void testStartStop() {
+        control.replay();
+        queue.start();
+        queue.stop();
     }
     
-    public void xtestDefaultResenderServer() throws Exception {
-        doTestDefaultResender(false);
-    }
-
-    private void doTestDefaultResender(boolean isRequestor) throws Exception {
-        Message message1 = setUpMessage("sequence1");
-        queue.replaceResender(queue.getDefaultResender());
-        ready();
-        RetransmissionQueueImpl.ResendCandidate candidate1 =
-            queue.cacheUnacknowledged(message1);
-        RetransmissionQueueImpl.ResendCandidate[] allCandidates = {candidate1};
-    
-        // initial run => none due
-        runInitiator();
-    
-        // single candidate due
-        runInitiator(allCandidates);
-        setUpDefaultResender(0, isRequestor, message1);
-        allCandidates[0].run();
-    }
-
     private Message setUpMessage(String sid) {
         return setUpMessage(sid, null);
     }
@@ -475,191 +416,23 @@
         
         return message;
     }
-   
-    /*
-    private void setupContextMessage(ObjectMessageContext context) throws Exception {
-        SOAPMessage message = createMock(SOAPMessage.class);
-        context.get("org.apache.cxf.bindings.soap.message");
-        EasyMock.expectLastCall().andReturn(message);
-        SOAPPart part = createMock(SOAPPart.class);
-        message.getSOAPPart();
-        EasyMock.expectLastCall().andReturn(part);
-        SOAPEnvelope env = createMock(SOAPEnvelope.class);
-        part.getEnvelope();
-        EasyMock.expectLastCall().andReturn(env);
-        SOAPHeader header = createMock(SOAPHeader.class);
-        env.getHeader();
-        EasyMock.expectLastCall().andReturn(header).times(2);
-        Iterator headerElements = createMock(Iterator.class);
-        header.examineAllHeaderElements();
-        EasyMock.expectLastCall().andReturn(headerElements);
-        
-        // RM header element
-        headerElements.hasNext();
-        EasyMock.expectLastCall().andReturn(true);
-        SOAPHeaderElement headerElement = createMock(SOAPHeaderElement.class);
-        headerElements.next();
-        EasyMock.expectLastCall().andReturn(headerElement);
-        Name headerName = createMock(Name.class);
-        headerElement.getElementName();
-        EasyMock.expectLastCall().andReturn(headerName);
-        headerName.getURI();
-        EasyMock.expectLastCall().andReturn(Names.WSRM_NAMESPACE_NAME);
-        headerElement.detachNode();
-        EasyMock.expectLastCall();
-        
-        // non-RM header element
-        headerElements.hasNext();
-        EasyMock.expectLastCall().andReturn(true);
-        headerElements.next();
-        EasyMock.expectLastCall().andReturn(headerElement);
-        headerElement.getElementName();
-        EasyMock.expectLastCall().andReturn(headerName);
-        headerName.getURI();
-        EasyMock.expectLastCall().andReturn(Names.WSA_NAMESPACE_NAME);
-
-        headerElements.hasNext();
-        EasyMock.expectLastCall().andReturn(false);
-    }
-    */
-
-    private void ready() {
-        control.replay();
-        queue.start();
-    }
     
-    private void setUpDefaultResender(int i,
-                                      boolean isRequestor,
-                                      Message context) 
-        throws Exception {
-        assertTrue("too few contexts", i < messages.size());
-        assertTrue("too few properties", i < properties.size());
-        assertTrue("too few sequences", i < sequences.size());
-        control.verify();
-        control.reset();  
-        
-        messages.get(i).get(RMMessageConstants.RM_PROPERTIES_OUTBOUND);
-        EasyMock.expectLastCall().andReturn(properties.get(i)).times(1);
-        properties.get(i).getSequence();
-        EasyMock.expectLastCall().andReturn(sequences.get(i)).times(1);
-        
-        messages.get(i).get(Message.REQUESTOR_ROLE);
-        EasyMock.expectLastCall().andReturn(Boolean.valueOf(isRequestor));
-        
-        if (isRequestor) {
-            Exchange ex = createMock(Exchange.class);
-            messages.get(i).getExchange();
-            EasyMock.expectLastCall().andReturn(ex);
-            Conduit conduit = createMock(Conduit.class);
-            ex.getConduit();
-            EasyMock.expectLastCall().andReturn(conduit);
-            conduit.send(messages.get(i));
-            EasyMock.expectLastCall();
-            OutputStream os = createMock(OutputStream.class);
-            messages.get(i).getContent(OutputStream.class);
-            EasyMock.expectLastCall().andReturn(os).times(2);
-            ByteArrayOutputStream saved = createMock(ByteArrayOutputStream.class);
-            messages.get(i).get(RMMessageConstants.SAVED_OUTPUT_STREAM);
-            EasyMock.expectLastCall().andReturn(saved);
-            byte[] content = "the saved message".getBytes();
-            saved.toByteArray();
-            EasyMock.expectLastCall().andReturn(content);
-            os.write(EasyMock.isA(byte[].class), EasyMock.eq(0), EasyMock.eq(content.length));
-            EasyMock.expectLastCall();
-            os.flush();
-            EasyMock.expectLastCall();
-            os.close();
-            EasyMock.expectLastCall(); 
-        }
-        control.replay();
-    }
-
-    /*
-    private void setUpClientDispatch(
-                              HandlerInvoker handlerInvoker,
-                              AbstractBindingBase binding,
-                              OutputStreamMessageContext outputStreamContext,
-                              AbstractBindingImpl bindingImpl,
-                              Transport transport) throws Exception {
-             
-        InputStreamMessageContext inputStreamContext =
-            createMock(InputStreamMessageContext.class);
-        ((ClientTransport)transport).invoke(outputStreamContext);
-        EasyMock.expectLastCall().andReturn(inputStreamContext);        
-        binding.getBindingImpl();
-        EasyMock.expectLastCall().andReturn(bindingImpl); 
-        bindingImpl.createBindingMessageContext(inputStreamContext);
-        MessageContext bindingContext = 
-            control.createMock(MessageContext.class);
-        EasyMock.expectLastCall().andReturn(bindingContext);        
-        bindingImpl.read(inputStreamContext, bindingContext);
-        EasyMock.expectLastCall();        
-        handlerInvoker.invokeProtocolHandlers(true, bindingContext);
-        EasyMock.expectLastCall().andReturn(Boolean.TRUE);        
-        ObjectMessageContext objectContext = control.createMock(ObjectMessageContext.class);
-        binding.createObjectContext();
-        EasyMock.expectLastCall().andReturn(objectContext);        
-        bindingImpl.hasFault(bindingContext);
-        EasyMock.expectLastCall().andReturn(false);        
-        bindingImpl.unmarshal(bindingContext, objectContext, null);
-        EasyMock.expectLastCall();
-    }
-    */
-
-    /*
-    private void setUpServerDispatch(
-                            MessageContext bindingContext,
-                            OutputStreamMessageContext outputStreamContext) {
-        DataBindingCallback callback =
-            createMock(ServerDataBindingCallback.class);
-        bindingContext.get(DATABINDING_CALLBACK_PROPERTY);
-        EasyMock.expectLastCall().andReturn(callback);
-        OutputStream outputStream = createMock(OutputStream.class);
-        outputStreamContext.getOutputStream();
-        EasyMock.expectLastCall().andReturn(outputStream);
-    }
-    */
-
-    private void runInitiator() {
-        runInitiator(null);
+    private void setupMessagePolicies(Message message) {
+        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+        EasyMock.expect(manager.getRMAssertion()).andReturn(rma).times(2);
+        RMAssertion.BaseRetransmissionInterval bri = 
+            createMock(RMAssertion.BaseRetransmissionInterval.class);
+        EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);
+        EasyMock.expect(bri.getMilliseconds()).andReturn(new BigInteger("5000"));
+        RMAssertion.ExponentialBackoff eb = createMock(RMAssertion.ExponentialBackoff.class);
+        EasyMock.expect(rma.getExponentialBackoff()).andReturn(eb);        
     }
     
-    private void runInitiator(
-                       RetransmissionQueueImpl.ResendCandidate[] dueCandidates) {
-        control.verify();
-        control.reset();
 
-        for (int i = 0; 
-             dueCandidates != null && i < dueCandidates.length;
-             i++) {
-            Exchange ex = createMock(Exchange.class);
-            dueCandidates[i].getMessage().getExchange();
-            EasyMock.expectLastCall().andReturn(ex);
-            Endpoint ep = createMock(Endpoint.class);
-            ex.get(Endpoint.class);
-            EasyMock.expectLastCall().andReturn(ep);
-            ep.getExecutor();
-            EasyMock.expectLastCall().andReturn(executor);
-            executor.execute(dueCandidates[i]);
-            EasyMock.expectLastCall();
-        }
-        
+    private void ready(boolean doStart) {
         control.replay();
-        queue.getResendInitiator().run();
-    }
-    
-    private void runCandidates(
-                          RetransmissionQueueImpl.ResendCandidate[] candidates,
-                          boolean[] expectAckRequested) {
-        for (int i = 0; i < candidates.length; i++) {
-            candidates[i].run();
-            assertEquals("unexpected request acknowledge",
-                         expectAckRequested[i],
-                         resender.includeAckRequested);
-            assertSame("unexpected context",
-                       candidates[i].getMessage(),
-                       resender.message);
-            resender.clear();
+        if (doStart) {
+            queue.start();
         }
     }
     

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=517714&r1=517713&r2=517714
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Tue Mar 13 07:53:26 2007
@@ -23,6 +23,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.logging.Logger;
 
 import org.apache.cxf.Bus;
@@ -83,13 +84,16 @@
     private boolean doTestOnewayDeferredAnonymousAcks = testAll;
     private boolean doTestOnewayDeferredNonAnonymousAcks = testAll;
     private boolean doTestOnewayAnonymousAcksSequenceLength1 = testAll;
-    private boolean doTestOnewayAnonymousAcksSupressed = testAll;
+    private boolean doTestOnewayAnonymousAcksSuppressed = testAll;
+    private boolean doTestOnewayAnonymousAcksSuppressedAsyncExecutor = testAll;
     private boolean doTestTwowayNonAnonymous = testAll;
     private boolean doTestTwowayNonAnonymousEndpointSpecific = testAll;
     private boolean doTestTwowayNonAnonymousDeferred = testAll;
     private boolean doTestTwowayNonAnonymousMaximumSequenceLength2 = testAll;
     private boolean doTestOnewayMessageLoss = testAll;
+    private boolean doTestOnewayMessageLossAsyncExecutor = testAll;
     private boolean doTestTwowayMessageLoss = testAll;
+    private boolean doTestTwowayMessageLossAsyncExecutor = testAll;
     private boolean doTestTwowayNonAnonymousNoOffer = testAll;
     private boolean doTestConcurrency = testAll;
 
@@ -332,15 +336,27 @@
         mf.verifyLastMessage(new boolean[] {false, false, false, false, false, false}, false);
         mf.verifyAcknowledgements(new boolean[] {false, true, false, false, true, false}, false);
     }
-    
+   
     @Test
-    public void testOnewayAnonymousAcksSupressed() throws Exception {
+    public void testOnewayAnonymousAcksSuppressed() throws Exception {
+        if (!doTestOnewayAnonymousAcksSuppressed) {
+            return;
+        }
+        testOnewayAnonymousAcksSuppressed(null);
+    }
 
-        if (!doTestOnewayAnonymousAcksSupressed) {
+    @Test
+    public void testOnewayAnonymousAcksSuppressedAsyncExecutor() throws Exception {
+        if (!doTestOnewayAnonymousAcksSuppressedAsyncExecutor) {
             return;
         }
-        setupGreeter("org/apache/cxf/systest/ws/rm/suppressed.xml");
+        testOnewayAnonymousAcksSuppressed(Executors.newSingleThreadExecutor());
+    }
 
+    private void testOnewayAnonymousAcksSuppressed(Executor executor) throws Exception {
+
+        setupGreeter("org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml", false, executor);
+ 
         greeter.greetMeOneWay("once");
         greeter.greetMeOneWay("twice");
         greeter.greetMeOneWay("thrice");
@@ -604,12 +620,26 @@
         expected[5] = true;
         mf.verifyAcknowledgements(expected, false);
     }
+
     @Test    
     public void testOnewayMessageLoss() throws Exception {
         if (!doTestOnewayMessageLoss) {
             return;
         }
-        setupGreeter("org/apache/cxf/systest/ws/rm/message-loss.xml");
+        testOnewayMessageLoss(null);
+    }
+    
+    @Test    
+    public void testOnewayMessageLossAsyncExecutor() throws Exception {
+        if (!doTestOnewayMessageLossAsyncExecutor) {
+            return;
+        }
+        testOnewayMessageLoss(Executors.newSingleThreadExecutor());
+    } 
+
+    private void testOnewayMessageLoss(Executor executor) throws Exception {
+
+        setupGreeter("org/apache/cxf/systest/ws/rm/message-loss.xml", false, executor);
         
         greeterBus.getOutInterceptors().add(new MessageLossSimulator());
         RMManager manager = greeterBus.getExtension(RMManager.class);
@@ -653,13 +683,26 @@
         mf.verifyAcknowledgements(new boolean[] {false, true, true, true, true}, false);
   
     }
-    
+
     @Test
     public void testTwowayMessageLoss() throws Exception {
         if (!doTestTwowayMessageLoss) {
             return;
         }
-        setupGreeter("org/apache/cxf/systest/ws/rm/message-loss.xml", true);
+        testTwowayMessageLoss(null);
+    }
+
+    @Test
+    public void testTwowayMessageLossAsyncExecutor() throws Exception {
+        if (!doTestTwowayMessageLossAsyncExecutor) {
+            return;
+        }
+        testTwowayMessageLoss(Executors.newSingleThreadExecutor());
+    }
+    
+    private void testTwowayMessageLoss(Executor executor) throws Exception {
+
+        setupGreeter("org/apache/cxf/systest/ws/rm/message-loss.xml", true, executor);
         
         greeterBus.getOutInterceptors().add(new MessageLossSimulator());
         RMManager manager = greeterBus.getExtension(RMManager.class);