You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2011/02/03 22:36:06 UTC

svn commit: r1066987 - in /cxf/branches/2.3.x-fixes: ./ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ s...

Author: dkulp
Date: Thu Feb  3 21:36:05 2011
New Revision: 1066987

URL: http://svn.apache.org/viewvc?rev=1066987&view=rev
Log:
Merged revisions 1066985 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/trunk

........
  r1066985 | dkulp | 2011-02-03 16:30:31 -0500 (Thu, 03 Feb 2011) | 4 lines
  
  [CXF-3271] WS-RM code does not support InOrder assurances
  Patch from Dennis Sosnoski applied
  Modified patch to use continuations when available to not consume
  threads.
........

Added:
    cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
      - copied unchanged from r1066985, cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
    cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
      - copied unchanged from r1066985, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
    cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
      - copied unchanged from r1066985, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
    cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
      - copied unchanged from r1066985, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
    cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
      - copied unchanged from r1066985, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
    cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
      - copied unchanged from r1066985, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
Modified:
    cxf/branches/2.3.x-fixes/   (props changed)
    cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
    cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
    cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
    cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java
    cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
    cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
    cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
    cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
    cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml
    cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java
    cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java

Propchange: cxf/branches/2.3.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java Thu Feb  3 21:36:05 2011
@@ -47,8 +47,12 @@ public abstract class AbstractRMIntercep
     private RMManager manager;
     private Bus bus;
     
+    protected AbstractRMInterceptor(String phase) {
+        super(phase);
+    }
+    
     protected AbstractRMInterceptor() {
-        super(Phase.PRE_LOGICAL);
+        this(Phase.PRE_LOGICAL);
     }
      
     public RMManager getManager() {

Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Thu Feb  3 21:36:05 2011
@@ -19,16 +19,26 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 
 public class Destination extends AbstractEndpoint {
+    
+    private static final Logger LOG = LogUtils.getL7dLogger(Destination.class);
 
     private Map<String, DestinationSequence> map;
 
@@ -91,12 +101,31 @@ public class Destination extends Abstrac
         DestinationSequence seq = getSequence(sequenceType.getIdentifier());
 
         if (null != seq) {
-            seq.applyDeliveryAssurance(sequenceType.getMessageNumber());
-            seq.acknowledge(message);
-
-            if (null != sequenceType.getLastMessage()) {
-                seq.setLastMessageNumber(sequenceType.getMessageNumber());
-                ackImmediately(seq, message);
+            if (seq.applyDeliveryAssurance(sequenceType.getMessageNumber(), message)) {
+                seq.acknowledge(message);
+    
+                if (null != sequenceType.getLastMessage()) {
+                    seq.setLastMessageNumber(sequenceType.getMessageNumber());
+                    ackImmediately(seq, message);
+                }
+            } else {
+                try {
+                    message.getInterceptorChain().abort();
+                    Conduit conduit = message.getExchange().getDestination()
+                        .getBackChannel(message, null, null);
+                    if (conduit != null) {
+                        //for a one-way, the back channel could be
+                        //null if it knows it cannot send anything.
+                        Message partial = createMessage(message.getExchange());
+                        partial.remove(Message.CONTENT_TYPE);
+                        partial.setExchange(message.getExchange());
+                        conduit.prepare(partial);
+                        conduit.close(partial);
+                    }
+                } catch (IOException e) {
+                    LOG.log(Level.SEVERE, e.getMessage());
+                    throw new RMException(e);
+                }
             }
         } else {
             SequenceFaultFactory sff = new SequenceFaultFactory();
@@ -140,5 +169,28 @@ public class Destination extends Abstrac
             getReliableEndpoint().getProxy().acknowledge(seq);                    
         }
     }
+    
+    void processingComplete(Message message) {
+        SequenceType sequenceType = RMContextUtils.retrieveRMProperties(message, false).getSequence();
+        if (null == sequenceType) {
+            return;
+        }
+        
+        DestinationSequence seq = getSequence(sequenceType.getIdentifier());
 
+        if (null != seq) {
+            seq.processingComplete(sequenceType.getMessageNumber());
+        }
+    }
+    
+    private static Message createMessage(Exchange exchange) {
+        Endpoint ep = exchange.get(Endpoint.class);
+        Message msg = null;
+        if (ep != null) {
+            msg = new MessageImpl();
+            msg.setExchange(exchange);
+            msg = ep.getBinding().createMessage(msg);
+        }
+        return msg;
+    }
 }

Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Thu Feb  3 21:36:05 2011
@@ -22,12 +22,16 @@ package org.apache.cxf.ws.rm;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.TimerTask;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
 import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
@@ -51,6 +55,9 @@ public class DestinationSequence extends
     private List<DeferredAcknowledgment> deferredAcknowledgments;
     private SequenceTermination scheduledTermination;
     private String correlationID;
+    private BigInteger inProcessNumber;
+    private BigInteger highNumberCompleted = BigInteger.ZERO;
+    private List<Continuation> continuations = new LinkedList<Continuation>();
     
     public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d) {
         this(i, a, null, null);
@@ -141,7 +148,7 @@ public class DestinationSequence extends
                 acknowledgement.getAcknowledgementRange().add(i, range);
             }
             mergeRanges();
