You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/05/21 10:50:16 UTC

svn commit: r1340937 - in /cxf/trunk/rt/ws/rm/src: main/java/org/apache/cxf/ws/rm/ main/java/org/apache/cxf/ws/rm/soap/ main/resources/schemas/configuration/ test/java/org/apache/cxf/ws/rm/ test/java/org/apache/cxf/ws/rm/soap/

Author: ay
Date: Mon May 21 08:50:15 2012
New Revision: 1340937

URL: http://svn.apache.org/viewvc?rev=1340937&view=rev
Log:
[CXF-4261] Add maxRetries option to WS-RM's retry logic

Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java Mon May 21 08:50:15 2012
@@ -19,16 +19,25 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.cxf.Bus;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.ws.rm.v200702.Identifier;
 
 public class AbstractEndpoint {
     
+    /* the number of sequences currently being processed */
+    protected AtomicInteger processingSequenceCount;
+    /* the number of completed sequences since last started */
+    protected AtomicInteger completedSequenceCount;
+    
     private final RMEndpoint reliableEndpoint;
     
     protected AbstractEndpoint(RMEndpoint rme) {
         reliableEndpoint = rme;
+        processingSequenceCount = new AtomicInteger();
+        completedSequenceCount = new AtomicInteger();
     }
     
     public String getName() {
@@ -64,6 +73,14 @@ public class AbstractEndpoint {
     public Identifier generateSequenceIdentifier() {
         return reliableEndpoint.getManager().getIdGenerator().generateSequenceIdentifier();
     }
+
+    int getProcessingSequenceCount() {
+        return processingSequenceCount.get();
+    }
+
+    int getCompletedSequenceCount() {
+        return completedSequenceCount.get();
+    }
     
     private Bus getBus() {
         return reliableEndpoint.getManager().getBus();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Mon May 21 08:50:15 2012
@@ -65,21 +65,31 @@ public class Destination extends Abstrac
 
     public void addSequence(DestinationSequence seq, boolean persist) {
         seq.setDestination(this);
-        map.put(seq.getIdentifier().getValue(), seq);
+        synchronized (map) {
+            map.put(seq.getIdentifier().getValue(), seq);
+        }
         if (persist) {
             RMStore store = getReliableEndpoint().getManager().getStore();
             if (null != store) {
                 store.createDestinationSequence(seq);
             }
         }
+        processingSequenceCount.incrementAndGet();
     }
 
     public void removeSequence(DestinationSequence seq) {
-        map.remove(seq.getIdentifier().getValue());
+        DestinationSequence o;
+        synchronized (map) {
+            o = map.remove(seq.getIdentifier().getValue());
+        }
         RMStore store = getReliableEndpoint().getManager().getStore();
         if (null != store) {
             store.removeDestinationSequence(seq.getIdentifier());
         }
+        if (o != null) {
+            processingSequenceCount.decrementAndGet();
+            completedSequenceCount.incrementAndGet();
+        }
     }
 
     /**

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Mon May 21 08:50:15 2012
@@ -70,11 +70,13 @@ public class ManagedRMEndpoint implement
      SimpleType.STRING, SimpleType.STRING, SimpleType.STRING};
 
     private static final String[] RETRY_STATUS_NAMES = 
-    {"messageNumber", "retries", "previous", "next", "nextInterval", "backOff", "pending", "suspended"};
+    {"messageNumber", "retries", "maxRetries", "previous", "next", "nextInterval", 
+     "backOff", "pending", "suspended"};
     private static final String[] RETRY_STATUS_DESCRIPTIONS = RETRY_STATUS_NAMES;
     @SuppressWarnings("rawtypes") // needed as OpenType isn't generic on Java5
     private static final OpenType[] RETRY_STATUS_TYPES =  
-    {SimpleType.LONG, SimpleType.INTEGER, SimpleType.DATE, SimpleType.DATE, SimpleType.LONG, SimpleType.LONG, 
+    {SimpleType.LONG, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.DATE, 
+     SimpleType.DATE, SimpleType.LONG, SimpleType.LONG, 
      SimpleType.BOOLEAN, SimpleType.BOOLEAN};
 
     private static CompositeType sourceSequenceType;
@@ -126,22 +128,12 @@ public class ManagedRMEndpoint implement
         @ManagedOperationParameter(name = "outbound", description = "The outbound direction") 
     })
     public int getQueuedMessageTotalCount(boolean outbound) {
-        int count = 0;
         if (outbound) {
-            Source source = endpoint.getSource();
-            RetransmissionQueue queue = endpoint.getManager().getRetransmissionQueue();
-            for (SourceSequence ss : source.getAllSequences()) {
-                count += queue.countUnacknowledged(ss);
-            }
+            return endpoint.getManager().getRetransmissionQueue().countUnacknowledged(); 
         } else {
-//            Destination destination = endpoint.getDestination();
-//            RedeliveryQueue queue = endpoint.getManager().getRedeliveryQueue();
-//            for (DestinationSequence ds : destination.getAllSequences()) {
-//                count += queue.countUndelivered(ds);
-//            }
+//            return endpoint.getManager().getRedeliveryQueue().countUndelivered();
+            return 0;
         }
-
-        return count;
     }
 
     @ManagedOperation(description = "Number of Queued Messages")
@@ -151,21 +143,20 @@ public class ManagedRMEndpoint implement
     })
     public int getQueuedMessageCount(String sid, boolean outbound) {
         RMManager manager = endpoint.getManager();
-        int count = 0;
         if (outbound) {
             SourceSequence ss = getSourceSeq(sid);
             if (null == ss) {
                 throw new IllegalArgumentException("no sequence");
             }
-            count = manager.getRetransmissionQueue().countUnacknowledged(ss);
+            return manager.getRetransmissionQueue().countUnacknowledged(ss);
         } else {
 //            DestinationSequence ds = getDestinationSeq(sid);
 //            if (null == ds) {
 //                throw new IllegalArgumentException("no sequence");
 //            }
-//            count = manager.getRedeliveryQueue().countUndelivered(ds);
+//            return manager.getRedeliveryQueue().countUndelivered(ds);
+            return 0;
         }
-        return count;
     }
 
     @ManagedOperation(description = "List of UnAcknowledged Message Numbers")
@@ -568,7 +559,7 @@ public class ManagedRMEndpoint implement
     private CompositeData getRetryStatusProperties(long num, RetryStatus rs) throws JMException {
         CompositeData rsps = null;
         if (null != rs) {
-            Object[] rsv = new Object[] {num, rs.getRetries(), rs.getPrevious(), 
+            Object[] rsv = new Object[] {num, rs.getRetries(), rs.getMaxRetries(), rs.getPrevious(), 
                                          rs.getNext(), rs.getNextInterval(),
                                          rs.getBackoff(), rs.isPending(), rs.isSuspended()};
             rsps = new CompositeDataSupport(retryStatusType, RETRY_STATUS_NAMES, rsv);
@@ -598,5 +589,44 @@ public class ManagedRMEndpoint implement
         return endpoint.getLastControlMessage() == 0L ? null 
             : new Date(endpoint.getLastControlMessage());
     }
+
+    @ManagedAttribute(description = "Number of Outbound Queued Messages", currencyTimeLimit = 10)
+    public int getQueuedMessagesOutboundCount() {
+        return endpoint.getManager().getRetransmissionQueue().countUnacknowledged();
+    }
+
+//    @ManagedAttribute(description = "Number of Outbound Completed Messages", currencyTimeLimit = 10)
+//    public int getCompletedMessagesOutboundCount() {
+//        return endpoint.getManager().countCompleted();
+//    }
+
+//    @ManagedAttribute(description = "Number of Inbound Queued Messages", currencyTimeLimit = 10)
+//    public int getQueuedMessagesInboundCount() {
+//        return endpoint.getManager().getRedeliveryQueue().countUndelivered();
+//    }
+
+//    @ManagedAttribute(description = "Number of Inbound Completed Messages", currencyTimeLimit = 10)
+//    public int getCompletedMessagesInboundCount() {
+//        return endpoint.getManager().countCompleted();
+//    }
+
+    @ManagedAttribute(description = "Number of Processing Source Sequences", currencyTimeLimit = 10)
+    public int getProcessingSourceSequenceCount() {
+        return endpoint.getProcessingSourceSequenceCount();
+    }
+     
+    @ManagedAttribute(description = "Number of Completed Source Sequences", currencyTimeLimit = 10)
+    public int getCompletedSourceSequenceCount() {
+        return endpoint.getCompletedSourceSequenceCount();
+    }
+
+    @ManagedAttribute(description = "Number of Processing Destination Sequences", currencyTimeLimit = 10)
+    public int getProcessingDestinationSequenceCount() {
+        return endpoint.getProcessingDestinationSequenceCount();
+    }
     
+    @ManagedAttribute(description = "Number of Completed Destination Sequences", currencyTimeLimit = 10)
+    public int getCompletedDestinationSequenceCount() {
+        return endpoint.getCompletedDestinationSequenceCount();
+    }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java Mon May 21 08:50:15 2012
@@ -72,4 +72,15 @@ public class ManagedRMManager implements
     public boolean isUsingStore() {
         return manager.getStore() != null;
     }
+
+    @ManagedAttribute(description = "Total Number of Outbound Queued Messages", currencyTimeLimit = 10)
+    public int getQueuedMessagesOutboundCount() {
+        return manager.getRetransmissionQueue().countUnacknowledged();
+    }
+
+
+//    @ManagedAttribute(description = "Total Number of Inbound Queued Messages", currencyTimeLimit = 10)
+//    public int getQueuedMessagesInboundCount() {
+//        return manager.getRedeliveryQueue().countUndelivered();
+//    }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Mon May 21 08:50:15 2012
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -97,6 +98,8 @@ public class RMEndpoint {
     private Servant servant;
     private long lastApplicationMessage;
     private long lastControlMessage;
+    private AtomicInteger applicationMessageCount;
+    private AtomicInteger controlMessageCount;
     private InstrumentationManager instrumentationManager;
     private ManagedRMEndpoint managedEndpoint;
     
@@ -116,6 +119,8 @@ public class RMEndpoint {
         servant = new Servant(this);
         services = new HashMap<ProtocolVariation, WrappedService>();
         endpoints = new HashMap<ProtocolVariation, Endpoint>();
+        applicationMessageCount = new AtomicInteger();
+        controlMessageCount = new AtomicInteger();
     }
 
     /**
@@ -204,10 +209,18 @@ public class RMEndpoint {
     }
 
     /**
+     * @return The number of application messages that have been received.
+     */
+    public int getApplicationMessageCount() {
+        return applicationMessageCount.get();
+    }
+
+    /**
      * Indicates that an application message has been received.
      */
     public void receivedApplicationMessage() {
         lastApplicationMessage = System.currentTimeMillis();
+        applicationMessageCount.incrementAndGet();
     }
 
     /**
@@ -218,10 +231,18 @@ public class RMEndpoint {
     }
 
     /**
+     * @return The number of RM protocol messages that have been received.
+     */
+    public int getControlMessageCount() {
+        return controlMessageCount.get();
+    }
+
+    /**
      * Indicates that an RM protocol message has been received.
      */
     public void receivedControlMessage() {
         lastControlMessage = System.currentTimeMillis();
+        controlMessageCount.incrementAndGet();
     }
 
     /**
@@ -687,6 +708,22 @@ public class RMEndpoint {
         // unregistering of this managed bean from the server is done by the bus itself
     }
 
+    int getProcessingSourceSequenceCount() {
+        return source != null ? source.getProcessingSequenceCount() : 0;
+    }
+
+    int getCompletedSourceSequenceCount() {
+        return source != null ? source.getCompletedSequenceCount() : 0;
+    }
+
+    int getProcessingDestinationSequenceCount() {
+        return destination != null ? destination.getProcessingSequenceCount() : 0;
+    }
+
+    int getCompletedDestinationSequenceCount() {
+        return destination != null ? destination.getCompletedSequenceCount() : 0;
+    }
+
     class EffectivePolicyImpl implements EffectivePolicy {
 
         private EndpointPolicy endpointPolicy;

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Mon May 21 08:50:15 2012
@@ -522,72 +522,81 @@ public class RMManager {
         RMEndpoint rme = createReliableEndpoint(endpoint);
         rme.initialise(conduit, null, null);
         reliableEndpoints.put(endpoint, rme);
-        SourceSequence css = null;
         for (SourceSequence ss : sss) {            
- 
-            Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
-            if (null == ms || 0 == ms.size()) {
-                continue;
-            }
-            LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
-            
-            rme.getSource().addSequence(ss, false);
-            // choosing an arbitrary valid source sequence as the current source sequence
-            if (css == null && !ss.isExpired() && !ss.isLastMessage()) {
-                css = ss;
-                rme.getSource().setCurrent(css);
-            }
-            for (RMMessage m : ms) {                
-                
-                Message message = new MessageImpl();
-                Exchange exchange = new ExchangeImpl();
-                message.setExchange(exchange);
-                if (null != conduit) {
-                    exchange.setConduit(conduit);
-                    message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
-                }
-                exchange.put(Endpoint.class, endpoint);
-                exchange.put(Service.class, endpoint.getService());
-                if (endpoint.getEndpointInfo().getService() != null) {
-                    exchange.put(ServiceInfo.class, endpoint.getEndpointInfo().getService());
-                    exchange.put(InterfaceInfo.class, endpoint.getEndpointInfo().getService().getInterface());
-                }
-                exchange.put(Binding.class, endpoint.getBinding());
-                exchange.put(BindingInfo.class, endpoint.getEndpointInfo().getBinding());
-                exchange.put(Bus.class, bus);
-                
-                SequenceType st = new SequenceType();
-                st.setIdentifier(ss.getIdentifier());
-                st.setMessageNumber(m.getMessageNumber());
-                RMProperties rmps = new RMProperties();
-                rmps.setSequence(st);
-                if (ss.isLastMessage() && ss.getCurrentMessageNr() == m.getMessageNumber()) {
-                    CloseSequenceType close = new CloseSequenceType();
-                    close.setIdentifier(ss.getIdentifier());
-                    rmps.setCloseSequence(close);
-                }
-                RMContextUtils.storeRMProperties(message, rmps, true);                
-                if (null == conduit) {
-                    String to = m.getTo();
-                    AddressingProperties maps = new AddressingPropertiesImpl();
-                    maps.setTo(RMUtils.createReference(to));
-                    RMContextUtils.storeMAPs(maps, message, true, false);
-                }
-                                    
-                message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream());
-                RMContextUtils.setProtocolVariation(message, ss.getProtocol());
-                
-                retransmissionQueue.addUnacknowledged(message);
-            }            
+            recoverSourceSequence(endpoint, conduit, rme.getSource(), ss);
         }
         
         for (DestinationSequence ds : dss) {
-            rme.getDestination().addSequence(ds, false);        
+            reconverDestinationSequence(endpoint, conduit, rme.getDestination(), ds);
         }
         retransmissionQueue.start();
         
     }
     
+    private void recoverSourceSequence(Endpoint endpoint, Conduit conduit, Source s, 
+                                       SourceSequence ss) {
+        Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
+        if (null == ms || 0 == ms.size()) {
+            store.removeSourceSequence(ss.getIdentifier());
+            return;
+        }
+        LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
+            
+        s.addSequence(ss, false);
+        // choosing an arbitrary valid source sequence as the current source sequence
+        if (s.getAssociatedSequence(null) == null && !ss.isExpired() && !ss.isLastMessage()) {
+            s.setCurrent(ss);
+        }
+        for (RMMessage m : ms) {                
+            
+            Message message = new MessageImpl();
+            Exchange exchange = new ExchangeImpl();
+            message.setExchange(exchange);
+            if (null != conduit) {
+                exchange.setConduit(conduit);
+                message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
+            }
+            exchange.put(Endpoint.class, endpoint);
+            exchange.put(Service.class, endpoint.getService());
+            if (endpoint.getEndpointInfo().getService() != null) {
+                exchange.put(ServiceInfo.class, endpoint.getEndpointInfo().getService());
+                exchange.put(InterfaceInfo.class, endpoint.getEndpointInfo().getService().getInterface());
+            }
+            exchange.put(Binding.class, endpoint.getBinding());
+            exchange.put(BindingInfo.class, endpoint.getEndpointInfo().getBinding());
+            exchange.put(Bus.class, bus);
+            
+            SequenceType st = new SequenceType();
+            st.setIdentifier(ss.getIdentifier());
+            st.setMessageNumber(m.getMessageNumber());
+            RMProperties rmps = new RMProperties();
+            rmps.setSequence(st);
+            if (ss.isLastMessage() && ss.getCurrentMessageNr() == m.getMessageNumber()) {
+                CloseSequenceType close = new CloseSequenceType();
+                close.setIdentifier(ss.getIdentifier());
+                rmps.setCloseSequence(close);
+            }
+            RMContextUtils.storeRMProperties(message, rmps, true);                
+            if (null == conduit) {
+                String to = m.getTo();
+                AddressingProperties maps = new AddressingPropertiesImpl();
+                maps.setTo(RMUtils.createReference(to));
+                RMContextUtils.storeMAPs(maps, message, true, false);
+            }
+                                    
+            message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream());
+            RMContextUtils.setProtocolVariation(message, ss.getProtocol());
+            
+            retransmissionQueue.addUnacknowledged(message);
+        }            
+    }
+
+    private void reconverDestinationSequence(Endpoint endpoint, Conduit conduit, Destination d, 
+                                             DestinationSequence ds) {
+        d.addSequence(ds, false);
+        //TODO add the redelivery code
+    }
+
     RMEndpoint createReliableEndpoint(Endpoint endpoint) {
         return new RMEndpoint(this, endpoint);
     }  

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Mon May 21 08:50:15 2012
@@ -37,6 +37,11 @@ public interface RetransmissionQueue {
     int countUnacknowledged(SourceSequence seq);
     
     /**
+     * @return the total number of unacknowledged messages in this queue
+     */
+    int countUnacknowledged();
+
+    /**
      * @return true if there are no unacknowledged messages in the queue
      */
     boolean isEmpty();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java Mon May 21 08:50:15 2012
@@ -64,21 +64,31 @@ public class Source extends AbstractEndp
     
     public void addSequence(SourceSequence seq, boolean persist) {
         seq.setSource(this);
-        map.put(seq.getIdentifier().getValue(), seq);
+        synchronized (map) {
+            map.put(seq.getIdentifier().getValue(), seq);
+        }
         if (persist) {
             RMStore store = getReliableEndpoint().getManager().getStore();
             if (null != store) {
                 store.createSourceSequence(seq);
             }
         }
+        processingSequenceCount.incrementAndGet();
     }
     
-    public void removeSequence(SourceSequence seq) {        
-        map.remove(seq.getIdentifier().getValue());
+    public void removeSequence(SourceSequence seq) {
+        SourceSequence o;
+        synchronized (map) {
+            o = map.remove(seq.getIdentifier().getValue());
+        }
         RMStore store = getReliableEndpoint().getManager().getStore();
         if (null != store) {
             store.removeSourceSequence(seq.getIdentifier());
         }
+        if (o != null) {
+            processingSequenceCount.decrementAndGet();
+            completedSequenceCount.incrementAndGet();
+        }
     }
     
     /**

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Mon May 21 08:50:15 2012
@@ -66,6 +66,7 @@ import org.apache.cxf.ws.rm.Retransmissi
 import org.apache.cxf.ws.rm.RetransmissionQueue;
 import org.apache.cxf.ws.rm.RetryStatus;
 import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.manager.RetryPolicyType;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.policy.RM10PolicyUtils;
 import org.apache.cxf.ws.rm.v200702.Identifier;
@@ -86,6 +87,8 @@ public class RetransmissionQueueImpl imp
     private Resender resender;
     private RMManager manager;
 
+    private int unacknowledgedCount;
+    
     public RetransmissionQueueImpl(RMManager m) {
         manager = m;
     }
@@ -111,6 +114,10 @@ public class RetransmissionQueueImpl imp
         return sequenceCandidates == null ? 0 : sequenceCandidates.size();
     }
 
+    public int countUnacknowledged() {
+        return unacknowledgedCount;
+    }
+
     /**
      * @return true if there are no unacknowledged messages in the queue
      */
@@ -131,13 +138,11 @@ public class RetransmissionQueueImpl imp
             if (null != sequenceCandidates) {
                 for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
                     ResendCandidate candidate = sequenceCandidates.get(i);
-                    RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
-                                                                                  true);
-                    SequenceType st = properties.getSequence();
-                    long m = st.getMessageNumber().longValue();
+                    long m = candidate.getNumber();
                     if (seq.isAcknowledged(m)) {
                         sequenceCandidates.remove(i);
                         candidate.resolved();
+                        unacknowledgedCount--;
                         purged.add(m);
                     }
                 }
@@ -161,10 +166,7 @@ public class RetransmissionQueueImpl imp
         if (null != sequenceCandidates) {
             for (int i = 0; i < sequenceCandidates.size(); i++) {
                 ResendCandidate candidate = sequenceCandidates.get(i);
-                RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
-                                                                              true);
-                SequenceType st = properties.getSequence();
-                unacknowledged.add(st.getMessageNumber());
+                unacknowledged.add(candidate.getNumber());
             }
         }
         return unacknowledged;
@@ -175,10 +177,7 @@ public class RetransmissionQueueImpl imp
         if (null != sequenceCandidates) {
             for (int i = 0; i < sequenceCandidates.size(); i++) {
                 ResendCandidate candidate = sequenceCandidates.get(i);
-                RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
-                                                                              true);
-                SequenceType st = properties.getSequence();
-                if (num == st.getMessageNumber()) {
+                if (num == candidate.getNumber()) {
                     return candidate;
                 }
             }
@@ -192,10 +191,7 @@ public class RetransmissionQueueImpl imp
         if (null != sequenceCandidates) {
             for (int i = 0; i < sequenceCandidates.size(); i++) {
                 ResendCandidate candidate = sequenceCandidates.get(i);
-                RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
-                                                                              true);
-                SequenceType st = properties.getSequence();
-                cp.put(st.getMessageNumber(), candidate);
+                cp.put(candidate.getNumber(), candidate);
             }
         }
         return cp;
@@ -305,6 +301,7 @@ public class RetransmissionQueueImpl imp
                 candidate.suspend();
             }
             sequenceCandidates.add(candidate);
+            unacknowledgedCount++;
         }
         LOG.fine("Cached unacknowledged message.");
         return candidate;
@@ -473,10 +470,12 @@ public class RetransmissionQueueImpl imp
      */
     protected class ResendCandidate implements Runnable, RetryStatus {
         private Message message;
+        private long number;
         private OutputStream out;
         private Date next;
         private TimerTask nextTask;
         private int retries;
+        private int maxRetries;
         private long nextInterval;
         private long backoff;
         private boolean pending;
@@ -498,7 +497,9 @@ public class RetransmissionQueueImpl imp
                 ? RetransmissionQueue.DEFAULT_EXPONENTIAL_BACKOFF : 1;
             next = new Date(System.currentTimeMillis() + baseRetransmissionInterval);
             nextInterval = baseRetransmissionInterval * backoff;
-            
+            RetryPolicyType rmrp = null != manager.getSourcePolicy() 
+                ? manager.getSourcePolicy().getRetryPolicy() : null; 
+            maxRetries = null != rmrp ? rmrp.getMaxRetries() : 0;
             
             AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, true);
             AttributedURIType to = null;
@@ -510,6 +511,10 @@ public class RetransmissionQueueImpl imp
                 LOG.log(Level.INFO, "Cannot resend to anonymous target.  Not scheduling a resend.");
                 return;
             }
