You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2007/05/09 16:27:12 UTC

svn commit: r536543 [1/2] - in /incubator/cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/ systests/...

Author: andreasmyth
Date: Wed May  9 07:27:10 2007
New Revision: 536543

URL: http://svn.apache.org/viewvc?view=rev&rev=536543
Log:
[JIRA CXF-282] Try to terminate active sequences when the endpoint/bus is shut down by sending a (SOAP) message with an empty body and a Sequence header with a LastMessage element, which must be responded to with a SequenceAcknowledgement. This in turn triggers sequence termination if all messages are acked.

Added:
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/terminate-on-shutdown.xml   (with props)
Modified:
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/SourceSequenceTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Wed May  9 07:27:10 2007
@@ -87,6 +87,7 @@
         if (null == sequenceType) {
             return;
         }
+        
         DestinationSequence seq = getSequence(sequenceType.getIdentifier());
 
         if (null != seq) {
@@ -94,29 +95,49 @@
             seq.acknowledge(message);
 
             if (null != sequenceType.getLastMessage()) {
-
                 seq.setLastMessageNumber(sequenceType.getMessageNumber());
-
-                seq.scheduleImmediateAcknowledgement();
-
-                // if we cannot expect an outgoing message to which the
-                // acknowledgement
-                // can be added we need to send an out-of-band
-                // SequenceAcknowledgement message
-
-                AddressingPropertiesImpl maps = RMContextUtils.retrieveMAPs(message, false, false);
-                String replyToAddress = null;
-                if (null != maps.getReplyTo()) {
-                    replyToAddress = maps.getReplyTo().getAddress().getValue();
-                }
-                if (!(seq.getAcksTo().getAddress().getValue().equals(replyToAddress) || seq
-                    .canPiggybackAckOnPartialResponse())) {                    
-                    getReliableEndpoint().getProxy().acknowledge(seq);                    
-                }
+                ackImmediately(seq, message);
             }
         } else {
             SequenceFaultFactory sff = new SequenceFaultFactory();
             throw sff.createUnknownSequenceFault(sequenceType.getIdentifier());
+        }
+    }
+    
+    void ackRequested(Message message) throws SequenceFault, RMException {
+        // TODO
+        Collection<AckRequestedType> ars = RMContextUtils.retrieveRMProperties(message, false)
+            .getAcksRequested();
+        if (null == ars) {
+            return;
+        }
+        for (AckRequestedType ar : ars) {
+            Identifier id = ar.getIdentifier();
+            DestinationSequence seq = getSequence(id);
+            if (null == seq) {
+                continue;
+            }
+            ackImmediately(seq, message);
+        }
+    }
+    
+    void ackImmediately(DestinationSequence seq, Message message) throws RMException {
+        
+        seq.scheduleImmediateAcknowledgement();
+
+        // if we cannot expect an outgoing message to which the
+        // acknowledgement
+        // can be added we need to send an out-of-band
+        // SequenceAcknowledgement message
+
+        AddressingPropertiesImpl maps = RMContextUtils.retrieveMAPs(message, false, false);
+        String replyToAddress = null;
+        if (null != maps.getReplyTo()) {
+            replyToAddress = maps.getReplyTo().getAddress().getValue();
+        }
+        if (!(seq.getAcksTo().getAddress().getValue().equals(replyToAddress) || seq
+            .canPiggybackAckOnPartialResponse())) { 
+            getReliableEndpoint().getProxy().acknowledge(seq);                    
         }
     }
 

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Wed May  9 07:27:10 2007
@@ -181,9 +181,6 @@
     
     void mergeRanges() {
         List<AcknowledgementRange> ranges = acknowledgement.getAcknowledgementRange();
-        if (null == ranges) {
-            return;
-        }
         for (int i = ranges.size() - 1; i > 0; i--) {
             AcknowledgementRange current = ranges.get(i);
             AcknowledgementRange previous = ranges.get(i - 1);
@@ -351,6 +348,12 @@
         for (int i = deferredAcknowledgments.size() - 1; i >= 0; i--) {
             DeferredAcknowledgment da = deferredAcknowledgments.get(i);
             da.cancel();
+        }
+    }
+    
+    synchronized void cancelTermination() {
+        if (null != scheduledTermination) {
+            scheduledTermination.cancel();
         }
     }
 

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties Wed May  9 07:27:10 2007
@@ -24,6 +24,8 @@
 SEQ_TERMINATION_FAILURE = Failed to terminate sequence {0}.
 
 STANDALONE_ANON_ACKS_NOT_SUPPORTED = It is not possible to send out-of-band acknowledgments to the anonymous address.\nAn acknowledgement will be piggybacked on the next response. 
+STANDALONE_LAST_MESSAGE_NO_TARGET_MSG = No target address to send out-of-band last message to.
+STANDALONE_LAST_MESSAGE_ANON_TARGET_MSG = It is not possible to send an out-of-band last message to the anonymous address.
 
 POLICY_PROVIDER_CREATION_EXC = Failed to create provider for RM assertion.
 POLICY_REFERENCE_RESOLUTION_EXC = Policy reference {0} cannot be resolved.

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Wed May  9 07:27:10 2007
@@ -40,6 +40,7 @@
 import org.apache.cxf.service.model.InterfaceInfo;
 import org.apache.cxf.service.model.OperationInfo;
 import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.RelatesToType;
 import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
@@ -161,9 +162,57 @@
     }
     
     void lastMessage(SourceSequence s) throws RMException {
-        // TODO
+        org.apache.cxf.ws.addressing.EndpointReferenceType target = s.getTarget();
+        AttributedURIType uri = null;
+        if (null != target) {
+            uri = target.getAddress();
+        }
+        String addr = null;
+        if (null != uri) {
+            addr = uri.getValue();
+        }
+        
+        if (addr == null) {
+            LOG.log(Level.WARNING, "STANDALONE_LAST_MESSAGE_NO_TARGET_MSG");
+            return;
+        }
+        
+        if (RMUtils.getAddressingConstants().getAnonymousURI().equals(addr)) {
+            LOG.log(Level.WARNING, "STANDALONE_LAST_MESSAGE_ANON_TARGET_MSG");
+            return; 
+        }
+        
+        OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
+            .getOperation(RMConstants.getLastMessageOperationName());
+        invoke(oi, new Object[] {}, null);
     }
     
+    void ackRequested(SourceSequence s) throws RMException {
+        org.apache.cxf.ws.addressing.EndpointReferenceType target = s.getTarget();
+        AttributedURIType uri = null;
+        if (null != target) {
+            uri = target.getAddress();
+        }
+        String addr = null;
+        if (null != uri) {
+            addr = uri.getValue();
+        }
+        
+        if (addr == null) {
+            LOG.log(Level.WARNING, "STANDALONE_ACK_REQUESTED_NO_TARGET_MSG");
+            return;
+        }
+        
+        if (RMUtils.getAddressingConstants().getAnonymousURI().equals(addr)) {
+            LOG.log(Level.WARNING, "STANDALONE_ACK_REQUESTED_ANON_TARGET_MSG");
+            return; 
+        }
+        
+        OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
+            .getOperation(RMConstants.getAckRequestedOperationName());
+        invoke(oi, new Object[] {}, null);
+    }
+        
     Identifier getOfferedIdentifier() {
         return offeredIdentifier;    
     }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java Wed May  9 07:27:10 2007
