You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/12/13 15:49:23 UTC

svn commit: r1421320 - in /cxf/trunk/rt/ws/rm/src: main/java/org/apache/cxf/ws/rm/ main/java/org/apache/cxf/ws/rm/soap/ test/java/org/apache/cxf/ws/rm/

Author: ay
Date: Thu Dec 13 14:49:22 2012
New Revision: 1421320

URL: http://svn.apache.org/viewvc?rev=1421320&view=rev
Log:
[CXF-4700] Add an operation for manually cancelling/removing WS-RM messages and sequences

Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1421320&r1=1421319&r2=1421320&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Thu Dec 13 14:49:22 2012
@@ -496,9 +496,11 @@ public class ManagedRMEndpoint implement
         if (null == ss) {
             throw new JMException("no source sequence");
         }
-        //TODO use cancel instead of suspend
         RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
-        rq.suspend(ss);
+        if (rq.countUnacknowledged(ss) > 0) {
+            throw new JMException("sequence not empty");
+        }
+        rq.stop(ss);
         ss.getSource().removeSequence(ss);
     }
 
@@ -511,12 +513,24 @@ public class ManagedRMEndpoint implement
         if (null == ds) {
             throw new JMException("no source sequence");
         }
-        //TODO use cancel instead of suspend
 //         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
 //         rq.suspend(ds);
         ds.getDestination().removeSequence(ds);
     }
     