-            notifyAll();
+            wakeupAll();
         }
         
         purgeAcknowledged(messageNumber);
@@ -208,45 +215,102 @@ public class DestinationSequence extends
         // can be included in a HTTP response
         return getAcksTo().getAddress().getValue().equals(RMConstants.getAnonymousAddress());
     }
-       
+    
     /**
      * Ensures that the delivery assurance is honored, e.g. by throwing an 
      * exception if the message had already been delivered and the delivery
      * assurance is AtMostOnce.
-     * This method blocks in case the delivery assurance is 
-     * InOrder and and not all messages with lower message numbers have been 
-     * delivered.
+     * If the delivery assurance includes either AtLeastOnce or ExactlyOnce, combined with InOrder, this
+     * queues out-of-order messages for processing after the missing messages have been received.
      * 
-     * @param s the SequenceType object including identifier and message number
+     * @param mn message number
+     * @return <code>true</code> if processing should continue, <code>false</code> if the message is dropped
      * @throws Fault if message had already been acknowledged
      */
-    void applyDeliveryAssurance(BigInteger mn) throws RMException {
+    boolean applyDeliveryAssurance(BigInteger mn, Message message) throws RMException {
+        Continuation cont = getContinuation(message);
         DeliveryAssuranceType da = destination.getManager().getDeliveryAssurance();
-        if (da.isSetAtMostOnce() && isAcknowledged(mn)) {            
+        if (cont != null && da.isSetInOrder() && !cont.isNew()) {
+            return waitInQueue(mn, !(da.isSetAtLeastOnce() || da.isSetExactlyOnce()),
+                               message, cont);
+        }
+        if ((da.isSetExactlyOnce() || da.isSetAtMostOnce()) && isAcknowledged(mn)) {            
             org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
                 "MESSAGE_ALREADY_DELIVERED_EXC", LOG, mn, getIdentifier().getValue());
-            LOG.log(Level.SEVERE, msg.toString());
+            LOG.log(Level.INFO, msg.toString());
             throw new RMException(msg);
         } 
-        if (da.isSetInOrder() && da.isSetAtLeastOnce()) {
-            synchronized (this) {
-                boolean ok = allPredecessorsAcknowledged(mn);
-                while (!ok) {
-                    try {
-                        wait();                        
-                        ok = allPredecessorsAcknowledged(mn);
-                    } catch (InterruptedException ie) {
-                        // ignore
-                    }
+        if (da.isSetInOrder()) {
+            return waitInQueue(mn, !(da.isSetAtLeastOnce() || da.isSetExactlyOnce()),
+                               message, cont);
+        }
+        return true;
+    }
+    
+    private Continuation getContinuation(Message message) {
+        if (message == null) {
+            return null;
+        }
+        return message.get(Continuation.class);
+    }
+    
+    synchronized boolean waitInQueue(BigInteger mn, boolean canSkip,
+                                     Message message, Continuation continuation) {
+        while (true) {
+            
+            // can process now if no other in process and this one is next
+            if (inProcessNumber == null) {
+                BigInteger diff = mn.subtract(highNumberCompleted);
+                if (BigInteger.ONE.equals(diff) || (canSkip && diff.signum() > 0)) {
+                    inProcessNumber = mn;
+                    return true;
+                }
+            }
+            
+            // can abort now if same message in process or already processed
+            BigInteger compare = inProcessNumber == null ? highNumberCompleted : inProcessNumber;
+            if (compare.compareTo(mn) >= 0) {
+                return false;
+            }
+            if (continuation == null) {
+                ContinuationProvider p = message.get(ContinuationProvider.class);
+                if (p != null) {
+                    boolean isOneWay = message.getExchange().isOneWay();
+                    message.getExchange().setOneWay(false);
+                    continuation = p.getContinuation();
+                    message.getExchange().setOneWay(isOneWay);
+                    message.put(Continuation.class, continuation);
+                }
+            }
+
+            if (continuation != null) {
+                continuation.setObject(message);
+                if (continuation.suspend(-1)) {
+                    continuations.add(continuation);
+                    throw new SuspendedInvocationException();
                 }
             }
+            try {
+                //if we get here, there isn't a continuation available
+                //so we need to block/wait
+                wait();                        
+            } catch (InterruptedException ie) {
+                // ignore
+            }
         }
     }
+    synchronized void wakeupAll() {
+        while (!continuations.isEmpty()) {
+            Continuation c = continuations.remove(0);
+            c.resume();
+        }
+        notifyAll();
+    }
     
-    synchronized boolean allPredecessorsAcknowledged(BigInteger mn) {
-        return acknowledgement.getAcknowledgementRange().size() == 1
-            && acknowledgement.getAcknowledgementRange().get(0).getLower().equals(BigInteger.ONE)
-            && acknowledgement.getAcknowledgementRange().get(0).getUpper().subtract(mn).signum() >= 0;
+    synchronized void processingComplete(BigInteger mn) {
+        inProcessNumber = null;
+        highNumberCompleted = mn;
+        wakeupAll();
     }
     
     void purgeAcknowledged(BigInteger messageNr) {

Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java Thu Feb  3 21:36:05 2011
@@ -23,6 +23,7 @@ import org.apache.cxf.Bus;
 import org.apache.cxf.common.injection.NoJSR250Annotations;
 import org.apache.cxf.feature.AbstractFeature;
 import org.apache.cxf.interceptor.InterceptorProvider;
+import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
 import org.apache.cxf.ws.rm.RMInInterceptor;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMOutInterceptor;
@@ -47,6 +48,7 @@ public class RMFeature extends AbstractF
 
     private RMInInterceptor rmLogicalIn = new RMInInterceptor();
     private RMOutInterceptor rmLogicalOut = new RMOutInterceptor();
+    private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
     private RMSoapInterceptor rmCodec = new RMSoapInterceptor();
 
     public void setDeliveryAssurance(DeliveryAssuranceType da) {
@@ -91,15 +93,18 @@ public class RMFeature extends AbstractF
 
         rmLogicalIn.setBus(bus);
         rmLogicalOut.setBus(bus);
+        rmDelivery.setBus(bus);
 
         provider.getInInterceptors().add(rmLogicalIn);
         provider.getInInterceptors().add(rmCodec);
+        provider.getInInterceptors().add(rmDelivery);
 
         provider.getOutInterceptors().add(rmLogicalOut);
         provider.getOutInterceptors().add(rmCodec);
 
         provider.getInFaultInterceptors().add(rmLogicalIn);
         provider.getInFaultInterceptors().add(rmCodec);
+        provider.getInFaultInterceptors().add(rmDelivery);
 
         provider.getOutFaultInterceptors().add(rmLogicalOut);
         provider.getOutFaultInterceptors().add(rmCodec);

Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java Thu Feb  3 21:36:05 2011
@@ -25,6 +25,7 @@ import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.ws.policy.AbstractPolicyInterceptorProvider;
+import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
 import org.apache.cxf.ws.rm.RMInInterceptor;
 import org.apache.cxf.ws.rm.RMOutInterceptor;
 import org.apache.cxf.ws.rm.soap.RMSoapInterceptor;
@@ -35,6 +36,7 @@ public class RMPolicyInterceptorProvider
     private RMInInterceptor rmIn = new RMInInterceptor();
     private RMOutInterceptor rmOut = new RMOutInterceptor();
     private RMSoapInterceptor rmSoap = new RMSoapInterceptor();
+    private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
 
     static {
         Collection<QName> types = new ArrayList<QName>();
@@ -46,15 +48,18 @@ public class RMPolicyInterceptorProvider
         super(ASSERTION_TYPES);
         rmIn.setBus(bus);
         rmOut.setBus(bus);
+        rmDelivery.setBus(bus);
         
         getInInterceptors().add(rmIn);
         getInInterceptors().add(rmSoap);
+        getInInterceptors().add(rmDelivery);
 
         getOutInterceptors().add(rmOut);
         getOutInterceptors().add(rmSoap);
 
         getInFaultInterceptors().add(rmIn);
         getInFaultInterceptors().add(rmSoap);
+        getInFaultInterceptors().add(rmDelivery);
 
         getOutFaultInterceptors().add(rmOut);
         getOutFaultInterceptors().add(rmSoap);

Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java Thu Feb  3 21:36:05 2011
@@ -462,7 +462,7 @@ public class DestinationSequenceTest ext
         control.replay();        
         DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
         ds.setDestination(destination);
-        ds.applyDeliveryAssurance(mn);
+        ds.applyDeliveryAssurance(mn, null);
         control.verify();
         
         control.reset();
@@ -475,7 +475,7 @@ public class DestinationSequenceTest ext
         EasyMock.expect(r.getUpper()).andReturn(new BigInteger("15"));
         control.replay();     
         try {
-            ds.applyDeliveryAssurance(mn);
+            ds.applyDeliveryAssurance(mn, null);
             fail("Expected Fault not thrown.");
         } catch (RMException ex) {
             assertEquals("MESSAGE_ALREADY_DELIVERED_EXC", ex.getCode());
@@ -486,34 +486,6 @@ public class DestinationSequenceTest ext
     }
     
     @Test
-    public void testInOrderNoWait() throws RMException {
-        setUpDestination();
-
-        BigInteger mn = BigInteger.TEN;
-        
-        DeliveryAssuranceType da = control.createMock(DeliveryAssuranceType.class);
-        EasyMock.expect(manager.getDeliveryAssurance()).andReturn(da);
-        EasyMock.expect(da.isSetAtMostOnce()).andReturn(false);
-        EasyMock.expect(da.isSetAtLeastOnce()).andReturn(true);
-        EasyMock.expect(da.isSetInOrder()).andReturn(true); 
-        
-        SequenceAcknowledgement ack = control.createMock(SequenceAcknowledgement.class);
-        List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
-        AcknowledgementRange r = control.createMock(AcknowledgementRange.class);
-        ranges.add(r);
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
-        EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
-        EasyMock.expect(r.getUpper()).andReturn(new BigInteger("15"));
-        
-        control.replay(); 
-        
-        DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
-        ds.setDestination(destination);
-        ds.applyDeliveryAssurance(mn);
-        control.verify();
-    }
-    
-    @Test
     public void testInOrderWait() {
         setUpDestination();
         Message[] messages = new Message[5];
@@ -549,7 +521,7 @@ public class DestinationSequenceTest ext
             public void run() {
                 try {
                     ds.acknowledge(message);
-                    ds.applyDeliveryAssurance(messageNr);
+                    ds.applyDeliveryAssurance(messageNr, message);
                 } catch (Exception ex) {
                     // ignore
                 }
@@ -583,51 +555,6 @@ public class DestinationSequenceTest ext
     }
     
     @Test
-    public void testAllPredecessorsAcknowledged() {
-
-        SequenceAcknowledgement ack = control.createMock(SequenceAcknowledgement.class);
-        List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
-        AcknowledgementRange r = control.createMock(AcknowledgementRange.class);
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges);
-        control.replay();
-        DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
-        ds.setDestination(destination);
-        assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
-        control.verify();
-        
-        control.reset();
-        ranges.add(r);
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(2);
-        EasyMock.expect(r.getLower()).andReturn(BigInteger.TEN);
-        control.replay();
-        assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
-        control.verify();
-        
-        control.reset();
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
-        EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
-        EasyMock.expect(r.getUpper()).andReturn(new BigInteger("5"));
-        control.replay();
-        assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
-        control.verify();
-        
-        control.reset();
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
-        EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
-        EasyMock.expect(r.getUpper()).andReturn(BigInteger.TEN);
-        control.replay();
-        assertTrue("not all predecessors acknowledged", ds.allPredecessorsAcknowledged(BigInteger.TEN));
-        control.verify();
-        
-        ranges.add(r);
-        control.reset();
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges);
-        control.replay();
-        assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
-        control.verify();
-    }
-    
-    @Test
     public void testScheduleSequenceTermination() throws SequenceFault {
         Timer timer = new Timer();
         setUpDestination(timer);

Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java Thu Feb  3 21:36:05 2011
@@ -146,7 +146,7 @@ public class DestinationTest extends Ass
         BigInteger nr = BigInteger.TEN;
         EasyMock.expect(st.getMessageNumber()).andReturn(nr);  
         RMException ex = new RMException(new RuntimeException("already acknowledged"));
-        ds.applyDeliveryAssurance(nr);
+        ds.applyDeliveryAssurance(nr, message);
         EasyMock.expectLastCall().andThrow(ex);
         control.replay();
         try {
@@ -177,8 +177,8 @@ public class DestinationTest extends Ass
         DestinationSequence ds = control.createMock(DestinationSequence.class);
         EasyMock.expect(destination.getSequence(id)).andReturn(ds);
         
-        ds.applyDeliveryAssurance(nr);
-        EasyMock.expectLastCall();
+        ds.applyDeliveryAssurance(nr, message);
+        EasyMock.expectLastCall().andReturn(Boolean.TRUE);
         ds.acknowledge(message);
         EasyMock.expectLastCall();
         SequenceType.LastMessage lm = control.createMock(SequenceType.LastMessage.class);

Modified: cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Thu Feb  3 21:36:05 2011
@@ -1104,7 +1104,8 @@ public class SequenceTest extends Abstra
             OutMessageRecorder outRecorder;  
             String id;
             
-            ClientThread(SpringBusFactory bf, String cfgResource, int n) { 
+            ClientThread(SpringBusFactory bf, String cfgResource, int n) {
+                super("client " + n);
                 SequenceTest.this.initGreeter(bf, cfgResource, true, null);
                 greeter = SequenceTest.this.greeter;
                 greeterBus = SequenceTest.this.greeterBus;
@@ -1114,9 +1115,18 @@ public class SequenceTest extends Abstra
             }
             
             public void run() {
-                greeter.greetMe(id + ": a");
-                greeter.greetMe(id + ": b");
-                greeter.greetMe(id + ": c");
+                String s = greeter.greetMe(id + ": a").toLowerCase();
+                if (!s.contains(id)) {
+                    System.out.println("Correlation problem <" + s + ">  <" + id + ">");
+                }
+                s = greeter.greetMe(id + ": b").toLowerCase();
+                if (!s.contains(id)) {
+                    System.out.println("Correlation problem <" + s + ">  <" + id + ">");
+                }
+                s = greeter.greetMe(id + ": c").toLowerCase();
+                if (!s.contains(id)) {
+                    System.out.println("Correlation problem <" + s + ">  <" + id + ">");
+                }
 
                 // three application messages plus createSequence
 
@@ -1134,9 +1144,10 @@ public class SequenceTest extends Abstra
             for (int i = 0; i < clients.length; i++) {
                 clients[i].start();
             }
-
             for (int i = 0; i < clients.length; i++) {
                 clients[i].join();
+            }
+            for (int i = 0; i < clients.length; i++) {
                 MessageFlow mf = new MessageFlow(clients[i].outRecorder.getOutboundMessages(), 
                                                  clients[i].inRecorder.getInboundMessages());
                                 

Modified: cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml (original)
+++ cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml Thu Feb  3 21:36:05 2011
@@ -31,6 +31,9 @@ http://www.springframework.org/schema/be
         <property name="bus" ref="cxf"/>
     </bean>
     <bean id="rmCodec" class="org.apache.cxf.ws.rm.soap.RMSoapInterceptor"/>
+    <bean id="rmDelivery" class="org.apache.cxf.ws.rm.RMDeliveryInterceptor">
+        <property name="bus" ref="cxf"/>
+    </bean>
 
     <!-- We are adding the interceptors to the bus as we will have only one endpoint/service/bus. -->
 
@@ -41,7 +44,8 @@ http://www.springframework.org/schema/be
                 <ref bean="mapCodec"/>
                 <ref bean="rmLogicalIn"/>
                 <ref bean="rmCodec"/>
-                <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
+                <ref bean="rmDelivery"/>
+                <!-- bean class="org.apache.cxf.interceptor.LoggingInInterceptor" /-->
             </list>
         </property>
         <property name="inFaultInterceptors">
@@ -50,7 +54,8 @@ http://www.springframework.org/schema/be
                 <ref bean="mapCodec"/>
                 <ref bean="rmLogicalIn"/>
                 <ref bean="rmCodec"/>
-                <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
+                <ref bean="rmDelivery"/>
+                <!--bean class="org.apache.cxf.interceptor.LoggingInInterceptor" /-->
             </list>
         </property>
         <property name="outInterceptors">
@@ -68,7 +73,7 @@ http://www.springframework.org/schema/be
                 <ref bean="mapCodec"/>
                 <ref bean="rmLogicalOut"/>
                 <ref bean="rmCodec"/>
-                <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor" />
+                <!-- bean class="org.apache.cxf.interceptor.LoggingOutInterceptor" /-->
             </list>
         </property>
     </bean>

Modified: cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml (original)
+++ cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml Thu Feb  3 21:36:05 2011
@@ -31,6 +31,9 @@ http://www.springframework.org/schema/be
         <property name="bus" ref="cxf"/>
     </bean>
     <bean id="rmCodec" class="org.apache.cxf.ws.rm.soap.RMSoapInterceptor"/>
+    <bean id="rmDelivery" class="org.apache.cxf.ws.rm.RMDeliveryInterceptor">
+        <property name="bus" ref="cxf"/>
+    </bean>
 
     <!-- We are adding the interceptors to the bus as we will have only one endpoint/service/bus. -->
 
@@ -41,6 +44,7 @@ http://www.springframework.org/schema/be
                 <ref bean="mapCodec"/>
                 <ref bean="rmLogicalIn"/>
                 <ref bean="rmCodec"/>
+                <ref bean="rmDelivery"/>
                 <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
             </list>
         </property>
@@ -50,6 +54,7 @@ http://www.springframework.org/schema/be
                 <ref bean="mapCodec"/>
                 <ref bean="rmLogicalIn"/>
                 <ref bean="rmCodec"/>
+                <ref bean="rmDelivery"/>
                 <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
             </list>
         </property>

Modified: cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java (original)
+++ cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java Thu Feb  3 21:36:05 2011
@@ -55,7 +55,8 @@ public class InMessageRecorder extends A
             IOUtils.copy(is, bos);
             is.close();
             bos.close();
-            inbound.add(bos.toByteArray());
+            byte bytes[] = bos.toByteArray();
+            inbound.add(bytes);
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.fine("inbound: " + bos.toString());
             }

Modified: cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java (original)
+++ cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java Thu Feb  3 21:36:05 2011
@@ -24,8 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import javax.xml.namespace.QName;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
 
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -33,6 +31,7 @@ import org.w3c.dom.Node;
 
 import junit.framework.Assert;
 
+import org.apache.cxf.staxutils.StaxUtils;
 import org.apache.cxf.ws.rm.RMConstants;
 
 
@@ -64,21 +63,18 @@ public class MessageFlow extends Assert 
             out.remove(0);
         }
         outStreams = out;
-        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-        factory.setNamespaceAware(true);
-        DocumentBuilder parser = factory.newDocumentBuilder();
         inboundMessages.clear();
         for (int i = 0; i < inStreams.size(); i++) {
             byte[] bytes = inStreams.get(i);
             ByteArrayInputStream is = new ByteArrayInputStream(bytes);
-            Document document = parser.parse(is);
+            Document document = StaxUtils.read(is);
             inboundMessages.add(document);
         }
         outboundMessages.clear();
         for (int i = 0; i < outStreams.size(); i++) {
             byte[] bytes = outStreams.get(i);
             ByteArrayInputStream is = new ByteArrayInputStream(bytes);
-            Document document = parser.parse(is);
+            Document document = StaxUtils.read(is);
             outboundMessages.add(document);
         }
     }