+            RMProperties rmprops = RMContextUtils.retrieveRMProperties(message, true);
+            if (null != rmprops) {
+                number = rmprops.getSequence().getMessageNumber();
+            }
             if (null != manager.getTimer()) {
                 schedule();
             }
@@ -558,6 +563,10 @@ public class RetransmissionQueueImpl imp
             }
         }
 
+        public long getNumber() {
+            return number;
+        }
+        
         /**
          * @return number of resend attempts
          */
@@ -569,7 +578,7 @@ public class RetransmissionQueueImpl imp
          * @return number of max resend attempts
          */
         public int getMaxRetries() {
-            return 0;
+            return maxRetries;
         }
         
         /**
@@ -666,7 +675,7 @@ public class RetransmissionQueueImpl imp
         protected synchronized void attempted() {
             pending = false;
             retries++;
-            if (null != next) {
+            if (null != next && maxRetries != retries) {
                 next = new Date(next.getTime() + nextInterval);
                 nextInterval *= backoff;
                 schedule();

Modified: cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd (original)
+++ cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd Mon May 21 08:50:15 2012
@@ -42,6 +42,7 @@
         
        <xs:sequence>  
             <xs:element name="sequenceTerminationPolicy" type="tns:SequenceTerminationPolicyType" minOccurs="0"/>
+            <xs:element name="retryPolicy" type="tns:RetryPolicyType" minOccurs="0"/>
        </xs:sequence>
         
         <xs:attribute name="sequenceExpiration" type="xs:duration" use="optional" default="PT0S">
@@ -88,6 +89,7 @@
 
         <xs:sequence>  
             <xs:element name="acksPolicy" type="tns:AcksPolicyType" minOccurs="0"/>
+            <xs:element name="retryPolicy" type="tns:RetryPolicyType" minOccurs="0"/>
         </xs:sequence>
         
         <xs:attribute name="sequenceExpiration" type="xs:duration" use="optional" default="PT0S">
@@ -177,6 +179,17 @@
         </xs:attribute>
 
     </xs:complexType>
+     
+    <xs:complexType name="RetryPolicyType">
+        <xs:attribute name="maxRetries" type="xs:int" use="optional" default="0">
+            <xs:annotation>
+                <xs:documentation>
+                    The maximum number of times a message is retransmitted.
+                    A value of 0 means the number of retries is unbounded.
+                </xs:documentation>
+            </xs:annotation>      
+        </xs:attribute> 
+    </xs:complexType>
     
     <xs:complexType name="DeliveryAssuranceType">
         <xs:sequence>

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Mon May 21 08:50:15 2012
@@ -472,6 +472,10 @@ public class ManagedRMManagerTest extend
         RetryStatus getRetransmissionStatus() {
             return status;
         }
+
+        public int countUnacknowledged() {
+            return 3;
+        }
     }
     
     private static class TestRetransmissionStatus implements RetryStatus {

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Mon May 21 08:50:15 2012
@@ -31,6 +31,8 @@ import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMMessageConstants;
 import org.apache.cxf.ws.rm.RMProperties;
 import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.manager.RetryPolicyType;
+import org.apache.cxf.ws.rm.manager.SourcePolicyType;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.v200702.Identifier;
 import org.apache.cxf.ws.rm.v200702.SequenceType;
@@ -127,7 +129,33 @@ public class RetransmissionQueueImplTest
         assertTrue(!candidate.getNext().before(refDate));
         refDate = new Date(now + 17000);
         assertTrue(!candidate.getNext().after(refDate));
-        assertTrue(!candidate.isPending());        
+        assertTrue(!candidate.isPending());
+    }
+
+    @Test
+    public void testResendCandidateMaxRetries() {
+        Message message = createMock(Message.class);
+        setupMessagePolicies(message);
+        setupRetryPolicy(message);
+        
+        ready(true);
+        RetransmissionQueueImpl.ResendCandidate candidate = queue.createResendCandidate(message);
+        
+        assertEquals(3, candidate.getMaxRetries());
+        Date next = null;
+        for (int i = 1; i < 3; i++) {
+            next = candidate.getNext();
+            candidate.attempted();
+            assertEquals(i, candidate.getRetries());
+            // the next time must advance
+            assertTrue(candidate.getNext().after(next));
+        }
+        next = candidate.getNext();
+        candidate.attempted();
+        // reaches the max retries
+        assertEquals(3, candidate.getRetries());
+        // the next time must not advance
+        assertFalse(candidate.getNext().after(next));
     }
     
     @Test
@@ -349,7 +377,15 @@ public class RetransmissionQueueImplTest
         EasyMock.expect(rma.getExponentialBackoff()).andReturn(eb);        
     }
     
+    private void setupRetryPolicy(Message message) {
 
+        SourcePolicyType spt = control.createMock(SourcePolicyType.class);
+        EasyMock.expect(manager.getSourcePolicy()).andReturn(spt).anyTimes();
+        RetryPolicyType rpt = control.createMock(RetryPolicyType.class);
+        EasyMock.expect(spt.getRetryPolicy()).andReturn(rpt);
+        EasyMock.expect(rpt.getMaxRetries()).andReturn(3);
+    }
+    
     private void ready(boolean doStart) {
         control.replay();
         if (doStart) {