You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/03/27 15:42:48 UTC

svn commit: r1305836 - in /cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/ systests/ws-specs/src/te...

Author: ay
Date: Tue Mar 27 13:42:47 2012
New Revision: 1305836

URL: http://svn.apache.org/viewvc?rev=1305836&view=rev
Log:
unifying the retry interface for CXF-4209

Added:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetryStatus.java
Removed:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java
Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Tue Mar 27 13:42:47 2012
@@ -69,18 +69,18 @@ public class ManagedRMEndpoint implement
     {SimpleType.STRING, SimpleType.LONG, SimpleType.STRING, 
      SimpleType.STRING};
 
-    private static final String[] RETRANSMISSION_STATUS_NAMES = 
-    {"messageNumber", "resends", "previous", "next", "nextInterval", "backOff", "pending", "suspended"};
-    private static final String[] RETRANSMISSION_STATUS_DESCRIPTIONS = RETRANSMISSION_STATUS_NAMES;
+    private static final String[] RETRY_STATUS_NAMES = 
+    {"messageNumber", "retries", "previous", "next", "nextInterval", "backOff", "pending", "suspended"};
+    private static final String[] RETRY_STATUS_DESCRIPTIONS = RETRY_STATUS_NAMES;
     @SuppressWarnings("rawtypes") // needed as OpenType isn't generic on Java5
-    private static final OpenType[] RETRANSMISSION_STATUS_TYPES =  
+    private static final OpenType[] RETRY_STATUS_TYPES =  
     {SimpleType.LONG, SimpleType.INTEGER, SimpleType.DATE, SimpleType.DATE, SimpleType.LONG, SimpleType.LONG, 
      SimpleType.BOOLEAN, SimpleType.BOOLEAN};
 
     private static CompositeType sourceSequenceType;
     private static CompositeType destinationSequenceType;
 
-    private static CompositeType retransmissionStatusType;
+    private static CompositeType retryStatusType;
 
     private RMEndpoint endpoint;
     
@@ -99,11 +99,11 @@ public class ManagedRMEndpoint implement
                                                         DESTINATION_SEQUENCE_DESCRIPTIONS,
                                                         DESTINATION_SEQUENCE_TYPES);
 
-            retransmissionStatusType = new CompositeType("retransmissionStatus",
-                                                         "retransmissionStatus",
-                                                         RETRANSMISSION_STATUS_NAMES,
-                                                         RETRANSMISSION_STATUS_DESCRIPTIONS,
-                                                         RETRANSMISSION_STATUS_TYPES);
+            retryStatusType = new CompositeType("retryStatus",
+                                                "retryStatus",
+                                                RETRY_STATUS_NAMES,
+                                                RETRY_STATUS_DESCRIPTIONS,
+                                                RETRY_STATUS_TYPES);
             
         } catch (OpenDataException e) {
             // ignore and handle it later
@@ -122,12 +122,23 @@ public class ManagedRMEndpoint implement
     }
 
     @ManagedOperation(description = "Total Number of Queued Messages")