@@ -31,6 +31,7 @@
  */
 public final class RMConstants {
    
+    // namespaces
     private static final String WSRM_NAMESPACE_NAME = 
         "http://schemas.xmlsoap.org/ws/2005/02/rm";
     
@@ -46,6 +47,8 @@
         WSRM_NAMESPACE_NAME + "/wsdl";
     
   
+    // element and header names
+    
     private static final String WSRM_SEQUENCE_NAME =
         "Sequence";    
     
@@ -81,11 +84,13 @@
     private static final QName RMASSERTION_QNAME = 
         new QName(WSRMP_NAMESPACE_NAME, RMASSERTION_NAME);
       
-    /**
-     * The set of headers understood by the protocol binding.
-     */
     private static final Set<QName> HEADERS;
     
+    // protocol operation names
+    
+    private static final QName WSRM_PORT_NAME =
+        new QName(WSRM_WSDL_NAMESPACE_NAME, "SequenceAbstractSoapPort"); 
+    
     private static final QName WSRM_CREATE_SEQUENCE_QNAME =
         new QName(WSRM_WSDL_NAMESPACE_NAME, "CreateSequence");
     
@@ -103,7 +108,16 @@
     
     private static final QName WSRM_SEQUENCE_ACKNOWLEDGEMENT_QNAME =
         new QName(WSRM_WSDL_NAMESPACE_NAME, "SequenceAcknowledgement");
+    
+    private static final QName WSRM_LAST_MESSAGE_QNAME =
+        new QName(WSRM_WSDL_NAMESPACE_NAME, "LastMessage");
+    
+    private static final QName WSRM_ACK_REQ_QNAME =
+        new QName(WSRM_WSDL_NAMESPACE_NAME, "AckRequested");
+ 
 
+    // actions
+    
     private static final String WSRM_CREATE_SEQUENCE_ACTION =
         WSRM_NAMESPACE_NAME + "/CreateSequence";
     
@@ -119,12 +133,17 @@
     private static final String WSRM_LAST_MESSAGE_ACTION =
         WSRM_NAMESPACE_NAME + "/LastMessage";
     
+    private static final String WSRM_ACK_REQUESTED_ACTION =
+        WSRM_NAMESPACE_NAME + "/AckRequested";
+    
     private static final String WSRM_SEQUENCE_ACKNOWLEDGMENT_ACTION =
         WSRM_NAMESPACE_NAME + "/SequenceAcknowledgement";
     
     private static final String WSRM_SEQUENCE_INFO_ACTION =
         WSRM_NAMESPACE_NAME + "/SequenceInfo";
     
+    // fault codes
+    
     private static final String WSRM_UNKNOWN_SEQUENCE_FAULT_CODE =
         "UnknownSequence";
     
@@ -224,6 +243,10 @@
     
     // service model constants
     
+    public static QName getPortName() {
+        return WSRM_PORT_NAME;
+    }
+    
     public static QName getCreateSequenceOperationName() {
         return WSRM_CREATE_SEQUENCE_QNAME;
     }
@@ -248,6 +271,14 @@
         return WSRM_SEQUENCE_ACKNOWLEDGEMENT_QNAME;
     }
     
