You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2007/05/01 10:49:31 UTC

svn commit: r533968 - in /incubator/cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/...

Author: andreasmyth
Date: Tue May  1 01:49:30 2007
New Revision: 533968

URL: http://svn.apache.org/viewvc?view=rev&rev=533968
Log:
[JIRA CXF-347] Implementation of InactivityTimeout:
* Record arrival time of the last application and RM protocol messages.
* Terminate sequence if no protocol or application messages have been sent to an endpoint for more than the time specified in the inactivity timeout parameter of the RMAssertion.
* Override Assertion.equal and AssertionBuilder.buildCompatible for RMAssertion and use these in refactored PolicyUtils.

Added:
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java   (with props)
Modified:
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceMonitor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Tue May  1 01:49:30 2007
@@ -38,6 +38,8 @@
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.policy.PolicyUtils;
 import org.apache.cxf.ws.rm.policy.RMAssertion;
+import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.InactivityTimeout;
 
 public class DestinationSequence extends AbstractSequence {
     
@@ -49,6 +51,7 @@
     private SequenceMonitor monitor;
     private boolean acknowledgeOnNextOccasion;
     private List<DeferredAcknowledgment> deferredAcknowledgments;
+    private SequenceTermination scheduledTermination;
     private String correlationID;
     
     public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d) {
@@ -153,7 +156,27 @@
         
         purgeAcknowledged(messageNumber);
         
-        scheduleAcknowledgement(message);
+        RMAssertion rma = PolicyUtils.getRMAssertion(destination.getManager().getRMAssertion(), message);
+        long acknowledgementInterval = 0;
+        AcknowledgementInterval ai = rma.getAcknowledgementInterval();
+        if (null != ai) {
+            BigInteger val = ai.getMilliseconds(); 
+            if (null != val) {
+                acknowledgementInterval = val.longValue();
+            }
+        }
+        
+        scheduleAcknowledgement(acknowledgementInterval);
+       
+        long inactivityTimeout = 0;
+        InactivityTimeout iat = rma.getInactivityTimeout();
+        if (null != iat) {
+            BigInteger val = iat.getMilliseconds(); 
+            if (null != val) {
+                inactivityTimeout = val.longValue();
+            }
+        }
+        scheduleSequenceTermination(inactivityTimeout);
         
     }
     
@@ -273,24 +296,12 @@
         return correlationID;
     }
 