+    @ManagedOperation(description = "Purge UnAcknowledged Messages")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public void purgeUnAcknowledgedMessages(String sid) {
+        SourceSequence ss = getSourceSeq(sid);
+        if (null == ss) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+        rq.purgeAll(ss);
+    }
+
     private static String getAddressValue(EndpointReferenceType epr) {
         if (null != epr && null != epr.getAddress()) {
             return epr.getAddress().getValue();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?rev=1421320&r1=1421319&r2=1421320&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Thu Dec 13 14:49:22 2012
@@ -60,6 +60,13 @@ public interface RetransmissionQueue {
     void purgeAcknowledged(SourceSequence seq);
     
     /**
+     * Purge all candidates for the given sequence.
+     * 
+     * @param seq the sequence object
+     */
+    void purgeAll(SourceSequence seq);
+    
+    /**
      * 
      * @param seq
      * @return

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1421320&r1=1421319&r2=1421320&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Thu Dec 13 14:49:22 2012
@@ -131,6 +131,21 @@ public class RetransmissionQueueImpl imp
      * @param seq the sequence object.
      */
     public void purgeAcknowledged(SourceSequence seq) {
+        purgeCandidates(seq, false);
+    }
+
+    /**
+     * Purge all candidates for the given sequence. This method is used to 
+     * terminate the sequence by force and release the resource associated
+     * with the sequence.
+     *  
+     * @param seq the sequence object.
+     */
+    public void purgeAll(SourceSequence seq) {
+        purgeCandidates(seq, true);
+    }
+    
+    private void purgeCandidates(SourceSequence seq, boolean any) {
         Collection<Long> purged = new ArrayList<Long>();
         synchronized (this) {
             LOG.fine("Start purging resend candidates.");
@@ -139,7 +154,7 @@ public class RetransmissionQueueImpl imp
                 for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
                     ResendCandidate candidate = sequenceCandidates.get(i);
                     long m = candidate.getNumber();
-                    if (seq.isAcknowledged(m)) {
+                    if (any || seq.isAcknowledged(m)) {
                         sequenceCandidates.remove(i);
                         candidate.resolved();
                         unacknowledgedCount--;
@@ -260,7 +275,7 @@ public class RetransmissionQueueImpl imp
             }           
         }
     }
-
+    
     /**
      * @return the exponential backoff
      */

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java?rev=1421320&r1=1421319&r2=1421320&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Thu Dec 13 14:49:22 2012
@@ -20,7 +20,9 @@
 package org.apache.cxf.ws.rm;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -174,6 +176,72 @@ public class ManagedRMManagerTest extend
     }
     
     @Test
+    public void testRemoveSequence() throws Exception {
+        manager = new RMManager(); 
+        RMEndpoint rme = control.createMock(RMEndpoint.class);
+        EndpointReferenceType ref = RMUtils.createReference(TEST_URI);
+        Source source = new Source(rme);
+        Destination destination = new Destination(rme);
+        
+        RetransmissionQueue rq = new TestRetransmissionQueue();
+        manager.setRetransmissionQueue(rq);
+        manager.initialise();
+        
+        SourceSequence ss1 = createTestSourceSequence(source, "seq1", ref, 
+                                                     ProtocolVariation.RM10WSA200408, new long[]{1L, 1L, 3L, 3L});
+        SourceSequence ss3 = createTestSourceSequence(source, "seq3", ref, 
+                                                     ProtocolVariation.RM10WSA200408, new long[]{1L, 5L});
+
+        EasyMock.expect(rme.getManager()).andReturn(manager).anyTimes();
+        EasyMock.expect(rme.getSource()).andReturn(source).anyTimes();
+        EasyMock.expect(rme.getDestination()).andReturn(destination).anyTimes();
+
+        control.replay();
+        setCurrentMessageNumber(ss1, 5L);
+        setCurrentMessageNumber(ss3, 5L);
+        source.addSequence(ss1);
+        source.addSequence(ss3);
+        source.setCurrent(ss3);
+
+        ManagedRMEndpoint managedEndpoint = new ManagedRMEndpoint(rme);
+        
+        // for those sequences without any unacknowledged messages
+        CompositeData cd = managedEndpoint.getSourceSequence("seq3");
+        assertNotNull(cd);
+        
+        managedEndpoint.removeSourceSequence("seq3");
+        try {
+            cd = managedEndpoint.getSourceSequence("seq3");
+            fail("sequnce not removed");
+        } catch (Exception e) {
+            // ok
+        }
+        
+        // for those sequences with some unacknowledged messages        
+        cd = managedEndpoint.getSourceSequence("seq1");
+        assertNotNull(cd);
+        
+        try {
+            managedEndpoint.removeSourceSequence("seq1");
+            fail("sequnce may not be removed");
+        } catch (Exception e) {
+            // ok
+        }
+        cd = managedEndpoint.getSourceSequence("seq1");
+        assertNotNull(cd);
+        
+        managedEndpoint.purgeUnAcknowledgedMessages("seq1");
+        managedEndpoint.removeSourceSequence("seq1");
+
+        try {
+            cd = managedEndpoint.getSourceSequence("seq1");
+            fail("sequnce not removed");
+        } catch (Exception e) {
+            // ok
+        }
+    }
+
+    @Test
     public void testGetSourceSequenceAcknowledgedRange() throws Exception {
         ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
         
@@ -400,15 +468,19 @@ public class ManagedRMManagerTest extend
     private class TestRetransmissionQueue implements RetransmissionQueue {
         private Set<String> suspended = new HashSet<String>();
         private RetryStatus status = new TestRetransmissionStatus();
+        private Map<String, List<Long>> numlists = new HashMap<String, List<Long>>();
+        
+        public TestRetransmissionQueue() {
+            numlists.put("seq1", new ArrayList<Long>());
+            numlists.put("seq2", new ArrayList<Long>());
+            Collections.addAll(numlists.get("seq1"), 2L, 4L);
+            Collections.addAll(numlists.get("seq2"), 3L);
+        }
         
         public int countUnacknowledged(SourceSequence seq) {
-            final String key = seq.getIdentifier().getValue(); 
-            if ("seq1".equals(key)) {
-                return 2;
-            } else if ("seq2".equals(key)) {
-                return 1;
-            }
-            return 0;
+            final String key = seq.getIdentifier().getValue();
+            List<Long> list = numlists.get(key);
+            return list != null ? list.size() : 0;
         }
 
         public boolean isEmpty() {
@@ -423,25 +495,24 @@ public class ManagedRMManagerTest extend
             // TODO Auto-generated method stub
         }
 
-        public List<Long> getUnacknowledgedMessageNumbers(SourceSequence seq) {
-            List<Long> list = new ArrayList<Long>();
+        public void purgeAll(SourceSequence seq) {
             final String key = seq.getIdentifier().getValue(); 
-            if ("seq1".equals(key)) {
-                list.add(2L);
-                list.add(4L);
-            } else if ("seq2".equals(key)) {
-                list.add(3L);
+            List<Long> list = numlists.get(key);
+            if (list != null) {
+                list.clear();
             }
-            return list;
+        }
+
+        public List<Long> getUnacknowledgedMessageNumbers(SourceSequence seq) {
+            final String key = seq.getIdentifier().getValue(); 
+            List<Long> list = numlists.get(key);
+            return list != null ? list : new ArrayList<Long>();
         }
 
         public RetryStatus getRetransmissionStatus(SourceSequence seq, long num) {
             final String key = seq.getIdentifier().getValue();
-            if (("seq1".equals(key) && (2L == num || 4L == num)) 
-                || ("seq2".equals(key) && (2L == num))) {
-                return status;
-            }
-            return null;
+            List<Long> list = numlists.get(key);
+            return list.contains(num) ? status : null;
         }
 
         public Map<Long, RetryStatus> getRetransmissionStatuses(SourceSequence seq) {
@@ -474,7 +545,7 @@ public class ManagedRMManagerTest extend
         }
 
         public int countUnacknowledged() {
-            return 3;
+            return numlists.get("seq1").size() + numlists.get("seq2").size();
         }
     }