-    public int getQueuedMessageTotalCount() {
-        Source source = endpoint.getSource();
-        RetransmissionQueue queue = endpoint.getManager().getRetransmissionQueue();
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "outbound", description = "The outbound direction") 
+    })
+    public int getQueuedMessageTotalCount(boolean outbound) {
         int count = 0;
-        for (SourceSequence ss : source.getAllSequences()) {
-            count += queue.countUnacknowledged(ss);
+        if (outbound) {
+            Source source = endpoint.getSource();
+            RetransmissionQueue queue = endpoint.getManager().getRetransmissionQueue();
+            for (SourceSequence ss : source.getAllSequences()) {
+                count += queue.countUnacknowledged(ss);
+            }
+        } else {
+//            Destination destination = endpoint.getDestination();
+//            RedeliveryQueue queue = endpoint.getManager().getRedeliveryQueue();
+//            for (DestinationSequence ds : destination.getAllSequences()) {
+//                count += queue.countUndelivered(ds);
+//            }
         }
 
         return count;
@@ -135,16 +146,26 @@ public class ManagedRMEndpoint implement
 
     @ManagedOperation(description = "Number of Queued Messages")
     @ManagedOperationParameters({
-        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"), 
+        @ManagedOperationParameter(name = "outbound", description = "The outbound direction") 
     })
-    public int getQueuedMessageCount(String sid) {
+    public int getQueuedMessageCount(String sid, boolean outbound) {
         RMManager manager = endpoint.getManager();
-        SourceSequence ss = getSourceSeq(sid);
-        if (null == ss) {
-            throw new IllegalArgumentException("no sequence");
+        int count = 0;
+        if (outbound) {
+            SourceSequence ss = getSourceSeq(sid);
+            if (null == ss) {
+                throw new IllegalArgumentException("no sequence");
+            }
+            count = manager.getRetransmissionQueue().countUnacknowledged(ss);
+        } else {
+//            DestinationSequence ds = getDestinationSeq(sid);
+//            if (null == ds) {
+//                throw new IllegalArgumentException("no sequence");
+//            }
+//            count = manager.getRedeliveryQueue().countUndelivered(ds);
         }
-
-        return manager.getRetransmissionQueue().countUnacknowledged(ss);
+        return count;
     }
 
     @ManagedOperation(description = "List of UnAcknowledged Message Numbers")
@@ -239,8 +260,8 @@ public class ManagedRMEndpoint implement
             throw new IllegalArgumentException("no sequence");
         }
         RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
-        RetransmissionStatus rs = rq.getRetransmissionStatus(ss, num);
-        return getRetransmissionStatusProperties(num, rs);
+        RetryStatus rs = rq.getRetransmissionStatus(ss, num);
+        return getRetryStatusProperties(num, rs);
     }
 
     @ManagedOperation(description = "Retransmission Statuses")
@@ -253,16 +274,66 @@ public class ManagedRMEndpoint implement
             throw new IllegalArgumentException("no sequence");
         }
         RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
-        Map<Long, RetransmissionStatus> rsmap = rq.getRetransmissionStatuses(ss);
+        Map<Long, RetryStatus> rsmap = rq.getRetransmissionStatuses(ss);
 
         CompositeData[] rsps = new CompositeData[rsmap.size()];
         int i = 0;
-        for (Map.Entry<Long, RetransmissionStatus> rs : rsmap.entrySet()) {
-            rsps[i++] = getRetransmissionStatusProperties(rs.getKey(), rs.getValue());
+        for (Map.Entry<Long, RetryStatus> rs : rsmap.entrySet()) {
+            rsps[i++] = getRetryStatusProperties(rs.getKey(), rs.getValue());
         }
         return rsps;
     }
 
+//     @ManagedOperation(description = "Redelivery Status")
+//     @ManagedOperationParameters({
+//         @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"),
+//         @ManagedOperationParameter(name = "messageNumber", description = "The message number")
+//     })
+//     public CompositeData getRedeliveryStatus(String sid, long num) throws JMException {
+//         DestinationSequence ds = getDestinationSeq(sid);
+//         if (null == ds) {
+//             throw new IllegalArgumentException("no sequence");
+//         }
+//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+//         RetryStatus rs = rq.getRedeliveryStatus(ds, num);
+//         return getRetryStatusProperties(num, rs);
+//     }
+
+//    @ManagedOperation(description = "Redelivery Statuses")
+//     @ManagedOperationParameters({
+//         @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+//     })
+//     public CompositeData[] getRedeliveryStatuses(String sid) throws JMException {
+//         DestinationSequence ds = getDestinationSeq(sid);
+//         if (null == ds) {
+//             throw new IllegalArgumentException("no sequence");
+//         }
+//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+//         Map<Long, RetryStatus> rsmap = rq.getRedeliveryStatuses(ds);
+//
+//         CompositeData[] rsps = new CompositeData[rsmap.size()];
+//         int i = 0;
+//         for (Map.Entry<Long, RetryStatus> rs : rsmap.entrySet()) {
+//             rsps[i++] = getRetryStatusProperties(rs.getKey(), rs.getValue());
+//         }
+//         return rsps;
+//     }
+
+//     @ManagedOperation(description = "List of UnDelivered Message Numbers")
+//     @ManagedOperationParameters({
+//         @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+//     })
+//     public Long[] getUnDeliveredMessageIdentifiers(String sid) {
+//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+//         DestinationSequence ds = getDestinationSeq(sid);
+//         if (null == ds) {
+//             throw new IllegalArgumentException("no sequence");
+//         }
+//        
+//         List<Long> numbers = rq.getUndeliveredMessageNumbers(ds);
+//         return numbers.toArray(new Long[numbers.size()]);
+//     }
+
     @ManagedOperation(description = "List of Source Sequence IDs")
     @ManagedOperationParameters({
         @ManagedOperationParameter(name = "expired", description = "The expired sequences included") 
@@ -316,6 +387,32 @@ public class ManagedRMEndpoint implement
         rq.resume(ss);
     }
     
+//     @ManagedOperation(description = "Suspend Redelivery Queue")
+//     @ManagedOperationParameters({
+//         @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+//     })
+//     public void suspendDestinationQueue(String sid) throws JMException {
+//         DestinationSequence ds = getDestinationSeq(sid);
+//         if (null == ds) {
+//             throw new IllegalArgumentException("no sequence");
+//         }
+//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+//         rq.suspend(ds);
+//     }
+    
+//     @ManagedOperation(description = "Resume Redelivery Queue")
+//     @ManagedOperationParameters({
+//         @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+//     })
+//     public void resumeDestinationQueue(String sid) throws JMException {
+//         DestinationSequence ds = getDestinationSeq(sid);
+//         if (null == ds) {
+//             throw new JMException("no source sequence");
+//         }
+//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+//         rq.resume(ds);
+//     }
+    
     @ManagedOperation(description = "Current Source Sequence Properties")
     public CompositeData getCurrentSourceSequence() throws JMException {
         Source source = endpoint.getSource();
@@ -363,7 +460,6 @@ public class ManagedRMEndpoint implement
         return sps.toArray(new CompositeData[sps.size()]);
     }
     
-    
     @ManagedOperation(description = "Destination Sequence Properties")
     @ManagedOperationParameters({
         @ManagedOperationParameter(name = "sequenceId", description = "The destination identifier") 
@@ -400,6 +496,36 @@ public class ManagedRMEndpoint implement
         return destination.getSequence(identifier);
     }
 
+    @ManagedOperation(description = "Remove Source Sequence")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The destination identifier") 
+    })
+    public void removeSourceSequence(String sid) throws JMException {
+        SourceSequence ss = getSourceSeq(sid);
+        if (null == ss) {
+            throw new JMException("no source sequence");
+        }
+        //TODO use cancel instead of suspend
+        RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+        rq.suspend(ss);
+        ss.getSource().removeSequence(ss);
+    }
+
+    @ManagedOperation(description = "Remove Destination Sequence")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The destination identifier") 
+    })
+    public void removeDestinationSequence(String sid) throws JMException {
+        DestinationSequence ds = getDestinationSeq(sid);
+        if (null == ds) {
+            throw new JMException("no source sequence");
+        }
+        //TODO use cancel instead of suspend
+//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+//         rq.suspend(ds);
+        ds.getDestination().removeSequence(ds);
+    }
+    
     private static String getAddressValue(EndpointReferenceType epr) {
         if (null != epr && null != epr.getAddress()) {
             return epr.getAddress().getValue();
@@ -435,15 +561,13 @@ public class ManagedRMEndpoint implement
         return dsps;
     }
     
-    private CompositeData getRetransmissionStatusProperties(long num, 
-                                                            RetransmissionStatus rs) throws JMException {
+    private CompositeData getRetryStatusProperties(long num, RetryStatus rs) throws JMException {
         CompositeData rsps = null;
         if (null != rs) {
-            Object[] rsv = new Object[] {num, rs.getResends(), rs.getPrevious(), 
+            Object[] rsv = new Object[] {num, rs.getRetries(), rs.getPrevious(), 
                                          rs.getNext(), rs.getNextInterval(),
                                          rs.getBackoff(), rs.isPending(), rs.isSuspended()};
-            rsps = new CompositeDataSupport(retransmissionStatusType,
-                                            RETRANSMISSION_STATUS_NAMES, rsv);
+            rsps = new CompositeDataSupport(retryStatusType, RETRY_STATUS_NAMES, rsv);
         }
         return rsps;
     }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Tue Mar 27 13:42:47 2012
@@ -67,14 +67,14 @@ public interface RetransmissionQueue {
      * @param num
      * @return
      */
-    RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num);
+    RetryStatus getRetransmissionStatus(SourceSequence seq, long num);
     
     /**
      * Return the retransmission status of all the messages assigned to the sequence.
      * @param seq
      * @return
      */
-    Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq);
+    Map<Long, RetryStatus> getRetransmissionStatuses(SourceSequence seq);
         
     /**
      * Initiate resends.

Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetryStatus.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetryStatus.java?rev=1305836&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetryStatus.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetryStatus.java Tue Mar 27 13:42:47 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.Date;
+
+/**
+ * A generic interface to represent the retrying status of a repeating activity
+ * at some WS-RM component.
+ */
+public interface RetryStatus {
+    /**
+     * @return the next retry time
+     */
+    Date getNext();
+
+    /**
+     * @return the previous retry time
+     */
+    Date getPrevious();
+    
+    /**
+     * @return the number of retries
+     */
+    int getRetries();
+    
+    /**
+     * @return the max number of retries permitted
+     */
+    int getMaxRetries();
+    
+    /**
+     * @return the nextInterval
+     */
+    long getNextInterval();
+    
+    /**
+     * @return the backoff
+     */
+    long getBackoff();
+    
+    /**
+     * @return the pending
+     */
+    boolean isPending();
+    
+    /**
+     * @return the suspended
+     */
+    boolean isSuspended();
+}

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Tue Mar 27 13:42:47 2012
@@ -62,7 +62,7 @@ import org.apache.cxf.ws.rm.RMProperties
 import org.apache.cxf.ws.rm.RMUtils;
 import org.apache.cxf.ws.rm.RetransmissionCallback;
 import org.apache.cxf.ws.rm.RetransmissionQueue;
-import org.apache.cxf.ws.rm.RetransmissionStatus;
+import org.apache.cxf.ws.rm.RetryStatus;
 import org.apache.cxf.ws.rm.SourceSequence;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.policy.RM10PolicyUtils;
@@ -168,7 +168,7 @@ public class RetransmissionQueueImpl imp
         return unacknowledged;
     }
     
-    public RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num) {
+    public RetryStatus getRetransmissionStatus(SourceSequence seq, long num) {
         List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
         if (null != sequenceCandidates) {
             for (int i = 0; i < sequenceCandidates.size(); i++) {
@@ -184,8 +184,8 @@ public class RetransmissionQueueImpl imp
         return null;
     }
     
-    public Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq) {
-        Map<Long, RetransmissionStatus> cp = new HashMap<Long, RetransmissionStatus>();
+    public Map<Long, RetryStatus> getRetransmissionStatuses(SourceSequence seq) {
+        Map<Long, RetryStatus> cp = new HashMap<Long, RetryStatus>();
         List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
         if (null != sequenceCandidates) {
             for (int i = 0; i < sequenceCandidates.size(); i++) {
@@ -469,12 +469,12 @@ public class RetransmissionQueueImpl imp
     /**
      * Represents a candidate for resend, i.e. an unacked outgoing message.
      */
-    protected class ResendCandidate implements Runnable, RetransmissionStatus {
+    protected class ResendCandidate implements Runnable, RetryStatus {
         private Message message;
         private OutputStream out;
         private Date next;
         private TimerTask nextTask;
-        private int resends;
+        private int retries;
         private long nextInterval;
         private long backoff;
         private boolean pending;
@@ -486,7 +486,7 @@ public class RetransmissionQueueImpl imp
          */
         protected ResendCandidate(Message m) {
             message = m;
-            resends = 0;
+            retries = 0;
             out = m.getContent(OutputStream.class);
             org.apache.cxf.ws.rmp.v200502.RMAssertion rma = 
                 RM10PolicyUtils.getRMAssertion(manager.getRMAssertion(), message);
@@ -555,11 +555,18 @@ public class RetransmissionQueueImpl imp
         /**
          * @return number of resend attempts
          */
-        public int getResends() {
-            return resends;
+        public int getRetries() {
+            return retries;
         }
 
         /**
+         * @return maximum number of resend attempts
+         */
+        public int getMaxRetries() {
+            return 0;
+        }
+        
+        /**
          * @return date of next resend
          */
         public Date getNext() {
@@ -570,7 +577,7 @@ public class RetransmissionQueueImpl imp
          * @return date of previous resend or null if no attempt is yet taken 
          */
         public Date getPrevious() {
-            if (resends > 0) {
+            if (retries > 0) {
                 return new Date(next.getTime() - nextInterval / backoff);
             }
             return null;
@@ -652,7 +659,7 @@ public class RetransmissionQueueImpl imp
          */
         protected synchronized void attempted() {
             pending = false;
-            resends++;
+            retries++;
             if (null != next) {
                 next = new Date(next.getTime() + nextInterval);
                 nextInterval *= backoff;

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Tue Mar 27 13:42:47 2012
@@ -147,7 +147,8 @@ public class ManagedRMManagerTest extend
         assertTrue(o instanceof Integer);
         assertEquals("No deferred acks", 0, o);
         
-        o = mbs.invoke(endpointName, "getQueuedMessageTotalCount", null, null);
+        o = mbs.invoke(endpointName, "getQueuedMessageTotalCount", 
+                       new Object[]{true}, new String[]{"boolean"});
         assertTrue(o instanceof Integer);
         assertEquals("No queued messages", 0, o);
     }
@@ -156,10 +157,10 @@ public class ManagedRMManagerTest extend
     public void testManagedRMEndpointGetQueuedCount() throws Exception {
         ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
 
-        int n = managedEndpoint.getQueuedMessageTotalCount();
+        int n = managedEndpoint.getQueuedMessageTotalCount(true);
         assertEquals(3, n);
         
-        n = managedEndpoint.getQueuedMessageCount("seq1");
+        n = managedEndpoint.getQueuedMessageCount("seq1", true);
         assertEquals(2, n);
     }
     
@@ -263,9 +264,9 @@ public class ManagedRMManagerTest extend
         assertTrue("seq3".equals(key) || "seq4".equals(key)); 
     }
 
-    private void verifyRetransmissionStatus(CompositeData cd, long num, RetransmissionStatus status) {
+    private void verifyRetransmissionStatus(CompositeData cd, long num, RetryStatus status) {
         assertEquals(num, cd.get("messageNumber"));
-        assertEquals(status.getResends(), cd.get("resends"));
+        assertEquals(status.getRetries(), cd.get("retries"));
         assertEquals(status.getNext(), cd.get("next"));
         assertEquals(status.getPrevious(), cd.get("previous"));
         assertEquals(status.getNextInterval(), cd.get("nextInterval"));
@@ -394,7 +395,7 @@ public class ManagedRMManagerTest extend
     
     private class TestRetransmissionQueue implements RetransmissionQueue {
         private Set<String> suspended = new HashSet<String>();
-        private RetransmissionStatus status = new TestRetransmissionStatus();
+        private RetryStatus status = new TestRetransmissionStatus();
         
         public int countUnacknowledged(SourceSequence seq) {
             final String key = seq.getIdentifier().getValue(); 
@@ -430,7 +431,7 @@ public class ManagedRMManagerTest extend
             return list;
         }
 
-        public RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num) {
+        public RetryStatus getRetransmissionStatus(SourceSequence seq, long num) {
             final String key = seq.getIdentifier().getValue();
             if (("seq1".equals(key) && (2L == num || 4L == num)) 
                 || ("seq2".equals(key) && (2L == num))) {
@@ -439,7 +440,7 @@ public class ManagedRMManagerTest extend
             return null;
         }
 
-        public Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq) {
+        public Map<Long, RetryStatus> getRetransmissionStatuses(SourceSequence seq) {
             // TODO Auto-generated method stub
             return null;
         }
@@ -464,12 +465,12 @@ public class ManagedRMManagerTest extend
             return suspended.contains(sid);
         }
         
-        RetransmissionStatus getRetransmissionStatus() {
+        RetryStatus getRetransmissionStatus() {
             return status;
         }
     }
     
-    private static class TestRetransmissionStatus implements RetransmissionStatus {
+    private static class TestRetransmissionStatus implements RetryStatus {
         private long interval = 300000L;
         private Date next = new Date(System.currentTimeMillis() + interval / 2);
         private Date previous = new Date(next.getTime() - interval);
@@ -482,10 +483,14 @@ public class ManagedRMManagerTest extend
             return previous;
         }
 
-        public int getResends() {
+        public int getRetries() {
             return 2;
         }
 
+        public int getMaxRetries() {
+            return -1;
+        }
+
         public long getNextInterval() {
             return interval;
         }

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Tue Mar 27 13:42:47 2012
@@ -106,7 +106,7 @@ public class RetransmissionQueueImplTest
         long now = System.currentTimeMillis();
         RetransmissionQueueImpl.ResendCandidate candidate = queue.createResendCandidate(message);
         assertSame(message, candidate.getMessage());
-        assertEquals(0, candidate.getResends());
+        assertEquals(0, candidate.getRetries());
         Date refDate = new Date(now + 5000);
         assertTrue(!candidate.getNext().before(refDate));
         refDate = new Date(now + 7000);
@@ -122,7 +122,7 @@ public class RetransmissionQueueImplTest
         long now = System.currentTimeMillis();
         RetransmissionQueueImpl.ResendCandidate candidate = queue.createResendCandidate(message);
         candidate.attempted();
-        assertEquals(1, candidate.getResends());
+        assertEquals(1, candidate.getRetries());
         Date refDate = new Date(now + 15000);
         assertTrue(!candidate.getNext().before(refDate));
         refDate = new Date(now + 17000);

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java Tue Mar 27 13:42:47 2012
@@ -154,11 +154,12 @@ public class ManagedEndpointsTest extend
                        new Object[]{true}, new String[]{"boolean"});
         verifyArray("Expected sequence identifier", o, new String[]{dseqId}); 
         
-        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
+                       new Object[]{true}, new String[]{"boolean"});
         assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
 
         o = mbs.invoke(clientEndpointName, "getQueuedMessageCount",
-                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+                       new Object[]{sseqId, true}, new String[]{"java.lang.String", "boolean"});
         assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
 
         o = mbs.invoke(clientEndpointName, "getCurrentSourceSequence", null, null);
@@ -180,7 +181,8 @@ public class ManagedEndpointsTest extend
         greeter.greetMeOneWay("two"); // getting lost
         greeter.greetMeOneWay("three"); // sent
         
-        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
+                       new Object[]{true}, new String[]{"boolean"});
         assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
 
         o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
@@ -203,7 +205,8 @@ public class ManagedEndpointsTest extend
         LOG.info("waiting for 12 secs for the retry to complete ...");
         Thread.sleep(12000);
 
-        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
+                       new Object[]{true}, new String[]{"boolean"});
         assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
 
         o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
@@ -260,7 +263,8 @@ public class ManagedEndpointsTest extend
         greeter.greetMeOneWay("two"); // sent but suspended
         greeter.greetMeOneWay("three"); // sent but suspended
 
-        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
+                       new Object[]{true}, new String[]{"boolean"});
         assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
 
         mbs.invoke(clientEndpointName, "suspendSourceQueue", 
@@ -272,7 +276,8 @@ public class ManagedEndpointsTest extend
         LOG.info("waiting for 10 secs for the retry (suspended)...");
         Thread.sleep(10000);
 
-        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
+                       new Object[]{true}, new String[]{"boolean"});
         assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
 
         mbs.invoke(clientEndpointName, "resumeSourceQueue", 
@@ -282,7 +287,8 @@ public class ManagedEndpointsTest extend
         LOG.info("waiting for 15 secs for the retry (resumed)...");
         Thread.sleep(10000);
 
-        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
+                       new Object[]{true}, new String[]{"boolean"});
         assertTrue("No queued messages", o instanceof Integer && 0 == ((Integer)o).intValue());
     }
 
@@ -323,7 +329,7 @@ public class ManagedEndpointsTest extend
         assertTrue(value instanceof CompositeData);
         CompositeData cd = (CompositeData)value;
         verifyValue(cd, "messageNumber", num);
-        verifyValue(cd, "resends", count);
+        verifyValue(cd, "retries", count);
         Date now = new Date();
         if (count > 0) {
             assertTrue(now.after((Date)getValue(cd, "previous")));