+    public static QName getLastMessageOperationName() {
+        return WSRM_LAST_MESSAGE_QNAME;
+    }
+    
+    public static QName getAckRequestedOperationName() {
+        return WSRM_ACK_REQ_QNAME;
+    }
+    
     // actions
     
     public static String getCreateSequenceAction() {
@@ -268,6 +299,10 @@
     
     public static String getLastMessageAction() {
         return WSRM_LAST_MESSAGE_ACTION;
+    }
+    
+    public static String getAckRequestedAction() {
+        return WSRM_ACK_REQUESTED_ACTION;
     }
     
     public static String getSequenceAcknowledgmentAction() {

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Wed May  9 07:27:10 2007
@@ -21,6 +21,8 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import javax.wsdl.extensions.ExtensibilityElement;
 import javax.xml.bind.JAXBException;
@@ -28,6 +30,7 @@
 
 import org.apache.cxf.binding.soap.model.SoapBindingInfo;
 import org.apache.cxf.binding.soap.model.SoapOperationInfo;
+import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.databinding.DataBinding;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.interceptor.Interceptor;
@@ -48,19 +51,22 @@
 import org.apache.cxf.ws.policy.EndpointPolicy;
 import org.apache.cxf.ws.policy.PolicyEngine;
 import org.apache.cxf.ws.policy.PolicyInterceptorProviderRegistry;
+import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
+import org.apache.cxf.ws.rm.manager.SourcePolicyType;
 import org.apache.neethi.Assertion;
 import org.apache.neethi.Policy;
 
 public class RMEndpoint {
     
+    private static final Logger LOG = LogUtils.getL7dLogger(RMEndpoint.class);
+    
     private static final QName SERVICE_NAME = 
         new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractService");
     private static final QName INTERFACE_NAME = 
          new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractPortType");
     private static final QName BINDING_NAME = 
         new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractSoapBinding");
-    private static final QName PORT_NAME = 
-        new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractSoapPort");
+
     private static final QName CREATE_PART_NAME =
         new QName(RMConstants.getWsdlNamespace(), "create");
     private static final QName CREATE_RESPONSE_PART_NAME =
@@ -172,6 +178,34 @@
     } 
     
     /** 
+     * @return The time when last application message was received.
+     */
+    public long getLastApplicationMessage() {
+        return lastApplicationMessage;
+    }
+
+    /**
+     * Indicates that an application message has been received.
+     */
+    public void receivedApplicationMessage() {
+        lastApplicationMessage = System.currentTimeMillis();
+    }
+
+    /** 
+     * @return The time when last RM protocol message was received.
+     */
+    public long getLastControlMessage() {
+        return lastControlMessage;
+    }
+
+    /**
+     * Indicates that an RM protocol message has been received.
+     */
+    public void receivedControlMessage() {
+        lastControlMessage = System.currentTimeMillis();
+    }
+    
+    /** 
      * @return Returns the conduit.
      */
     public Conduit getConduit() {
@@ -226,7 +260,7 @@
         
         ei.setAddress(aei.getAddress());
         
-        ei.setName(PORT_NAME);
+        ei.setName(RMConstants.getPortName());
         ei.setBinding(si.getBinding(BINDING_NAME));
 
         // if addressing was enabled on the application endpoint by means 
@@ -286,6 +320,8 @@
         buildCreateSequenceOperationInfo(ii);
         buildTerminateSequenceOperationInfo(ii);
         buildSequenceAckOperationInfo(ii);
+        buildLastMessageOperationInfo(ii);
+        buildAckRequestedOperationInfo(ii);
         
         // TODO: FaultInfo (SequenceFault)
     }
@@ -353,6 +389,26 @@
         messageInfo = operationInfo.createMessage(RMConstants.getSequenceAckOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
     }
+    
+    void buildLastMessageOperationInfo(InterfaceInfo ii) {
+
+        OperationInfo operationInfo = null;
+        MessageInfo messageInfo = null;
+
+        operationInfo = ii.addOperation(RMConstants.getLastMessageOperationName());
+        messageInfo = operationInfo.createMessage(RMConstants.getLastMessageOperationName());
+        operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
+    }
+    
+    void buildAckRequestedOperationInfo(InterfaceInfo ii) {
+
+        OperationInfo operationInfo = null;
+        MessageInfo messageInfo = null;
+
+        operationInfo = ii.addOperation(RMConstants.getAckRequestedOperationName());
+        messageInfo = operationInfo.createMessage(RMConstants.getAckRequestedOperationName());
+        operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
+    }
 
     void buildBindingInfo(ServiceInfo si) {
         // use same binding id as for application endpoint
@@ -385,6 +441,20 @@
             boi.addExtensor(soi);
             bi.addOperation(boi);
             
+            boi = bi.buildOperation(RMConstants.getLastMessageOperationName(), null, null);
+            assert null != boi;
+            soi = new SoapOperationInfo();
+            soi.setAction(RMConstants.getLastMessageAction());
+            boi.addExtensor(soi);
+            bi.addOperation(boi);
+            
+            boi = bi.buildOperation(RMConstants.getAckRequestedOperationName(), null, null);
+            assert null != boi;
+            soi = new SoapOperationInfo();
+            soi.setAction(RMConstants.getAckRequestedAction());
+            boi.addExtensor(soi);
+            bi.addOperation(boi);
+            
             boi = bi.buildOperation(RMConstants.getCreateSequenceOnewayOperationName(), 
                 RMConstants.getCreateSequenceOperationName().getLocalPart(), null);
             soi = new SoapOperationInfo();
@@ -450,20 +520,49 @@
         manager = m;
     }
     
-    public long getLastApplicationMessage() {
-        return lastApplicationMessage;
-    }
-
-    public void receivedApplicationMessage() {
-        lastApplicationMessage = System.currentTimeMillis();
-    }
-
-    public long getLastControlMessage() {
-        return lastControlMessage;
-    }
-
-    public void receivedControlMessage() {
-        lastControlMessage = System.currentTimeMillis();
+    void shutdown() {
+        // cancel outstanding timer tasks (deferred acknowledgements)
+        // and scheduled termination for all
+        // destination sequences of this endpoint
+        
+        for (DestinationSequence ds : getDestination().getAllSequences()) {
+            ds.cancelDeferredAcknowledgments();
+            ds.cancelTermination();
+        }
+        
+        // try terminating sequences
+        SourcePolicyType sp = manager.getSourcePolicy();
+        SequenceTerminationPolicyType stp = null;
+        if (null != sp) {
+            stp = sp.getSequenceTerminationPolicy();
+        }
+        if (null != stp && stp.isTerminateOnShutdown()) {
+            
+            Collection<SourceSequence> seqs = source.getAllUnacknowledgedSequences();
+            LOG.log(Level.FINE, "Trying to terminate {0} sequences", seqs.size());
+            for (SourceSequence seq : seqs) {
+                try {
+                    // destination MUST respond with a 
+                    // sequence acknowledgement
+                    if (seq.isLastMessage()) {
+                        // REVISIT: this may be non-standard
+                        // getProxy().ackRequested(seq);
+                    } else {
+                        
+                        getProxy().lastMessage(seq);
+                    }
+                } catch (RMException ex) {
+                    // already logged
+                }
+            }
+        }     
+        
+        // cancel outstanding resends for all source sequences
+        // of this endpoint
+        
+        for (SourceSequence ss : getSource().getAllSequences()) {
+            manager.getRetransmissionQueue().stop(ss);
+        }
     }
     
     

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Wed May  9 07:27:10 2007
@@ -79,12 +79,12 @@
         // for application AND out of band messages
         
         RMEndpoint rme = getManager().getReliableEndpoint(message);
+        Destination destination = getManager().getDestination(message);
         
-        if (isApplicationMessage) {            
-            Destination destination = getManager().getDestination(message);
+        if (isApplicationMessage) {                        
             if (null != rmps) {
-                processAcknowledgments(rmps);
-                processAcknowledgmentRequests(rmps);
+                processAcknowledgments(rme.getSource(), rmps);
+                processAcknowledgmentRequests(destination, message);
                 processSequence(destination, message);
                 processDeliveryAssurance(rmps);
             }
@@ -92,7 +92,9 @@
         } else {
             rme.receivedControlMessage();
             if (RMConstants.getSequenceAckAction().equals(action)) {
-                processAcknowledgments(rmps);
+                processAcknowledgments(rme.getSource(), rmps);
+            } else if (RMConstants.getLastMessageAction().equals(action)) {
+                processSequence(destination, message);
             } else if (RMConstants.getCreateSequenceAction().equals(action) && !isServer) {
                 LOG.fine("Processing inbound CreateSequence on client side.");
                 Servant servant = rme.getServant();
@@ -106,13 +108,13 @@
         assertReliability(message);
     }
     
-    void processAcknowledgments(RMProperties rmps) throws SequenceFault, RMException {
+    void processAcknowledgments(Source source, RMProperties rmps) throws SequenceFault, RMException {
         
         Collection<SequenceAcknowledgement> acks = rmps.getAcks();
         if (null != acks) {
             for (SequenceAcknowledgement ack : acks) {
                 Identifier id = ack.getIdentifier();
-                SourceSequence ss = getManager().getSourceSequence(id);                
+                SourceSequence ss = source.getSequence(id);                
                 if (null != ss) {
                     ss.setAcknowledged(ack);
                 } else {
@@ -122,13 +124,13 @@
         }
     }
 
-    void processAcknowledgmentRequests(RMProperties rmps) {
-        // TODO
+    void processAcknowledgmentRequests(Destination destination, Message message) 
+        throws SequenceFault, RMException {
+        destination.ackRequested(message); 
     }
     
     void processSequence(Destination destination, Message message) 
         throws SequenceFault, RMException {
-
         destination.acknowledge(message);
     }
     

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Wed May  9 07:27:10 2007
@@ -35,6 +35,8 @@
 import org.apache.cxf.binding.Binding;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.endpoint.ServerLifeCycleListener;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
@@ -63,8 +65,25 @@
     private SequenceIdentifierGenerator idGenerator;
     private RetransmissionQueue retransmissionQueue;
     private Map<Endpoint, RMEndpoint> reliableEndpoints = new HashMap<Endpoint, RMEndpoint>();
-    private Map<String, SourceSequence> sourceSequences;
     private Timer timer = new Timer(true);
+    private final ServerLifeCycleListener serverLifeCycleListener;
+
+    public RMManager() {
+        serverLifeCycleListener = new ServerLifeCycleListener() {
+
+            public void startServer(Server server) {
+            }
+
+            public void stopServer(Server server) {
+                RMManager.this.shutdownReliableEndpoint(server.getEndpoint());
+            }
+            
+        };        
+    }
+    
+    public ServerLifeCycleListener getServerLifeCycleListener() {
+        return serverLifeCycleListener;
+    }
 
     public Bus getBus() {
         return bus;
@@ -116,18 +135,17 @@
 
     public synchronized RMEndpoint getReliableEndpoint(Message message) {
         Endpoint endpoint = message.getExchange().get(Endpoint.class);
+        QName name = endpoint.getEndpointInfo().getName();
         if (LOG.isLoggable(Level.FINE)) {
-            LOG.fine("Getting RMEndpoint for endpoint with info: " + endpoint.getEndpointInfo().getName());
+            LOG.fine("Getting RMEndpoint for endpoint with info: " + name);
         }
-        if (endpoint.getEndpointInfo().getName().equals(
-                                                        new QName(RMConstants.getWsdlNamespace(),
-                                                                  "SequenceAbstractSoapPort"))) {
+        if (name.equals(RMConstants.getPortName())) {
             WrappedEndpoint wrappedEndpoint = (WrappedEndpoint)endpoint;
             endpoint = wrappedEndpoint.getWrappedEndpoint();
         }
         RMEndpoint rme = reliableEndpoints.get(endpoint);
         if (null == rme) {
-            rme = new RMEndpoint(this, endpoint);
+            rme = createReliableEndpoint(this, endpoint);
             org.apache.cxf.transport.Destination destination = message.getExchange().getDestination();
             org.apache.cxf.ws.addressing.EndpointReferenceType replyTo = null;
             if (null != destination) {
@@ -149,10 +167,6 @@
         return null;
     }
 
-    public SourceSequence getSourceSequence(Identifier id) {
-        return sourceSequences.get(id.getValue());
-    }
-
     public Source getSource(Message message) {
         RMEndpoint rme = getReliableEndpoint(message);
         if (null != rme) {
@@ -210,21 +224,15 @@
         }
 
         return seq;
-    }
+    }    
 
     @PreDestroy
     public void shutdown() {
-        // shutdown retransmission queue
-        if (null != retransmissionQueue) {
-            retransmissionQueue.stop();
-        }
-
-        // cancel outstanding timer tasks (deferred acknowledgements) for all
-        // destination sequences
+        
+        // shutdown remaining endpoints 
+        
         for (RMEndpoint rme : reliableEndpoints.values()) {
-            for (DestinationSequence ds : rme.getDestination().getAllSequences()) {
-                ds.cancelDeferredAcknowledgments();
-            }
+            rme.shutdown();
         }
 
         // remove references to timer tasks cancelled above to make them
@@ -232,6 +240,26 @@
         timer.purge();
         timer.cancel();
     }
+    
+    synchronized void shutdownReliableEndpoint(Endpoint e) {
+        RMEndpoint rme = reliableEndpoints.get(e);
+        if (null == rme) {
+            // not interested
+            return;
+        }
+        
+        rme.shutdown();        
+        
+        // remove references to timer tasks cancelled above to make them
+        // eligible for garbage collection
+        timer.purge();
+        
+        reliableEndpoints.remove(e);
+    }
+    
+    RMEndpoint createReliableEndpoint(RMManager manager, Endpoint endpoint) {
+        return new RMEndpoint(manager, endpoint);
+    }
 
     @PostConstruct
     void initialise() {
@@ -245,13 +273,9 @@
             setDeliveryAssurance(da);
         }
         if (!isSetSourcePolicy()) {
-            SourcePolicyType sp = factory.createSourcePolicyType();
-            setSourcePolicy(sp);
+            setSourcePolicy(null);
 
-        }
-        if (!getSourcePolicy().isSetSequenceTerminationPolicy()) {
-            getSourcePolicy().setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
-        }
+        }       
         if (!isSetDestinationPolicy()) {
             DestinationPolicyType dp = factory.createDestinationPolicyType();
             dp.setAcksPolicy(factory.createAcksPolicyType());
@@ -285,17 +309,25 @@
         super.setRMAssertion(rma);
     }
 
-    void addSourceSequence(SourceSequence ss) {
-        if (null == sourceSequences) {
-            sourceSequences = new HashMap<String, SourceSequence>();
+    
+    @Override
+    public void setSourcePolicy(SourcePolicyType sp) {
+        org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
+        if (null == sp) {
+            sp = factory.createSourcePolicyType();
         }
-        sourceSequences.put(ss.getIdentifier().getValue(), ss);
+        if (!sp.isSetSequenceTerminationPolicy()) {
+            sp.setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
+        }
+        super.setSourcePolicy(sp);
     }
 
-    void removeSourceSequence(Identifier id) {
-        if (null != sourceSequences) {
-            sourceSequences.remove(id.getValue());
-        }
+    Map<Endpoint, RMEndpoint> getReliableEndpointsMap() {
+        return reliableEndpoints;
+    }
+    
+    void setReliableEndpointsMap(Map<Endpoint, RMEndpoint> map) {
+        reliableEndpoints = map;
     }
 
     class DefaultSequenceIdentifierGenerator implements SequenceIdentifierGenerator {

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Wed May  9 07:27:10 2007
@@ -44,14 +44,12 @@
         addAfter(MAPAggregator.class.getName());
     }
     
-    protected void handle(Message message) throws SequenceFault, RMException {
-        LOG.entering(getClass().getName(), "handleMessage");
-        
+    protected void handle(Message message) throws SequenceFault, RMException {  
         if (isRuntimeFault(message)) {
             LogUtils.log(LOG, Level.WARNING, "RUNTIME_FAULT_MSG");
             // TODO: in case of a SequenceFault need to set action
             // to http://schemas.xmlsoap.org/ws/2004/a08/addressing/fault
-            // but: need to defer propagation of received MAPS to oubound chain            
+            // but: need to defer propagation of received MAPS to outbound chain first           
             return;
         }
        
@@ -78,8 +76,7 @@
 
         boolean isApplicationMessage = !RMContextUtils.isRMProtocolMessage(action);
         boolean isPartialResponse = MessageUtils.isPartialResponse(message);
-        LOG.fine("isApplicationMessage: " + isApplicationMessage);
-        LOG.fine("isPartialResponse: " + isPartialResponse);
+        boolean isLastMessage = RMConstants.getLastMessageAction().equals(action);
         
         if (isApplicationMessage && !isPartialResponse) {
             RetransmissionInterceptor ri = new RetransmissionInterceptor();
@@ -113,7 +110,8 @@
             }
         }
         
-        if (isApplicationMessage && !isPartialResponse) {
+        if ((isApplicationMessage || isLastMessage)
+            && !isPartialResponse) {
       
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue()));
@@ -128,7 +126,8 @@
                 // increase message number and store a sequence type object in
                 // context
 
-                seq.nextMessageNumber(inSeqId, inMessageNumber);
+                seq.nextMessageNumber(inSeqId, inMessageNumber, isLastMessage);
+                
                 rmpsOut.setSequence(seq);
 
                 // if this was the last message in the sequence, reset the
@@ -197,7 +196,7 @@
         if (LOG.isLoggable(Level.FINE)) {
             Collection<SequenceAcknowledgement> acks = rmpsOut.getAcks();
             if (null == acks) {
-                LOG.fine("No acknowledgements added");
+                LOG.fine("No acknowledgements added.");
             } else {
                 LOG.fine("Added " + acks.size() + " acknowledgements.");
             }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Wed May  9 07:27:10 2007
@@ -63,7 +63,7 @@
     /**
      * Stops retransmission queue.
      */
-    void stop();
+    void stop(SourceSequence seq);
     
     /**
      * Populates the retransmission queue with messages recovered from

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java Wed May  9 07:27:10 2007
@@ -58,9 +58,7 @@
     } 
     
     public void addSequence(SourceSequence seq) { 
-        addSequence(seq, true);
-        getReliableEndpoint().getManager().addSourceSequence(seq);
-        
+        addSequence(seq, true);        
     }
     
     public void addSequence(SourceSequence seq, boolean persist) {

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java Wed May  9 07:27:10 2007
@@ -143,7 +143,7 @@
             RMEndpoint rme = source.getReliableEndpoint();
             Proxy proxy = rme.getProxy();
             proxy.terminate(this);
-            source.getManager().removeSourceSequence(id);
+            source.removeSequence(this);
         }
     }
 
@@ -194,7 +194,7 @@
      * @return the next message number.
      */
     BigInteger nextMessageNumber() {
-        return nextMessageNumber(null, null);
+        return nextMessageNumber(null, null, false);
     }
 
     /**
@@ -206,27 +206,22 @@
      * 
      * @return the next message number.
      */
-    public BigInteger nextMessageNumber(Identifier inSeqId, BigInteger inMsgNumber) {
+    public BigInteger nextMessageNumber(Identifier inSeqId, BigInteger inMsgNumber, boolean last) {
         assert !lastMessage;
 
         BigInteger result = null;
         synchronized (this) {
             currentMessageNumber = currentMessageNumber.add(BigInteger.ONE);
-            checkLastMessage(inSeqId, inMsgNumber);
+            if (last) {
+                lastMessage = true;
+            } else { 
+                checkLastMessage(inSeqId, inMsgNumber);
+            } 
             result = currentMessageNumber;
         }
         return result;
     }
-
-    void nextAndLastMessageNumber() {
-        assert !lastMessage;
-
-        synchronized (this) {
-            currentMessageNumber = currentMessageNumber.add(BigInteger.ONE);
-            lastMessage = true;
-        }
-    }
-
+    
     SequenceAcknowledgement getAcknowledgement() {
         return acknowledgement;
     }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java Wed May  9 07:27:10 2007
@@ -473,6 +473,8 @@
             boi = bi.getOperation(RMConstants.getSequenceAckOperationName()); 
         } else if (RMConstants.getTerminateSequenceAction().equals(action)) {
             boi = bi.getOperation(RMConstants.getTerminateSequenceOperationName()); 
+        } else if (RMConstants.getLastMessageAction().equals(action)) {
+            boi = bi.getOperation(RMConstants.getLastMessageOperationName()); 
         }
         assert boi != null;
         exchange.put(BindingOperationInfo.class, boi);

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Wed May  9 07:27:10 2007
@@ -160,10 +160,23 @@
     }
 
     /**
-     * Stops retransmission queue.
+     * Stops resending messages for the specified source sequence.
      */
-    public void stop() {
-        // no-op
+    public void stop(SourceSequence seq) {
+        synchronized (this) {
+            List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
+            if (null != sequenceCandidates) {
+                for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
+                    ResendCandidate candidate = sequenceCandidates.get(i);
+                    candidate.cancel();
+                }
+                LOG.log(Level.FINE, "Cancelled resends for sequence {0}.", seq.getIdentifier().getValue());
+            }           
+        }
+    }
+    
+    void stop() {
+        // no-op
     }
 
     /**
@@ -421,6 +434,15 @@
         protected synchronized void resolved() {
             pending = false;
             next = null;
+            if (null != nextTask) {
+                nextTask.cancel();
+            }
+        }
+        
+        /**
+         * Cancel further resend (although no ACK has been received).
+         */
+        protected void cancel() {
             if (null != nextTask) {
                 nextTask.cancel();
             }

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java Wed May  9 07:27:10 2007
@@ -63,4 +63,18 @@
         assertSame(ae, tested.getEndpoint());
         assertSame(mgr, tested.getManager());
     }
-}
+    
+    @Test
+    public void testGenerateSequenceIdentifier() {
+        RMManager mgr = control.createMock(RMManager.class); 
+        EasyMock.expect(rme.getManager()).andReturn(mgr);
+        SequenceIdentifierGenerator generator = control.createMock(SequenceIdentifierGenerator.class);
+        EasyMock.expect(mgr.getIdGenerator()).andReturn(generator);
+        Identifier id = control.createMock(Identifier.class);
+        EasyMock.expect(generator.generateSequenceIdentifier()).andReturn(id);
+        control.replay();
+        AbstractEndpoint tested = new AbstractEndpoint(rme);
+        assertSame(id, tested.generateSequenceIdentifier());
+        control.verify();
+    }
+}
\ No newline at end of file

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java Wed May  9 07:27:10 2007
@@ -76,7 +76,27 @@
     }
     
     @Test
-    public void testHandleMessage() {
+    public void testHandleMessageSequenceFaultNoBinding() {
+        RMInterceptor interceptor = new RMInterceptor();
+        Message message = control.createMock(Message.class);
+        SequenceFault sf = control.createMock(SequenceFault.class);
+        interceptor.setSequenceFault(sf);
+        Exchange ex = control.createMock(Exchange.class);
+        EasyMock.expect(message.getExchange()).andReturn(ex);
+        Endpoint e = control.createMock(Endpoint.class);
+        EasyMock.expect(ex.get(Endpoint.class)).andReturn(e);
+        EasyMock.expect(e.getBinding()).andReturn(null);
+        control.replay();
+        try {
+            interceptor.handleMessage(message);
+            fail("Expected Fault not thrown.");
+        } catch (Fault f) {
+            assertSame(sf, f.getCause());            
+        }
+    }
+    
+    @Test
+    public void testHandleMessageSequenceFault() {
         RMInterceptor interceptor = new RMInterceptor();
         Message message = control.createMock(Message.class);
         SequenceFault sf = control.createMock(SequenceFault.class);
@@ -101,7 +121,21 @@
         } catch (Fault f) {
             assertSame(f, fault);            
         }
-        control.verify();
+    }
+    
+    @Test
+    public void testHandleMessageRMException() {
+        RMInterceptor interceptor = new RMInterceptor();
+        Message message = control.createMock(Message.class);
+        RMException rme = control.createMock(RMException.class);      
+        interceptor.setRMException(rme);
+        control.replay();
+        try {
+            interceptor.handleMessage(message);
+            fail("Expected Fault not thrown.");
+        } catch (Fault f) {
+            assertSame(rme, f.getCause());            
+        }
     }
     
     @Test
@@ -122,21 +156,29 @@
         interceptor.assertReliability(message);
         assertTrue(!ai.isAsserted());
         ais.add(ai);
-        interceptor.assertReliability(message);     
+        interceptor.assertReliability(message);   
+        
     }
 
     class RMInterceptor extends AbstractRMInterceptor {
 
         private SequenceFault sequenceFault;
+        private RMException rmException;
         
         void setSequenceFault(SequenceFault sf) {
             sequenceFault = sf;
         }
         
+        void setRMException(RMException rme) {
+            rmException = rme;
+        }
+        
         @Override
-        protected void handle(Message msg) throws SequenceFault {
+        protected void handle(Message msg) throws SequenceFault, RMException {
             if (null != sequenceFault) { 
                 throw sequenceFault;
+            } else if (null != rmException) {
+                throw rmException;
             }
         }     
     }

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java Wed May  9 07:27:10 2007
@@ -21,18 +21,22 @@
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Timer;
 
 import javax.xml.namespace.QName;
 
+import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.ws.addressing.v200408.AttributedURI;
 import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
 import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
 import org.apache.cxf.ws.rm.manager.AcksPolicyType;
 import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
 import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
+import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.policy.RMAssertion;
 import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
 import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
@@ -647,6 +651,110 @@
             // ignore
         }
         
+        control.verify();
+    }
+
+    @Test
+    public void testSequenceTermination() {
+        destination = control.createMock(Destination.class);
+        DestinationSequence seq = new DestinationSequence(id, ref, destination);
+        RMEndpoint rme = control.createMock(RMEndpoint.class);
+        EasyMock.expect(destination.getReliableEndpoint()).andReturn(rme);
+        DestinationSequence.SequenceTermination st = seq.new SequenceTermination();
+        st.updateInactivityTimeout(30000L);
+        long lastAppMessage = System.currentTimeMillis() - 30000L;
+        EasyMock.expect(rme.getLastControlMessage()).andReturn(0L);
+        EasyMock.expect(rme.getLastApplicationMessage()).andReturn(lastAppMessage);
+        destination.removeSequence(seq);
+        EasyMock.expectLastCall();
+        control.replay();
+        st.run();
+        control.verify();      
+    }
+    
+    @Test
+    public void testSequenceTerminationNotNecessary() {
+        destination = control.createMock(Destination.class);
+        manager = control.createMock(RMManager.class);
+        EasyMock.expect(destination.getManager()).andReturn(manager);
+        Timer t = new Timer();
+        EasyMock.expect(manager.getTimer()).andReturn(t);
+        DestinationSequence seq = new DestinationSequence(id, ref, destination);
+        RMEndpoint rme = control.createMock(RMEndpoint.class);
+        EasyMock.expect(destination.getReliableEndpoint()).andReturn(rme);
+        DestinationSequence.SequenceTermination st = seq.new SequenceTermination();
+        st.updateInactivityTimeout(30000L);
+        long lastAppMessage = System.currentTimeMillis() - 1000L;
+        EasyMock.expect(rme.getLastControlMessage()).andReturn(0L);
+        EasyMock.expect(rme.getLastApplicationMessage()).andReturn(lastAppMessage);
+        EasyMock.expectLastCall();
+        control.replay();
+        st.run();
+        control.verify();
+    }
+    
+    @Test
+    public void testCanPiggybackAckOnPartialResponse() {
+        DestinationSequence seq = new DestinationSequence(id, ref, destination);
+        AttributedURI uri = control.createMock(AttributedURI.class);
+        EasyMock.expect(ref.getAddress()).andReturn(uri);
+        String addr = "http://localhost:9999/reponses";
+        EasyMock.expect(uri.getValue()).andReturn(addr);
+        control.replay();
+        assertTrue(!seq.canPiggybackAckOnPartialResponse());
+        control.verify();
+        control.reset();
+        EasyMock.expect(ref.getAddress()).andReturn(uri);
+        EasyMock.expect(uri.getValue()).andReturn(RMConstants.getAnonymousAddress());
+        control.replay();
+        assertTrue(seq.canPiggybackAckOnPartialResponse());
+        control.verify();
+    }
+    
+    @Test
+    public void testPurgeAcknowledged() {
+        destination = control.createMock(Destination.class);
+        DestinationSequence seq = new DestinationSequence(id, ref, destination);        
+        manager = control.createMock(RMManager.class);
+        EasyMock.expect(destination.getManager()).andReturn(manager);
+        RMStore store = control.createMock(RMStore.class);
+        EasyMock.expect(manager.getStore()).andReturn(store);
+        store.removeMessages(EasyMock.eq(id), 
+            CastUtils.cast(EasyMock.isA(Collection.class), BigInteger.class), EasyMock.eq(false));
+        EasyMock.expectLastCall();
+        control.replay();
+        seq.purgeAcknowledged(BigInteger.ONE);
+        control.verify();
+    }
+    
+    @Test
+    public void testCancelDeferredAcknowledgements() {
+        destination = control.createMock(Destination.class);
+        manager = control.createMock(RMManager.class);
+        EasyMock.expect(destination.getManager()).andReturn(manager);
+        Timer t = new Timer();
+        EasyMock.expect(manager.getTimer()).andReturn(t);
+        DestinationSequence seq = new DestinationSequence(id, ref, destination);
+        control.replay();
+        seq.scheduleDeferredAcknowledgement(30000L);
+        seq.cancelDeferredAcknowledgments();
+        seq.cancelDeferredAcknowledgments();
+        t.cancel();
+        control.verify();
+    }
+    
+    @Test
+    public void testCancelTermination() {
+        destination = control.createMock(Destination.class);
+        manager = control.createMock(RMManager.class);
+        EasyMock.expect(destination.getManager()).andReturn(manager);
+        Timer t = new Timer();
+        EasyMock.expect(manager.getTimer()).andReturn(t);
+        DestinationSequence seq = new DestinationSequence(id, ref, destination);
+        control.replay();
+        seq.scheduleSequenceTermination(30000L);
+        seq.cancelTermination();
+        t.cancel();
         control.verify();
     }
     

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java Wed May  9 07:27:10 2007
@@ -91,7 +91,7 @@
         store.removeDestinationSequence(id);
         EasyMock.expectLastCall();
         control.replay();
-        destination.addSequence(ds, true);
+        destination.addSequence(ds);
         assertEquals(1, destination.getAllSequences().size());
         assertSame(ds, destination.getSequence(id));
         destination.removeSequence(ds);
@@ -175,7 +175,8 @@
         BigInteger nr = BigInteger.TEN;
         EasyMock.expect(st.getMessageNumber()).andReturn(nr).times(2);
         DestinationSequence ds = control.createMock(DestinationSequence.class);
-        EasyMock.expect(destination.getSequence(id)).andReturn(ds);   
+        EasyMock.expect(destination.getSequence(id)).andReturn(ds);
+        
         ds.applyDeliveryAssurance(nr);
         EasyMock.expectLastCall();
         ds.acknowledge(message);

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java Wed May  9 07:27:10 2007
@@ -131,6 +131,46 @@
         proxy.acknowledge(ds);      
     }
     
+    @Test
+    public void testLastMessage() throws NoSuchMethodException, RMException {
+        Method m = Proxy.class.getDeclaredMethod("invoke", 
+            new Class[] {OperationInfo.class, Object[].class, Map.class});
+        Proxy proxy = control.createMock(Proxy.class, new Method[] {m});
+        proxy.setReliableEndpoint(rme);
+        SourceSequence ss = control.createMock(SourceSequence.class);
+        EasyMock.expect(ss.getTarget()).andReturn(null);
+        control.replay();
+        proxy.lastMessage(ss);
+        control.verify();
+        
+        control.reset();
+        org.apache.cxf.ws.addressing.EndpointReferenceType target
+            = RMUtils.createAnonymousReference();
+        EasyMock.expect(ss.getTarget()).andReturn(target);
+        control.replay();
+        proxy.lastMessage(ss);
+        control.verify();
+        
+        control.reset();
+        target = RMUtils.createReference("http://localhost:9000/greeterPort");
+        EasyMock.expect(ss.getTarget()).andReturn(target);
+        Endpoint endpoint = control.createMock(Endpoint.class);
+        EasyMock.expect(rme.getEndpoint()).andReturn(endpoint);
+        EndpointInfo epi = control.createMock(EndpointInfo.class);
+        EasyMock.expect(endpoint.getEndpointInfo()).andReturn(epi);
+        ServiceInfo si = control.createMock(ServiceInfo.class);
+        EasyMock.expect(epi.getService()).andReturn(si);
+        InterfaceInfo ii = control.createMock(InterfaceInfo.class);
+        EasyMock.expect(si.getInterface()).andReturn(ii);
+        OperationInfo oi = control.createMock(OperationInfo.class);
+        EasyMock.expect(ii.getOperation(RMConstants.getLastMessageOperationName())).andReturn(oi);
+        expectInvoke(proxy, oi, null);
+        control.replay();
+        
+        proxy.lastMessage(ss);
+        
+    }
+    
     @Test    
     public void testTerminate() throws NoSuchMethodException, RMException {
         Method m = Proxy.class.getDeclaredMethod("invoke", 

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java Wed May  9 07:27:10 2007
@@ -21,12 +21,16 @@
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 
+import javax.wsdl.extensions.ExtensibilityElement;
 import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.service.Service;
 import org.apache.cxf.service.model.BindingInfo;
 import org.apache.cxf.service.model.BindingOperationInfo;
@@ -35,11 +39,13 @@
 import org.apache.cxf.service.model.OperationInfo;
 import org.apache.cxf.service.model.ServiceInfo;
 import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.addressing.Names;
 import org.apache.cxf.ws.policy.EffectivePolicy;
 import org.apache.cxf.ws.policy.EndpointPolicy;
 import org.apache.cxf.ws.policy.PolicyEngine;
 import org.apache.cxf.ws.policy.PolicyInterceptorProviderRegistry;
 import org.apache.neethi.Assertion;
+import org.apache.neethi.Policy;
 import org.easymock.classextension.EasyMock;
 import org.easymock.classextension.IMocksControl;
 import org.junit.After;
@@ -48,12 +54,12 @@
 import org.junit.Test;
 
 public class RMEndpointTest extends Assert {
-    
+
     private IMocksControl control;
     private RMManager manager;
     private Endpoint ae;
     private RMEndpoint rme;
-    
+
     @Before
     public void setUp() {
         control = EasyMock.createNiceControl();
@@ -61,12 +67,12 @@
         ae = control.createMock(Endpoint.class);
         rme = new RMEndpoint(manager, ae);
     }
-    
+
     @After
     public void tearDown() {
         control.verify();
     }
-    
+
     @Test
     public void testConstructor() {
         control.replay();
@@ -76,65 +82,65 @@
         assertNull(rme.getConduit());
         assertNull(rme.getReplyTo());
     }
-    
+
     @Test
     public void testGetName() {
-        EndpointInfo aei = control.createMock(EndpointInfo.class);        
+        EndpointInfo aei = control.createMock(EndpointInfo.class);
         EasyMock.expect(ae.getEndpointInfo()).andReturn(aei);
         QName qn = new QName("cxf");
         EasyMock.expect(aei.getName()).andReturn(qn);
         control.replay();
         assertSame(qn, rme.getName());
     }
-    
+
     @Test
     public void testGetManager() {
         control.replay();
         assertSame(manager, rme.getManager());
     }
-    
+
     @Test
     public void testGetApplicationEndpoint() {
         control.replay();
         assertSame(ae, rme.getApplicationEndpoint());
     }
-    
+
     @Test
     public void testGetProxy() {
         control.replay();
         assertSame(rme, rme.getProxy().getReliableEndpoint());
     }
-    
+
     @Test
     public void testGetServant() {
         control.replay();
         assertNotNull(rme.getServant());
     }
-    
+
     @Test
     public void testGetSetDestination() {
         Destination d = control.createMock(Destination.class);
         control.replay();
         assertSame(rme, rme.getDestination().getReliableEndpoint());
         rme.setDestination(d);
-        assertSame(d, rme.getDestination());        
+        assertSame(d, rme.getDestination());
     }
-    
+
     @Test
     public void testGetSetSource() {
         Source s = control.createMock(Source.class);
         control.replay();
         assertSame(rme, rme.getSource().getReliableEndpoint());
         rme.setSource(s);
-        assertSame(s, rme.getSource());        
+        assertSame(s, rme.getSource());
     }
-    
+
     @Test
     public void testInitialise() throws NoSuchMethodException {
         Method m1 = RMEndpoint.class.getDeclaredMethod("createService", new Class[] {});
         Method m2 = RMEndpoint.class.getDeclaredMethod("createEndpoint", new Class[] {});
         Method m3 = RMEndpoint.class.getDeclaredMethod("setPolicies", new Class[] {});
-        
+
         rme = control.createMock(RMEndpoint.class, new Method[] {m1, m2, m3});
         rme.createService();
         EasyMock.expectLastCall();
@@ -143,14 +149,14 @@
         rme.setPolicies();
         EasyMock.expectLastCall();
         Conduit c = control.createMock(Conduit.class);
-        org.apache.cxf.ws.addressing.EndpointReferenceType epr = 
-            control.createMock(org.apache.cxf.ws.addressing.EndpointReferenceType.class);        
+        org.apache.cxf.ws.addressing.EndpointReferenceType epr = control
+            .createMock(org.apache.cxf.ws.addressing.EndpointReferenceType.class);
         control.replay();
         rme.initialise(c, epr);
         assertSame(c, rme.getConduit());
-        assertSame(epr, rme.getReplyTo());  
+        assertSame(epr, rme.getReplyTo());
     }
-    
+
     @Test
     public void testCreateService() {
         Service as = control.createMock(Service.class);
@@ -164,13 +170,13 @@
         assertSame(rme.getServant(), s.getInvoker());
         verifyService();
     }
-    
+
     @Test
     public void testCreateEndpoint() throws NoSuchMethodException {
         Method m = RMEndpoint.class.getDeclaredMethod("getUsingAddressing", new Class[] {EndpointInfo.class});
         rme = control.createMock(RMEndpoint.class, new Method[] {m});
         rme.setAplicationEndpoint(ae);
-        rme.setManager(manager);  
+        rme.setManager(manager);
         Service as = control.createMock(Service.class);
         EasyMock.expect(ae.getService()).andReturn(as);
         EndpointInfo aei = control.createMock(EndpointInfo.class);
@@ -189,22 +195,69 @@
         rme.createEndpoint();
         Endpoint e = rme.getEndpoint();
         WrappedEndpoint we = (WrappedEndpoint)e;
-        assertSame(ae, we.getWrappedEndpoint());  
+        assertSame(ae, we.getWrappedEndpoint());
         Service s = rme.getService();
         assertEquals(1, s.getEndpoints().size());
-        assertSame(e, s.getEndpoints().get(new QName(RMConstants.getWsdlNamespace(), 
-                                                     "SequenceAbstractSoapPort")));   
+        assertSame(e, s.getEndpoints().get(RMConstants.getPortName()));
     }
-    
+
+    @Test
+    public void testGetUsingAddressing() {
+        EndpointInfo ei = null;
+        control.replay();
+        assertNull(rme.getUsingAddressing(ei));
+        control.verify();
+
+        control.reset();
+        ExtensibilityElement ua = control.createMock(ExtensibilityElement.class);
+        ei = control.createMock(EndpointInfo.class);
+        List<ExtensibilityElement> noExts = new ArrayList<ExtensibilityElement>();
+        List<ExtensibilityElement> exts = new ArrayList<ExtensibilityElement>();
+        exts.add(ua);
+        EasyMock.expect(ei.getExtensors(ExtensibilityElement.class)).andReturn(noExts);
+        BindingInfo bi = control.createMock(BindingInfo.class);
+        EasyMock.expect(ei.getBinding()).andReturn(bi).times(2);
+        EasyMock.expect(bi.getExtensors(ExtensibilityElement.class)).andReturn(noExts);
+        ServiceInfo si = control.createMock(ServiceInfo.class);
+        EasyMock.expect(ei.getService()).andReturn(si).times(2);
+        EasyMock.expect(si.getExtensors(ExtensibilityElement.class)).andReturn(exts);
+        EasyMock.expect(ua.getElementType()).andReturn(Names.WSAW_USING_ADDRESSING_QNAME);
+        control.replay();
+        assertSame(ua, rme.getUsingAddressing(ei));
+    }
+
+    @Test
+    public void testGetUsingAddressingFromExtensions() {
+        List<ExtensibilityElement> exts = new ArrayList<ExtensibilityElement>();
+        ExtensibilityElement ua = control.createMock(ExtensibilityElement.class);
+        exts.add(ua);
+        EasyMock.expect(ua.getElementType()).andReturn(Names.WSAW_USING_ADDRESSING_QNAME);
+        control.replay();
+        assertSame(ua, rme.getUsingAddressing(exts));
+    }
+
+    @Test
+    public void testMessageArrivals() {
+        assertEquals(0L, rme.getLastApplicationMessage());
+        assertEquals(0L, rme.getLastControlMessage());
+        rme.receivedControlMessage();
+        assertEquals(0L, rme.getLastApplicationMessage());
+        assertTrue(rme.getLastControlMessage() > 0);
+        rme.receivedApplicationMessage();
+        assertTrue(rme.getLastApplicationMessage() > 0);
+        assertTrue(rme.getLastControlMessage() > 0);
+        control.replay();
+    }
+
     @Test
     public void testSetPoliciesNoEngine() {
         Bus bus = control.createMock(Bus.class);
         EasyMock.expect(manager.getBus()).andReturn(bus);
         EasyMock.expect(bus.getExtension(PolicyEngine.class)).andReturn(null);
         control.replay();
-        rme.setPolicies();   
+        rme.setPolicies();
     }
-    
+
     @Test
     public void testSetPoliciesEngineDisabled() {
         Bus bus = control.createMock(Bus.class);
@@ -213,9 +266,9 @@
         EasyMock.expect(bus.getExtension(PolicyEngine.class)).andReturn(pe);
         EasyMock.expect(pe.isEnabled()).andReturn(false);
         control.replay();
-        rme.setPolicies();   
+        rme.setPolicies();
     }
-    
+
     @Test
     public void testSetPolicies() throws NoSuchMethodException {
         Method m = RMEndpoint.class.getDeclaredMethod("getEndpoint", new Class[] {});
@@ -238,38 +291,81 @@
         EndpointPolicy epi = control.createMock(EndpointPolicy.class);
         EasyMock.expect(pe.getServerEndpointPolicy(aei, null)).andReturn(epi);
         EasyMock.expect(epi.getChosenAlternative()).andReturn(new ArrayList<Assertion>());
-        
+
         pe.setEndpointPolicy(ei, epi);
         EasyMock.expectLastCall();
         BindingInfo bi = control.createMock(BindingInfo.class);
         EasyMock.expect(ei.getBinding()).andReturn(bi);
         BindingOperationInfo boi = control.createMock(BindingOperationInfo.class);
         EasyMock.expect(bi.getOperations()).andReturn(Collections.singletonList(boi));
-        pe.setEffectiveServerRequestPolicy(EasyMock.eq(ei), EasyMock.eq(boi), 
-                                      EasyMock.isA(EffectivePolicy.class));
+        pe.setEffectiveServerRequestPolicy(EasyMock.eq(ei), EasyMock.eq(boi), EasyMock
+            .isA(EffectivePolicy.class));
         EasyMock.expectLastCall();
-        pe.setEffectiveServerResponsePolicy(EasyMock.eq(ei), EasyMock.eq(boi), 
-                                      EasyMock.isA(EffectivePolicy.class));
+        pe.setEffectiveServerResponsePolicy(EasyMock.eq(ei), EasyMock.eq(boi), EasyMock
+            .isA(EffectivePolicy.class));
         EasyMock.expectLastCall();
-        pe.setEffectiveClientRequestPolicy(EasyMock.eq(ei), EasyMock.eq(boi), 
-                                      EasyMock.isA(EffectivePolicy.class));
+        pe.setEffectiveClientRequestPolicy(EasyMock.eq(ei), EasyMock.eq(boi), EasyMock
+            .isA(EffectivePolicy.class));
         EasyMock.expectLastCall();
-        pe.setEffectiveClientResponsePolicy(EasyMock.eq(ei), EasyMock.eq(boi), 
-                                      EasyMock.isA(EffectivePolicy.class));
+        pe.setEffectiveClientResponsePolicy(EasyMock.eq(ei), EasyMock.eq(boi), EasyMock
+            .isA(EffectivePolicy.class));
         EasyMock.expectLastCall();
         control.replay();
         rme.setPolicies();
     }
-       
-    private void verifyService() {        
+
+    @Test
+    public void testShutdown() {
+        DestinationSequence ds = control.createMock(DestinationSequence.class);
+        Identifier did = control.createMock(Identifier.class);
+        EasyMock.expect(ds.getIdentifier()).andReturn(did);
+        String d = "d";
+        EasyMock.expect(did.getValue()).andReturn(d);        
+        SourceSequence ss = control.createMock(SourceSequence.class);
+        Identifier sid = control.createMock(Identifier.class);
+        EasyMock.expect(ss.getIdentifier()).andReturn(sid);
+        String s = "s";
+        EasyMock.expect(sid.getValue()).andReturn(s);        
+        ds.cancelDeferredAcknowledgments();
+        EasyMock.expectLastCall();
+        ds.cancelTermination();
+        EasyMock.expectLastCall();
+        RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
+        EasyMock.expect(manager.getRetransmissionQueue()).andReturn(queue);
+        queue.stop(ss);
+        EasyMock.expectLastCall();
+        control.replay();
+        rme.getDestination().addSequence(ds, false);
+        rme.getSource().addSequence(ss, false);
+        rme.shutdown();
+    }
+
+    @Test
+    public void testEffectivePolicyImpl() {
+        EndpointPolicy ep = control.createMock(EndpointPolicy.class);
+        Collection<Assertion> alt = new ArrayList<Assertion>();
+        EasyMock.expect(ep.getChosenAlternative()).andReturn(alt).times(2);
+        PolicyInterceptorProviderRegistry reg = control.createMock(PolicyInterceptorProviderRegistry.class);
+        List<Interceptor> li = new ArrayList<Interceptor>();
+        EasyMock.expect(reg.getInterceptors(alt, true, false)).andReturn(li);
+        Policy p = control.createMock(Policy.class);
+        EasyMock.expect(ep.getPolicy()).andReturn(p);
+        control.replay();
+        EffectivePolicy effective = rme.new EffectivePolicyImpl(ep, reg, true, false);
+        assertSame(alt, effective.getChosenAlternative());
+        assertSame(li, effective.getInterceptors());
+        assertSame(p, effective.getPolicy());
+    }
+
+    private void verifyService() {
         Service service = rme.getService();
         ServiceInfo si = service.getServiceInfos().get(0);
         assertNotNull("service info is null", si);
 
         InterfaceInfo intf = si.getInterface();
-        
-        assertEquals(5, intf.getOperations().size());
-        
+
+        assertEquals(7, intf.getOperations().size());
+
         String ns = si.getName().getNamespaceURI();
         OperationInfo oi = intf.getOperation(new QName(ns, "CreateSequence"));
         assertNotNull("No operation info.", oi);
@@ -277,19 +373,27 @@
         assertTrue("Operation is unwrapped.", !oi.isUnwrapped());
         assertTrue("Operation is unwrappedCapable.", !oi.isUnwrappedCapable());
         assertNull("Unexpected unwrapped operation.", oi.getUnwrappedOperation());
-        
+
         oi = intf.getOperation(new QName(ns, "TerminateSequence"));
         assertNotNull("No operation info.", oi);
         assertTrue("Operation is toway.", oi.isOneWay());
-        
+
         oi = intf.getOperation(new QName(ns, "SequenceAcknowledgement"));
         assertNotNull("No operation info.", oi);
         assertTrue("Operation is toway.", oi.isOneWay());
         
-        oi = intf.getOperation(new QName(ns, "CreateSequenceOneway"));
+        oi = intf.getOperation(new QName(ns, "LastMessage"));
         assertNotNull("No operation info.", oi);
         assertTrue("Operation is toway.", oi.isOneWay());
         
+        oi = intf.getOperation(new QName(ns, "AckRequested"));
+        assertNotNull("No operation info.", oi);
+        assertTrue("Operation is toway.", oi.isOneWay());
+
+        oi = intf.getOperation(new QName(ns, "CreateSequenceOneway"));
+        assertNotNull("No operation info.", oi);
+        assertTrue("Operation is toway.", oi.isOneWay());
+
         oi = intf.getOperation(new QName(ns, "CreateSequenceResponseOneway"));
         assertNotNull("No operation info.", oi);
         assertTrue("Operation is toway.", oi.isOneWay());

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java Wed May  9 07:27:10 2007
@@ -122,12 +122,14 @@
     private void testHandleSequenceAck(boolean onServer) throws SequenceFault, RMException, 
     NoSuchMethodException {
         Method m = RMInInterceptor.class.getDeclaredMethod("processAcknowledgments",
-            new Class[] {RMProperties.class});
+            new Class[] {Source.class, RMProperties.class});
         interceptor = control.createMock(RMInInterceptor.class, new Method[] {m});
         Message message = setupInboundMessage(RMConstants.getSequenceAckAction(), onServer);
         rme.receivedControlMessage();
         EasyMock.expectLastCall();
-        interceptor.processAcknowledgments(rmps);
+        Source s = control.createMock(Source.class);
+        EasyMock.expect(rme.getSource()).andReturn(s);
+        interceptor.processAcknowledgments(s, rmps);
         EasyMock.expectLastCall();
         EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
 
@@ -168,9 +170,9 @@
     
     private void testAppMessage(boolean onServer) throws SequenceFault, RMException, NoSuchMethodException {
         Method m1 = RMInInterceptor.class.getDeclaredMethod("processAcknowledgments",
-                                                            new Class[] {RMProperties.class});
+                                                            new Class[] {Source.class, RMProperties.class});
         Method m2 = RMInInterceptor.class.getDeclaredMethod("processAcknowledgmentRequests",
-                                                            new Class[] {RMProperties.class});
+                                                            new Class[] {Destination.class, Message.class});
         Method m3 = RMInInterceptor.class.getDeclaredMethod("processSequence",
                                                             new Class[] {Destination.class, Message.class});
         Method m4 = RMInInterceptor.class.getDeclaredMethod("processDeliveryAssurance",
@@ -180,9 +182,11 @@
         Message message = setupInboundMessage("greetMe", true);
         Destination d = control.createMock(Destination.class);
         EasyMock.expect(manager.getDestination(message)).andReturn(d);
-        interceptor.processAcknowledgments(rmps);
+        Source s = control.createMock(Source.class);
+        EasyMock.expect(rme.getSource()).andReturn(s);
+        interceptor.processAcknowledgments(s, rmps);
         EasyMock.expectLastCall();
-        interceptor.processAcknowledgmentRequests(rmps);
+        interceptor.processAcknowledgmentRequests(d, message);
         EasyMock.expectLastCall();
         interceptor.processSequence(d, message);
         EasyMock.expectLastCall();
@@ -200,6 +204,7 @@
     public void testProcessAcknowledgments() throws RMException {
         interceptor = new RMInInterceptor();
         manager = control.createMock(RMManager.class);
+        Source source = control.createMock(Source.class);
         interceptor.setManager(manager);
         SequenceAcknowledgement ack1 = control.createMock(SequenceAcknowledgement.class);
         SequenceAcknowledgement ack2 = control.createMock(SequenceAcknowledgement.class);
@@ -210,16 +215,16 @@
         Identifier id1 = control.createMock(Identifier.class);
         EasyMock.expect(ack1.getIdentifier()).andReturn(id1);
         SourceSequence ss1 = control.createMock(SourceSequence.class);
-        EasyMock.expect(manager.getSourceSequence(id1)).andReturn(ss1);
+        EasyMock.expect(source.getSequence(id1)).andReturn(ss1);
         ss1.setAcknowledged(ack1);
         EasyMock.expectLastCall();
         Identifier id2 = control.createMock(Identifier.class);
         EasyMock.expect(ack2.getIdentifier()).andReturn(id2);
-        EasyMock.expect(manager.getSourceSequence(id2)).andReturn(null);
+        EasyMock.expect(source.getSequence(id2)).andReturn(null);
   
         control.replay();
         try {
-            interceptor.processAcknowledgments(rmps);
+            interceptor.processAcknowledgments(source, rmps);
             fail("Expected SequenceFault not thrown");
         } catch (SequenceFault sf) {
             assertEquals(RMConstants.getUnknownSequenceFaultCode(), sf.getSequenceFault().getFaultCode());