-    void scheduleAcknowledgement(Message message) {  
-        BigInteger interval = PolicyUtils.getAcknowledgmentInterval(message);
-        if (null == interval) {
-            RMAssertion rma = destination.getManager().getRMAssertion();
-            if (null != rma.getAcknowledgementInterval()) {
-                interval = rma.getAcknowledgementInterval().getMilliseconds();
-            }
-        }
-        
-        long delay = 0;
-        if (null != interval) {
-            delay = interval.longValue();
-        }
+    void scheduleAcknowledgement(long acknowledgementInterval) {  
         AcksPolicyType ap = destination.getManager().getDestinationPolicy().getAcksPolicy();
  
-        if (delay > 0 && getMonitor().getMPM() >= ap.getIntraMessageThreshold()) {
+        if (acknowledgementInterval > 0 && getMonitor().getMPM() >= ap.getIntraMessageThreshold()) {
             LOG.fine("Schedule deferred acknowledgment");
-            scheduleDeferredAcknowledgement(delay);
+            scheduleDeferredAcknowledgement(acknowledgementInterval);
         } else {
             LOG.fine("Schedule immediate acknowledgment");
             scheduleImmediateAcknowledgement();
@@ -301,6 +312,20 @@
     void scheduleImmediateAcknowledgement() {
         acknowledgeOnNextOccasion = true;
     }
+    
+    synchronized void scheduleSequenceTermination(long inactivityTimeout) { 
+        if (inactivityTimeout <= 0) {
+            return;
+        }
+        boolean scheduled = null != scheduledTermination;
+        if (null == scheduledTermination) {
+            scheduledTermination = new SequenceTermination();
+        }
+        scheduledTermination.updateInactivityTimeout(inactivityTimeout);
+        if (!scheduled) {
+            destination.getManager().getTimer().schedule(scheduledTermination, inactivityTimeout);
+        }
+    }
 
     synchronized void scheduleDeferredAcknowledgement(long delay) {
         
@@ -349,6 +374,43 @@
                
             }
 
+        }
+    }
+    
+    final class SequenceTermination extends TimerTask {
+        
+        private long maxInactivityTimeout;
+        
+        void updateInactivityTimeout(long timeout) {
+            maxInactivityTimeout = Math.max(maxInactivityTimeout, timeout);
+        }
+        
+        public void run() {
+            synchronized (DestinationSequence.this) {
+                DestinationSequence.this.scheduledTermination = null;
+                RMEndpoint rme = destination.getReliableEndpoint();
+                long lat = Math.max(rme.getLastControlMessage(), rme.getLastApplicationMessage());
+                if (0 == lat) {
+                    return;
+                }                
+                long now = System.currentTimeMillis();
+                if (now - lat >= maxInactivityTimeout) {
+                    
+                    // terminate regardless outstanding acknowledgments - as we assume that the client is
+                    // gone there is no point in sending a SequenceAcknowledgment
+                    
+                    LogUtils.log(LOG, Level.WARNING, "TERMINATING_INACTIVE_SEQ_MSG", 
+                                 DestinationSequence.this.getIdentifier().getValue());
+                    DestinationSequence.this.destination.removeSequence(DestinationSequence.this);
+
+                } else {
+                   // reschedule 
+                    SequenceTermination st = new SequenceTermination();
+                    st.updateInactivityTimeout(maxInactivityTimeout);
+                    DestinationSequence.this.destination.getManager().getTimer()
+                        .schedule(st, maxInactivityTimeout);
+                }
+            }
         }
     }
     

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties Tue May  1 01:49:30 2007
@@ -18,6 +18,8 @@
 #    under the License.
 #
 #
+TERMINATING_INACTIVE_SEQ_MSG = Terminating sequence {0} due to inactivity.
+
 RM_INVOCATION_FAILED = Invocation of RM protocol operation failed.
 SEQ_TERMINATION_FAILURE = Failed to terminate sequence {0}.
 

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Tue May  1 01:49:30 2007
@@ -78,8 +78,9 @@
     private Endpoint endpoint;
     private Proxy proxy;
     private Servant servant;
-    
-    
+    private long lastApplicationMessage;
+    private long lastControlMessage;
+     
     public RMEndpoint(RMManager m, Endpoint ae) {
         manager = m;
         applicationEndpoint = ae;
@@ -448,6 +449,24 @@
     void setManager(RMManager m) {
         manager = m;
     }
+    
+    public long getLastApplicationMessage() {
+        return lastApplicationMessage;
+    }
+
+    public void receivedApplicationMessage() {
+        lastApplicationMessage = System.currentTimeMillis();
+    }
+
+    public long getLastControlMessage() {
+        return lastControlMessage;
+    }
+
+    public void receivedControlMessage() {
+        lastControlMessage = System.currentTimeMillis();
+    }
+    
+    
     
     class EffectivePolicyImpl implements EffectivePolicy {
         

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Tue May  1 01:49:30 2007
@@ -78,7 +78,9 @@
         
         // for application AND out of band messages
         
-        if (isApplicationMessage) {
+        RMEndpoint rme = getManager().getReliableEndpoint(message);
+        
+        if (isApplicationMessage) {            
             Destination destination = getManager().getDestination(message);
             if (null != rmps) {
                 processAcknowledgments(rmps);
@@ -86,16 +88,19 @@
                 processSequence(destination, message);
                 processDeliveryAssurance(rmps);
             }
-        } else if (RMConstants.getSequenceAckAction().equals(action)) {
-            processAcknowledgments(rmps);
-        } else if (RMConstants.getCreateSequenceAction().equals(action) && !isServer) {
-            LOG.fine("Processing inbound CreateSequence on client side.");
-            RMEndpoint rme = getManager().getReliableEndpoint(message);
-            Servant servant = rme.getServant();
-            CreateSequenceResponseType csr = servant.createSequence(message);
-            Proxy proxy = rme.getProxy();
-            proxy.createSequenceResponse(csr);
-            return;
+            rme.receivedApplicationMessage();
+        } else {
+            rme.receivedControlMessage();
+            if (RMConstants.getSequenceAckAction().equals(action)) {
+                processAcknowledgments(rmps);
+            } else if (RMConstants.getCreateSequenceAction().equals(action) && !isServer) {
+                LOG.fine("Processing inbound CreateSequence on client side.");
+                Servant servant = rme.getServant();
+                CreateSequenceResponseType csr = servant.createSequence(message);
+                Proxy proxy = rme.getProxy();
+                proxy.createSequenceResponse(csr);
+                return;
+            }
         }
         
         assertReliability(message);

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Tue May  1 01:49:30 2007
@@ -239,14 +239,7 @@
     @PostConstruct
     void initialise() {
         if (!isSetRMAssertion()) {
-            org.apache.cxf.ws.rm.policy.ObjectFactory factory = 
-                new org.apache.cxf.ws.rm.policy.ObjectFactory();
-            RMAssertion rma = factory.createRMAssertion();
-            BaseRetransmissionInterval bri = factory.createRMAssertionBaseRetransmissionInterval();
-            bri.setMilliseconds(new BigInteger(RetransmissionQueue.DEFAULT_BASE_RETRANSMISSION_INTERVAL));
-            rma.setBaseRetransmissionInterval(bri);
-            rma.setExponentialBackoff(factory.createRMAssertionExponentialBackoff());
-            setRMAssertion(rma);
+            setRMAssertion(null);
         }
         org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
         if (!isSetDeliveryAssurance()) {
@@ -275,7 +268,28 @@
             idGenerator = new DefaultSequenceIdentifierGenerator();
         }
     }
-    
+       
+    @Override
+    public void setRMAssertion(RMAssertion rma) {        
+        
+        org.apache.cxf.ws.rm.policy.ObjectFactory factory = 
+            new org.apache.cxf.ws.rm.policy.ObjectFactory();
+        if (null == rma) {
+            rma = factory.createRMAssertion();
+            rma.setExponentialBackoff(factory.createRMAssertionExponentialBackoff());
+        }
+        BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
+        if (null == bri) {
+            bri = factory.createRMAssertionBaseRetransmissionInterval();  
+            rma.setBaseRetransmissionInterval(bri);
+        }
+        if (null == bri.getMilliseconds()) {
+            bri.setMilliseconds(new BigInteger(RetransmissionQueue.DEFAULT_BASE_RETRANSMISSION_INTERVAL));
+        }
+                
+        super.setRMAssertion(rma);
+    }
+
     void addSourceSequence(SourceSequence ss) {
         if (null == sourceSequences) {
             sourceSequences = new HashMap<String, SourceSequence>();
@@ -298,4 +312,5 @@
             return sid;
         }   
     }
+   
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceMonitor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceMonitor.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceMonitor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceMonitor.java Tue May  1 01:49:30 2007
@@ -58,6 +58,13 @@
         
         return mpm;
     }
+        
+    public synchronized long getLastArrivalTime() {
+        if (receiveTimes.size() > 0) {
+            return receiveTimes.get(receiveTimes.size() - 1).longValue();
+        }
+        return 0;
+    }
     
     protected void setMonitorInterval(long i) {
         if (receiveTimes.size() == 0) {

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java Tue May  1 01:49:30 2007
@@ -27,6 +27,9 @@
 import org.apache.cxf.ws.policy.AssertionInfoMap;
 import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
 import org.apache.cxf.ws.rm.RMConstants;
+import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.InactivityTimeout;
 
 /**
  * 
@@ -40,94 +43,168 @@
     private PolicyUtils() {        
     }
 
-
     /**
-     * Returns the base retransmission interval for the specified message.
-     * This is obtained as the minimum base retransmission interval in all RMAssertions pertaining
-     * to the message, or null if there are no such policy assertions.
+     * Returns an RMAssertion that is compatible with the default value
+     * and all RMAssertions pertaining to the message (can never be null).
+     * 
+     * @param rma the default value
      * @param message the message
-     * @return the base retransmission interval for the message
+     * @return the compatible RMAssertion
      */
-    public static BigInteger getBaseRetransmissionInterval(Message message) {
+    public static RMAssertion getRMAssertion(RMAssertion defaultValue, Message message) {        
+        RMAssertion compatible = defaultValue;
         AssertionInfoMap amap =  message.get(AssertionInfoMap.class);
-        BigInteger result = null;
         if (null != amap) {
             Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
             if (null != ais) {
+                
                 for (AssertionInfo ai : ais) {
                     JaxbAssertion<RMAssertion> ja = getAssertion(ai);
                     RMAssertion rma = ja.getData();
-                    RMAssertion.BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
-                    if (null == bri) {
-                        continue;
-                    }
-                    BigInteger val = bri.getMilliseconds();
-                    if (null == result || val.compareTo(result) < 0) {
-                        result = val;
-                    }
+                    compatible = null == defaultValue ? rma
+                        : intersect(compatible, rma);
                 }
             }
         }
-        return result;
+        return compatible;
     }
     
-    /**
-     * Determines if exponential backoff should be used in repeated attempts to
-     * resend the specified message. Returns false if there is at least one
-     * RMAssertion for this message indicating that no exponential backoff
-     * algorithm should be used, or true otherwise.
-     * 
-     * @param message the message
-     * @return true iff the exponential backoff algorithm should be used for the
-     *         message
-     */
-    public static boolean useExponentialBackoff(Message message) {
-        AssertionInfoMap amap = message.get(AssertionInfoMap.class);
-        if (null != amap) {
-            Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
-            if (null != ais) {
-                for (AssertionInfo ai : ais) {
-                    JaxbAssertion<RMAssertion> ja = getAssertion(ai);
-                    RMAssertion rma = ja.getData();
-                    if (null == rma.getExponentialBackoff()) {
-                        return false;
-                    }
-                }
+    public static RMAssertion intersect(RMAssertion a, RMAssertion b) {
+        if (equals(a, b)) {
+            return a;
+        }
+        RMAssertion compatible = new RMAssertion();
+        
+        // use maximum of inactivity timeout
+        
+        BigInteger aval = null;
+        if (null != a.getInactivityTimeout()) {
+            aval = a.getInactivityTimeout().getMilliseconds();
+        }
+        BigInteger bval = null;
+        if (null != b.getInactivityTimeout()) {
+            bval = b.getInactivityTimeout().getMilliseconds();            
+        }
+        if (null != aval || null != bval) {
+            InactivityTimeout ia = new RMAssertion.InactivityTimeout();
+            if (null != aval && null != bval) {
+                ia.setMilliseconds(aval.max(bval));
+            } else {
+                ia.setMilliseconds(aval != null ? aval : bval);
             }
+            compatible.setInactivityTimeout(ia);
         }
-        return true;
+        
+        // use minimum of base retransmission interval
+        
+        aval = null;
+        if (null != a.getBaseRetransmissionInterval()) {
+            aval = a.getBaseRetransmissionInterval().getMilliseconds();
+        }
+        bval = null;
+        if (null != b.getBaseRetransmissionInterval()) {
+            bval = b.getBaseRetransmissionInterval().getMilliseconds();            
+        }
+        if (null != aval || null != bval) {
+            BaseRetransmissionInterval bri = new RMAssertion.BaseRetransmissionInterval();
+            if (null != aval && null != bval) {
+                bri.setMilliseconds(aval.min(bval));
+            } else {
+                bri.setMilliseconds(aval != null ? aval : bval);
+            }
+            compatible.setBaseRetransmissionInterval(bri);
+        }
+        
+        // use minimum of acknowledgement interval
+        
+        aval = null;
+        if (null != a.getAcknowledgementInterval()) {
+            aval = a.getAcknowledgementInterval().getMilliseconds();
+        }
+        bval = null;
+        if (null != b.getAcknowledgementInterval()) {
+            bval = b.getAcknowledgementInterval().getMilliseconds(); 
+        }
+        if (null != aval || null != bval) {
+            AcknowledgementInterval ai = new RMAssertion.AcknowledgementInterval();
+            if (null != aval && null != bval) {
+                ai.setMilliseconds(aval.min(bval));
+            } else {
+                ai.setMilliseconds(aval != null ? aval : bval);
+            }
+            compatible.setAcknowledgementInterval(ai);
+        }
+    
+        // backoff parameter
+        if (null != a.getExponentialBackoff() || null != b.getExponentialBackoff()) {
+            compatible.setExponentialBackoff(new RMAssertion.ExponentialBackoff());
+        }
+        return compatible;
     }
     
-    /**
-     * Returns the acknowledgment interval for the specified message.
-     * This is obtained as the minimum acknowledgment interval in all RMAssertions pertaining
-     * to the message, or null of if there are no such policy assertions.
-     * @param message the message
-     * @return the base retransmission interval for the message
-     */
-    public static BigInteger getAcknowledgmentInterval(Message message) {
-        AssertionInfoMap amap =  message.get(AssertionInfoMap.class);
-        BigInteger result = null;
-        if (null != amap) {
-            Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
-            if (null != ais) {
-                for (AssertionInfo ai : ais) {
-                    JaxbAssertion<RMAssertion> ja = getAssertion(ai);
-                    RMAssertion rma = ja.getData();
-                    RMAssertion.AcknowledgementInterval interval = rma.getAcknowledgementInterval();
-                    if (null == interval) {
-                        continue;
-                    }
-                    BigInteger val = interval.getMilliseconds();
-                    if (null == result || val.compareTo(result) < 0) {
-                        result = val;
-                    }
+    public static boolean equals(RMAssertion a, RMAssertion b) {
+        if (a == b) {
+            return true;
+        }
+        
+        BigInteger aval = null;
+        if (null != a.getInactivityTimeout()) {
+            aval = a.getInactivityTimeout().getMilliseconds();
+        }
+        BigInteger bval = null;
+        if (null != b.getInactivityTimeout()) {
+            bval = b.getInactivityTimeout().getMilliseconds();            
+        }
+        if (!equals(aval, bval)) {
+            return false;
+        }
+            
+        aval = null;
+        if (null != a.getBaseRetransmissionInterval()) {
+            aval = a.getBaseRetransmissionInterval().getMilliseconds();
+        }
+        bval = null;
+        if (null != b.getBaseRetransmissionInterval()) {
+            bval = b.getBaseRetransmissionInterval().getMilliseconds();            
+        }
+        if (!equals(aval, bval)) {
+            return false;
+        }
+        
+        aval = null;
+        if (null != a.getAcknowledgementInterval()) {
+            aval = a.getAcknowledgementInterval().getMilliseconds();
+        }
+        bval = null;
+        if (null != b.getAcknowledgementInterval()) {
+            bval = b.getAcknowledgementInterval().getMilliseconds(); 
+        }
+        if (!equals(aval, bval)) {
+            return false;
+        }
+        
+        return null == a.getExponentialBackoff()
+            ? null == b.getExponentialBackoff() 
+            : null != b.getExponentialBackoff();         
+    }
+        
+    private static boolean equals(BigInteger aval, BigInteger bval) {
+        if (null != aval) {
+            if (null != bval) {
+                if (!aval.equals(bval)) {
+                    return false;
                 }
+            } else {
+                return false;
             }
+        } else {
+            if (null != bval) {
+                return false;
+            }
+            return true;
         }
-        return result;
+        return true;
     }
-
 
     @SuppressWarnings("unchecked")
     private static JaxbAssertion<RMAssertion> getAssertion(AssertionInfo ai) {

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java?view=auto&rev=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java Tue May  1 01:49:30 2007
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.policy;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
+import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertionBuilder;
+import org.apache.cxf.ws.rm.RMConstants;
+import org.apache.neethi.Assertion;
+import org.apache.neethi.Constants;
+import org.apache.neethi.PolicyComponent;
+
+/**
+ * 
+ */
+public class RMAssertionBuilder extends JaxbAssertionBuilder<RMAssertion> {
+ 
+    public RMAssertionBuilder() throws JAXBException {
+        super(RMAssertion.class, RMConstants.getRMAssertionQName());     
+    }
+
+    @Override
+    public Assertion buildCompatible(Assertion a, Assertion b) {
+        if (RMConstants.getRMAssertionQName().equals(a.getName())
+            && RMConstants.getRMAssertionQName().equals(b.getName())) {
+            
+            RMAssertion compatible = PolicyUtils.intersect(
+                JaxbAssertion.cast(a, RMAssertion.class).getData(),
+                JaxbAssertion.cast(b, RMAssertion.class).getData());
+            if (null == compatible) {
+                return null;
+            }
+            JaxbAssertion<RMAssertion> ca = 
+                new JaxbAssertion<RMAssertion>(RMConstants.getRMAssertionQName(), 
+                    a.isOptional() && b.isOptional());
+            ca.setData(compatible);
+            return ca;
+        }
+        return null;
+    }
+
+    @Override
+    protected JaxbAssertion<RMAssertion> buildAssertion() {
+        return new RMPolicyAssertion();
+    }
+    
+    class RMPolicyAssertion extends JaxbAssertion<RMAssertion> {
+        RMPolicyAssertion() {
+            super(RMConstants.getRMAssertionQName(), false);
+        }
+
+        @Override
+        public boolean equal(PolicyComponent policyComponent) {
+            if (policyComponent.getType() != Constants.TYPE_ASSERTION
+                || !getName().equals(((Assertion)policyComponent).getName())) {
+                return false;
+            }
+            JaxbAssertion<RMAssertion> other = JaxbAssertion.cast((Assertion)policyComponent);            
+            return PolicyUtils.equals(this.getData(), other.getData());  
+        }
+        
+        @Override
+        protected Assertion cloneMandatory() {
+            RMPolicyAssertion a = new RMPolicyAssertion();
+            a.setData(getData());
+            return a;        
+        }
+    }
+    
+    
+
+        
+}
+

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Tue May  1 01:49:30 2007
@@ -81,54 +81,6 @@
         manager = m;
     }
 
-    /**
-     * Returns the base retransmission interval for the specified message. This
-     * is obtained as the minimum base retransmission interval in all
-     * RMAssertions pertaining to the message, or the default configured for the
-     * RMManager if there are no such policy assertions.
-     * 
-     * @param message the message
-     * @return the base retransmission interval for the message
-     */
-    public long getBaseRetransmissionInterval(Message message) {
-        BigInteger val = PolicyUtils.getBaseRetransmissionInterval(message);
-        if (null != val) {
-            return val.longValue();
-        } else {
-
-            RMAssertion rma = manager.getRMAssertion();
-            RMAssertion.BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
-            if (null != bri) {
-                val = bri.getMilliseconds();
-            }
-        }
-        if (null != val) {
-            return val.longValue();
-        }
-        return 0;
-    }
-
-    /**
-     * Determines if exponential backoff should be used in repeated attemprs to
-     * resend the specified message. Returns false if there is at least one
-     * RMAssertion for this message indicating that no exponential backoff
-     * algorithm should be used, or true otherwise.
-     * 
-     * @param message the message
-     * @return true iff the exponential backoff algorithm should be used for the
-     *         message
-     */
-    public boolean useExponentialBackoff(Message message) {
-        if (!PolicyUtils.useExponentialBackoff(message)) {
-            return false;
-        }
-        RMAssertion rma = manager.getRMAssertion();
-        if (null == rma.getExponentialBackoff()) {
-            return false;
-        }
-        return true;
-    }
-
     public void addUnacknowledged(Message message) {
         cacheUnacknowledged(message);
     }
@@ -335,8 +287,11 @@
         protected ResendCandidate(Message m) {
             message = m;
             resends = 0;
-            long baseRetransmissionInterval = getBaseRetransmissionInterval(m);
-            backoff = useExponentialBackoff(m) ? RetransmissionQueue.DEFAULT_EXPONENTIAL_BACKOFF : 1;
+            RMAssertion rma = PolicyUtils.getRMAssertion(manager.getRMAssertion(), message);
+            long baseRetransmissionInterval = 
+                rma.getBaseRetransmissionInterval().getMilliseconds().longValue();
+            backoff = null != rma.getExponentialBackoff() 
+                ? RetransmissionQueue.DEFAULT_EXPONENTIAL_BACKOFF : 1;
             next = new Date(System.currentTimeMillis() + baseRetransmissionInterval);
             nextInterval = baseRetransmissionInterval * backoff;
             if (null != manager.getTimer()) {

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java Tue May  1 01:49:30 2007
@@ -45,6 +45,7 @@
 public class AbstractRMInterceptorTest extends Assert {
 
     private IMocksControl control;
+    
 
     @Before
     public void setUp() {

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java Tue May  1 01:49:30 2007
@@ -36,6 +36,7 @@
 import org.apache.cxf.ws.rm.policy.RMAssertion;
 import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
 import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.InactivityTimeout;
 import org.easymock.classextension.EasyMock;
 import org.easymock.classextension.IMocksControl;
 import org.junit.After;
@@ -614,6 +615,38 @@
         EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges);
         control.replay();
         assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
+        control.verify();
+    }
+    
+    @Test
+    public void testScheduleSequenceTermination() throws SequenceFault, IOException {
+        Timer timer = new Timer();
+        setUpDestination(timer);
+        
+        DestinationSequence seq = new DestinationSequence(id, ref, destination);
+        destination.removeSequence(seq);
+        EasyMock.expectLastCall();
+        
+        Message message = setUpMessage("1");
+        
+        RMEndpoint rme = control.createMock(RMEndpoint.class);
+        EasyMock.expect(destination.getReliableEndpoint()).andReturn(rme);
+        long arrival = System.currentTimeMillis();
+        EasyMock.expect(rme.getLastApplicationMessage()).andReturn(arrival);
+
+        control.replay();
+        InactivityTimeout iat = new RMAssertion.InactivityTimeout();
+        iat.setMilliseconds(new BigInteger("200"));
+        rma.setInactivityTimeout(iat); 
+        
+        seq.acknowledge(message);
+        
+        try {
+            Thread.sleep(250);
+        } catch (InterruptedException ex) {
+            // ignore
+        }
+        
         control.verify();
     }
     

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java Tue May  1 01:49:30 2007
@@ -44,8 +44,12 @@
 public class RMInInterceptorTest extends Assert {
     
     private IMocksControl control;
+    private RMInInterceptor interceptor;
+    private RMManager manager;
+    private RMEndpoint rme;
     private RMProperties rmps;
     
+    
     @Before
     public void setUp() {
         control = EasyMock.createNiceControl();
@@ -76,8 +80,10 @@
     
     @Test
     public void testHandleCreateSequenceOnServer() throws SequenceFault, RMException {
-        RMInInterceptor interceptor = new RMInInterceptor();         
-        Message message = setupInboundMessage(RMConstants.getCreateSequenceAction(), true);   
+        interceptor = new RMInInterceptor();         
+        Message message = setupInboundMessage(RMConstants.getCreateSequenceAction(), true);  
+        rme.receivedControlMessage();
+        EasyMock.expectLastCall();
         EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
         
         control.replay();
@@ -86,12 +92,10 @@
     
     @Test
     public void testHandleCreateSequenceOnClient() throws SequenceFault, RMException {
-        RMInInterceptor interceptor = new RMInInterceptor();         
-        Message message = setupInboundMessage(RMConstants.getCreateSequenceAction(), false);       
-        RMManager manager = control.createMock(RMManager.class);
-        interceptor.setManager(manager);
-        RMEndpoint rme = control.createMock(RMEndpoint.class);
-        EasyMock.expect(manager.getReliableEndpoint(message)).andReturn(rme);
+        interceptor = new RMInInterceptor();         
+        Message message = setupInboundMessage(RMConstants.getCreateSequenceAction(), false); 
+        rme.receivedControlMessage();
+        EasyMock.expectLastCall();
         Servant servant = control.createMock(Servant.class);
         EasyMock.expect(rme.getServant()).andReturn(servant);
         CreateSequenceResponseType csr = control.createMock(CreateSequenceResponseType.class);
@@ -119,8 +123,10 @@
     NoSuchMethodException {
         Method m = RMInInterceptor.class.getDeclaredMethod("processAcknowledgments",
             new Class[] {RMProperties.class});
-        RMInInterceptor interceptor = control.createMock(RMInInterceptor.class, new Method[] {m});
+        interceptor = control.createMock(RMInInterceptor.class, new Method[] {m});
         Message message = setupInboundMessage(RMConstants.getSequenceAckAction(), onServer);
+        rme.receivedControlMessage();
+        EasyMock.expectLastCall();
         interceptor.processAcknowledgments(rmps);
         EasyMock.expectLastCall();
         EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
@@ -140,8 +146,9 @@
     }
     
     private void testHandleTerminateSequence(boolean onServer) throws SequenceFault, RMException {
-        RMInInterceptor interceptor = new RMInInterceptor();
+        interceptor = new RMInInterceptor();
         Message message = setupInboundMessage(RMConstants.getTerminateSequenceAction(), onServer);
+        rme.receivedControlMessage();
         EasyMock.expectLastCall();
         EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
 
@@ -168,11 +175,9 @@
                                                             new Class[] {Destination.class, Message.class});
         Method m4 = RMInInterceptor.class.getDeclaredMethod("processDeliveryAssurance",
                                                             new Class[] {RMProperties.class});
-        RMInInterceptor interceptor = control
+        interceptor = control
             .createMock(RMInInterceptor.class, new Method[] {m1, m2, m3, m4});
         Message message = setupInboundMessage("greetMe", true);
-        RMManager manager = control.createMock(RMManager.class);
-        interceptor.setManager(manager);
         Destination d = control.createMock(Destination.class);
         EasyMock.expect(manager.getDestination(message)).andReturn(d);
         interceptor.processAcknowledgments(rmps);
@@ -184,6 +189,8 @@
         interceptor.processDeliveryAssurance(rmps);
         EasyMock.expectLastCall();
         EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+        rme.receivedApplicationMessage();
+        EasyMock.expectLastCall();
 
         control.replay();
         interceptor.handle(message);
@@ -191,8 +198,8 @@
     
     @Test
     public void testProcessAcknowledgments() {
-        RMInInterceptor interceptor = new RMInInterceptor();
-        RMManager manager = control.createMock(RMManager.class);
+        interceptor = new RMInInterceptor();
+        manager = control.createMock(RMManager.class);
         interceptor.setManager(manager);
         SequenceAcknowledgement ack1 = control.createMock(SequenceAcknowledgement.class);
         SequenceAcknowledgement ack2 = control.createMock(SequenceAcknowledgement.class);
@@ -232,7 +239,7 @@
         destination.acknowledge(message);
         EasyMock.expectLastCall();        
         control.replay();
-        RMInInterceptor interceptor = new RMInInterceptor();
+        interceptor = new RMInInterceptor();
         interceptor.processSequence(destination, message);
     }
     
@@ -267,6 +274,11 @@
         org.apache.cxf.transport.Destination td = 
             serverSide ? control.createMock(org.apache.cxf.transport.Destination.class) : null;
         EasyMock.expect(exchange.getDestination()).andReturn(td);
+        
+        manager = control.createMock(RMManager.class);
+        interceptor.setManager(manager);
+        rme = control.createMock(RMEndpoint.class);
+        EasyMock.expect(manager.getReliableEndpoint(message)).andReturn(rme);
         return message;
     }
     

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java Tue May  1 01:49:30 2007
@@ -28,9 +28,12 @@
 import org.apache.cxf.ws.policy.AssertionInfoMap;
 import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
 import org.apache.cxf.ws.rm.RMConstants;
+import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.ExponentialBackoff;
+import org.apache.cxf.ws.rm.policy.RMAssertion.InactivityTimeout;
 import org.easymock.IMocksControl;
 import org.easymock.classextension.EasyMock;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,133 +50,120 @@
         control = EasyMock.createNiceControl();
     }
     
-    @After
-    public void tearDown() {
-        control.verify();
+    @Test
+    public void testRMAssertionEquals() {
+        RMAssertion a = new RMAssertion();
+        assertTrue(PolicyUtils.equals(a, a));
+        
+        RMAssertion b = new RMAssertion();
+        assertTrue(PolicyUtils.equals(a, b));
+        
+        InactivityTimeout iat = new RMAssertion.InactivityTimeout();
+        iat.setMilliseconds(BigInteger.TEN);
+        a.setInactivityTimeout(iat);
+        assertTrue(!PolicyUtils.equals(a, b));
+        b.setInactivityTimeout(iat);
+        assertTrue(PolicyUtils.equals(a, b));
+        
+        ExponentialBackoff eb = new RMAssertion.ExponentialBackoff();
+        a.setExponentialBackoff(eb);
+        assertTrue(!PolicyUtils.equals(a, b));
+        b.setExponentialBackoff(eb);
+        assertTrue(PolicyUtils.equals(a, b));    
     }
     
     @Test
-    public void testGetBaseRetranmissionInterval() {
-        Message message = control.createMock(Message.class);
-        AssertionInfoMap aim = control.createMock(AssertionInfoMap.class);
-        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
-        AssertionInfo ai1 = control.createMock(AssertionInfo.class);
-        AssertionInfo ai2 =  control.createMock(AssertionInfo.class);
-        AssertionInfo ai3 =  control.createMock(AssertionInfo.class);
-        AssertionInfo ai4 =  control.createMock(AssertionInfo.class);
-        Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
-        ais.add(ai1);
-        ais.add(ai2);
-        ais.add(ai3);
-        ais.add(ai4);
-        EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
-        JaxbAssertion ja1 =  control.createMock(JaxbAssertion.class);
-        EasyMock.expect(ai1.getAssertion()).andReturn(ja1);
-        RMAssertion rma1 =  control.createMock(RMAssertion.class);
-        EasyMock.expect(ja1.getData()).andReturn(rma1);
-        EasyMock.expect(rma1.getBaseRetransmissionInterval()).andReturn(null);
-        JaxbAssertion ja2 =  control.createMock(JaxbAssertion.class);
-        EasyMock.expect(ai2.getAssertion()).andReturn(ja2);
-        RMAssertion rma2 =  control.createMock(RMAssertion.class);
-        EasyMock.expect(ja2.getData()).andReturn(rma2);
-        RMAssertion.BaseRetransmissionInterval bri2 = 
-            control.createMock(RMAssertion.BaseRetransmissionInterval.class);
-        EasyMock.expect(rma2.getBaseRetransmissionInterval()).andReturn(bri2);
-        EasyMock.expect(bri2.getMilliseconds()).andReturn(null);
-        JaxbAssertion ja3 =  control.createMock(JaxbAssertion.class);
-        EasyMock.expect(ai3.getAssertion()).andReturn(ja3);
-        RMAssertion rma3 =  control.createMock(RMAssertion.class);
-        EasyMock.expect(ja3.getData()).andReturn(rma3);
-        RMAssertion.BaseRetransmissionInterval bri3 = 
-            control.createMock(RMAssertion.BaseRetransmissionInterval.class);
-        EasyMock.expect(rma3.getBaseRetransmissionInterval()).andReturn(bri3);
-        EasyMock.expect(bri3.getMilliseconds()).andReturn(new BigInteger("10000"));
-        JaxbAssertion ja4 =  control.createMock(JaxbAssertion.class);
-        EasyMock.expect(ai4.getAssertion()).andReturn(ja4);
-        RMAssertion rma4 =  control.createMock(RMAssertion.class);
-        EasyMock.expect(ja4.getData()).andReturn(rma4);
-        RMAssertion.BaseRetransmissionInterval bri4 = 
-            control.createMock(RMAssertion.BaseRetransmissionInterval.class);
-        EasyMock.expect(rma4.getBaseRetransmissionInterval()).andReturn(bri4);
-        EasyMock.expect(bri4.getMilliseconds()).andReturn(new BigInteger("5000"));
+    public void testIntersect() {
+        RMAssertion a = new RMAssertion();
+        RMAssertion b = new RMAssertion();
+        assertSame(a, PolicyUtils.intersect(a, b));
         
-        control.replay();
-        assertEquals("Unexpected value for base retransmission interval", 
-                     5000, PolicyUtils.getBaseRetransmissionInterval(message).intValue());
+        InactivityTimeout aiat = new RMAssertion.InactivityTimeout();
+        aiat.setMilliseconds(new BigInteger("3600000"));
+        a.setInactivityTimeout(aiat);
+        InactivityTimeout biat = new RMAssertion.InactivityTimeout();
+        biat.setMilliseconds(new BigInteger("7200000"));
+        b.setInactivityTimeout(biat);
+        
+        RMAssertion c = PolicyUtils.intersect(a, b);
+        assertEquals(7200000L, c.getInactivityTimeout().getMilliseconds().longValue());
+        assertNull(c.getBaseRetransmissionInterval());
+        assertNull(c.getAcknowledgementInterval());
+        assertNull(c.getExponentialBackoff());
+        
+        BaseRetransmissionInterval abri = new RMAssertion.BaseRetransmissionInterval();
+        abri.setMilliseconds(new BigInteger("10000"));
+        a.setBaseRetransmissionInterval(abri);
+        BaseRetransmissionInterval bbri = new RMAssertion.BaseRetransmissionInterval();
+        bbri.setMilliseconds(new BigInteger("20000"));
+        b.setBaseRetransmissionInterval(bbri);
+        
+        c = PolicyUtils.intersect(a, b);
+        assertEquals(7200000L, c.getInactivityTimeout().getMilliseconds().longValue());
+        assertEquals(10000L, c.getBaseRetransmissionInterval().getMilliseconds().longValue());
+        assertNull(c.getAcknowledgementInterval());
+        assertNull(c.getExponentialBackoff());
+       
+        AcknowledgementInterval aai = new RMAssertion.AcknowledgementInterval();
+        aai.setMilliseconds(new BigInteger("2000"));
+        a.setAcknowledgementInterval(aai);
+        
+        c = PolicyUtils.intersect(a, b);
+        assertEquals(7200000L, c.getInactivityTimeout().getMilliseconds().longValue());
+        assertEquals(10000L, c.getBaseRetransmissionInterval().getMilliseconds().longValue());
+        assertEquals(2000L, c.getAcknowledgementInterval().getMilliseconds().longValue());
+        assertNull(c.getExponentialBackoff());
+        
+        b.setExponentialBackoff(new RMAssertion.ExponentialBackoff());
+        c = PolicyUtils.intersect(a, b);
+        assertEquals(7200000L, c.getInactivityTimeout().getMilliseconds().longValue());
+        assertEquals(10000L, c.getBaseRetransmissionInterval().getMilliseconds().longValue());
+        assertEquals(2000L, c.getAcknowledgementInterval().getMilliseconds().longValue());
+        assertNotNull(c.getExponentialBackoff());    
     }
     
     @Test
-    public void testUseExponentialBackoff() {
+    public void testGetRMAssertion() {
+        RMAssertion a = new RMAssertion();
+        BaseRetransmissionInterval abri = new RMAssertion.BaseRetransmissionInterval();
+        abri.setMilliseconds(new BigInteger("3000"));
+        a.setBaseRetransmissionInterval(abri);
+        a.setExponentialBackoff(new RMAssertion.ExponentialBackoff());
+        
         Message message = control.createMock(Message.class);
+        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+        control.replay();
+        assertSame(a, PolicyUtils.getRMAssertion(a, message));
+        control.verify();
+        
+        control.reset();
         AssertionInfoMap aim = control.createMock(AssertionInfoMap.class);
         EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
-        AssertionInfo ai = control.createMock(AssertionInfo.class);
         Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
         EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
-        ais.add(ai);
-        JaxbAssertion ja = control.createMock(JaxbAssertion.class);
-        EasyMock.expect(ai.getAssertion()).andReturn(ja);
-        RMAssertion rma =  control.createMock(RMAssertion.class);
-        EasyMock.expect(ja.getData()).andReturn(rma);
-        EasyMock.expect(rma.getExponentialBackoff()).andReturn(null);
         control.replay();
-        assertTrue("Should not use exponential backoff", !PolicyUtils.useExponentialBackoff(message));
+        assertSame(a, PolicyUtils.getRMAssertion(a, message));
         control.verify();
+        
         control.reset();
-        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
-        control.replay();
-        assertTrue("Should use exponential backoff", PolicyUtils.useExponentialBackoff(message));    
-    }
-   
-    @Test
-    public void testGetAcknowledgmentInterval() {
-        Message message = control.createMock(Message.class);
-        AssertionInfoMap aim = control.createMock(AssertionInfoMap.class);
+        RMAssertion b = new RMAssertion();
+        BaseRetransmissionInterval bbri = new RMAssertion.BaseRetransmissionInterval();
+        bbri.setMilliseconds(new BigInteger("2000"));
+        b.setBaseRetransmissionInterval(bbri);
+        JaxbAssertion<RMAssertion> assertion = new JaxbAssertion<RMAssertion>();
+        assertion.setName(RMConstants.getRMAssertionQName());
+        assertion.setData(b);
+        AssertionInfo ai = new AssertionInfo(assertion);
+        ais.add(ai);
         EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
-        AssertionInfo ai1 = control.createMock(AssertionInfo.class);
-        AssertionInfo ai2 =  control.createMock(AssertionInfo.class);
-        AssertionInfo ai3 =  control.createMock(AssertionInfo.class);
-        AssertionInfo ai4 =  control.createMock(AssertionInfo.class);
-        Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
-        ais.add(ai1);
-        ais.add(ai2);
-        ais.add(ai3);
-        ais.add(ai4);
         EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
-        JaxbAssertion ja1 =  control.createMock(JaxbAssertion.class);
-        EasyMock.expect(ai1.getAssertion()).andReturn(ja1);
-        RMAssertion rma1 =  control.createMock(RMAssertion.class);
-        EasyMock.expect(ja1.getData()).andReturn(rma1);
-        EasyMock.expect(rma1.getAcknowledgementInterval()).andReturn(null);
-        JaxbAssertion ja2 =  control.createMock(JaxbAssertion.class);
-        EasyMock.expect(ai2.getAssertion()).andReturn(ja2);
-        RMAssertion rma2 =  control.createMock(RMAssertion.class);
-        EasyMock.expect(ja2.getData()).andReturn(rma2);
-        RMAssertion.AcknowledgementInterval aint2 = 
-            control.createMock(RMAssertion.AcknowledgementInterval.class);
-        EasyMock.expect(rma2.getAcknowledgementInterval()).andReturn(aint2);
-        EasyMock.expect(aint2.getMilliseconds()).andReturn(null);
-        JaxbAssertion ja3 =  control.createMock(JaxbAssertion.class);
-        EasyMock.expect(ai3.getAssertion()).andReturn(ja3);
-        RMAssertion rma3 =  control.createMock(RMAssertion.class);
-        EasyMock.expect(ja3.getData()).andReturn(rma3);
-        RMAssertion.AcknowledgementInterval aint3 = 
-            control.createMock(RMAssertion.AcknowledgementInterval.class);
-        EasyMock.expect(rma3.getAcknowledgementInterval()).andReturn(aint3);
-        EasyMock.expect(aint3.getMilliseconds()).andReturn(new BigInteger("10000"));
-        JaxbAssertion ja4 =  control.createMock(JaxbAssertion.class);
-        EasyMock.expect(ai4.getAssertion()).andReturn(ja4);
-        RMAssertion rma4 =  control.createMock(RMAssertion.class);
-        EasyMock.expect(ja4.getData()).andReturn(rma4);
-        RMAssertion.AcknowledgementInterval aint4 = 
-            control.createMock(RMAssertion.AcknowledgementInterval.class);
-        EasyMock.expect(rma4.getAcknowledgementInterval()).andReturn(aint4);
-        EasyMock.expect(aint4.getMilliseconds()).andReturn(new BigInteger("5000"));
-        
         control.replay();
-        assertEquals("Unexpected value for acknowledgment interval", 
-                     5000, PolicyUtils.getAcknowledgmentInterval(message).intValue());
+        RMAssertion c = PolicyUtils.getRMAssertion(a, message);
+        assertNull(c.getAcknowledgementInterval());
+        assertNull(c.getInactivityTimeout());
+        assertEquals(2000L, c.getBaseRetransmissionInterval().getMilliseconds().longValue());
+        assertNotNull(c.getExponentialBackoff());   
+        control.verify();
     }
-    
     
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Tue May  1 01:49:30 2007
@@ -22,17 +22,13 @@
 
 import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.Executor;
 
 import org.apache.cxf.message.Message;
-import org.apache.cxf.ws.policy.AssertionInfo;
 import org.apache.cxf.ws.policy.AssertionInfoMap;
-import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
 import org.apache.cxf.ws.rm.Identifier;
-import org.apache.cxf.ws.rm.RMConstants;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMMessageConstants;
 import org.apache.cxf.ws.rm.RMProperties;
@@ -108,68 +104,6 @@
     }
     
     @Test
-    public void testGetBaseRetransmissionIntervalFromManager() {
-        Message message = createMock(Message.class);
-        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
-        EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
-        EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(null);
-        control.replay();
-        assertEquals("Unexpected value for base retransmission interval", 
-                     0L, queue.getBaseRetransmissionInterval(message));
-        control.verify();
-        control.reset();
-        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
-        EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
-        RMAssertion.BaseRetransmissionInterval bri = createMock(RMAssertion.BaseRetransmissionInterval.class);
-        EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);
-        EasyMock.expect(bri.getMilliseconds()).andReturn(null);
-        control.replay();
-        assertEquals("Unexpected value for base retransmission interval", 
-                     0L, queue.getBaseRetransmissionInterval(message));
-        control.verify();
-        control.reset();
-        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
-        EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
-        EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);
-        EasyMock.expect(bri.getMilliseconds()).andReturn(new BigInteger("7000"));
-        control.replay();
-        assertEquals("Unexpected value for base retransmission interval", 
-                     7000L, queue.getBaseRetransmissionInterval(message));
-    }
-    
-    @Test
-    public void testUseExponentialBackoff() {
-        Message message = createMock(Message.class);
-        AssertionInfoMap aim = createMock(AssertionInfoMap.class);
-        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
-        AssertionInfo ai = createMock(AssertionInfo.class);
-        Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
-        EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
-        ais.add(ai);
-        JaxbAssertion ja = createMock(JaxbAssertion.class);
-        EasyMock.expect(ai.getAssertion()).andReturn(ja);
-        EasyMock.expect(ja.getData()).andReturn(rma);
-        EasyMock.expect(rma.getExponentialBackoff()).andReturn(null);
-        control.replay();
-        assertTrue("Should not use exponential backoff", !queue.useExponentialBackoff(message));
-        control.verify();
-        control.reset();
-        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
-        EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
-        EasyMock.expect(rma.getExponentialBackoff()).andReturn(null);
-        control.replay();
-        assertTrue("Should not use exponential backoff", !queue.useExponentialBackoff(message));
-        control.verify();
-        control.reset();
-        EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
-        EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
-        RMAssertion.ExponentialBackoff eb = createMock(RMAssertion.ExponentialBackoff.class);
-        EasyMock.expect(rma.getExponentialBackoff()).andReturn(eb);
-        control.replay();
-        assertTrue("Should use exponential backoff", queue.useExponentialBackoff(message));        
-    }
-    
-    @Test
     public void testResendCandidateCtor() {
         Message message = createMock(Message.class);
         setupMessagePolicies(message);
@@ -329,7 +263,7 @@
         setupMessagePolicies(message1);        
         Message message2 =
             setUpMessage("sequence1", messageNumbers[1], false);
-        setupMessagePolicies(message1);
+        setupMessagePolicies(message2);
         ready(false);
         
         sequenceList.add(queue.createResendCandidate(message1));
@@ -385,7 +319,7 @@
     
     private void setupMessagePolicies(Message message) {
         EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
-        EasyMock.expect(manager.getRMAssertion()).andReturn(rma).times(2);
+        EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
         RMAssertion.BaseRetransmissionInterval bri = 
             createMock(RMAssertion.BaseRetransmissionInterval.class);
         EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Tue May  1 01:49:30 2007
@@ -103,7 +103,8 @@
     private boolean doTestTwowayNonAnonymousDeferred = testAll;
     private boolean doTestTwowayNonAnonymousMaximumSequenceLength2 = testAll;
     private boolean doTestTwowayAtMostOnce = testAll;
-    private boolean doTestInvalidSequence = testAll;
+    private boolean doTestUnknownSequence = testAll;
+    private boolean doTestInactivityTimeout = testAll;
     private boolean doTestOnewayMessageLoss = testAll;
     private boolean doTestOnewayMessageLossAsyncExecutor = testAll;
     private boolean doTestTwowayMessageLoss = testAll;
@@ -698,8 +699,8 @@
     }
     
     @Test
-    public void testInvalidSequence() throws Exception {
-        if (!doTestInvalidSequence) {
+    public void testUnknownSequence() throws Exception {
+        if (!doTestUnknownSequence) {
             return;
         }
         
@@ -734,6 +735,69 @@
         // the third inbound message has a SequenceFault header
         MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
         mf.verifySequenceFault(RMConstants.getUnknownSequenceFaultCode(), false, 1);
+    }
+    
+    @Test
+    public void testInactivityTimeout() throws Exception {
+        if (!doTestInactivityTimeout) {
+            return;
+        }
+        
+        setupGreeter("org/apache/cxf/systest/ws/rm/inactivity-timeout.xml");
+       
+        greeter.greetMe("one");
+        
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException ex) {
+            // ignore
+        }        
+        
+        try {
+            greeter.greetMe("two");
+            fail("Expected fault.");
+        } catch (WebServiceException ex) {
+            SoapFault sf = (SoapFault)ex.getCause();
+            assertEquals("Unexpected fault code.", Soap11.getInstance().getSender(), sf.getFaultCode());
+            assertNull("Unexpected sub code.", sf.getSubCode());
+            assertTrue("Unexpected reason.", sf.getReason().endsWith("is not a known Sequence identifier."));
+        }   
+        
+        awaitMessages(3, 3, 5000);
+        
+        MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+        
+        // Expected outbound:
+        // CreateSequence 
+        // + two requests (the second request does not include an acknowledgement for the first
+        // response, as in the meantime the client has terminated the sequence)
+       
+        String[] expectedActions = new String[3];
+        expectedActions[0] = RMConstants.getCreateSequenceAction();        
+        for (int i = 1; i < expectedActions.length; i++) {
+            expectedActions[i] = GREETME_ACTION;
+        }
+        mf.verifyActions(expectedActions, true);
+        mf.verifyMessageNumbers(new String[] {null, "1", "2"}, true);
+        mf.verifyLastMessage(new boolean[3], true);
+        mf.verifyAcknowledgements(new boolean[] {false, false, false}, true);
+ 
+        // Expected inbound:
+        // CreateSequenceResponse
+        // + 1 response with acknowledgement
+        // + 1 fault without acknowledgement
+        
+        mf.verifyMessages(3, false);
+        expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+                                        null, null};
+        mf.verifyActions(expectedActions, false);
+        mf.verifyMessageNumbers(new String[] {null, "1", null}, false);
+        mf.verifyAcknowledgements(new boolean[] {false, true, false} , false);
+        
+        // the third inbound message has a SequenceFault header
+        
+        mf.verifySequenceFault(RMConstants.getUnknownSequenceFaultCode(), false, 2);
+     
     }
 
     @Test