You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/12/13 15:49:23 UTC
svn commit: r1421320 - in /cxf/trunk/rt/ws/rm/src:
main/java/org/apache/cxf/ws/rm/ main/java/org/apache/cxf/ws/rm/soap/
test/java/org/apache/cxf/ws/rm/
Author: ay
Date: Thu Dec 13 14:49:22 2012
New Revision: 1421320
URL: http://svn.apache.org/viewvc?rev=1421320&view=rev
Log:
[CXF-4700] Add an operation for manually cancel/remove WS-RM messages and sequences
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1421320&r1=1421319&r2=1421320&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Thu Dec 13 14:49:22 2012
@@ -496,9 +496,11 @@ public class ManagedRMEndpoint implement
if (null == ss) {
throw new JMException("no source sequence");
}
- //TODO use cancel instead of suspend
RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
- rq.suspend(ss);
+ if (rq.countUnacknowledged(ss) > 0) {
+ throw new JMException("sequence not empty");
+ }
+ rq.stop(ss);
ss.getSource().removeSequence(ss);
}
@@ -511,12 +513,24 @@ public class ManagedRMEndpoint implement
if (null == ds) {
throw new JMException("no source sequence");
}
- //TODO use cancel instead of suspend
// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
// rq.suspend(ds);
ds.getDestination().removeSequence(ds);
}
+ @ManagedOperation(description = "Purge UnAcknowledged Messages")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public void purgeUnAcknowledgedMessages(String sid) {
+ SourceSequence ss = getSourceSeq(sid);
+ if (null == ss) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+ rq.purgeAll(ss);
+ }
+
private static String getAddressValue(EndpointReferenceType epr) {
if (null != epr && null != epr.getAddress()) {
return epr.getAddress().getValue();
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?rev=1421320&r1=1421319&r2=1421320&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Thu Dec 13 14:49:22 2012
@@ -60,6 +60,13 @@ public interface RetransmissionQueue {
void purgeAcknowledged(SourceSequence seq);
/**
+ * Purge all candiates for the given sequence.
+ *
+ * @param seq the sequence object
+ */
+ void purgeAll(SourceSequence seq);
+
+ /**
*
* @param seq
* @return
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1421320&r1=1421319&r2=1421320&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Thu Dec 13 14:49:22 2012
@@ -131,6 +131,21 @@ public class RetransmissionQueueImpl imp
* @param seq the sequence object.
*/
public void purgeAcknowledged(SourceSequence seq) {
+ purgeCandidates(seq, false);
+ }
+
+ /**
+ * Purge all candidates for the given sequence. This method is used to
+ * terminate the sequence by force and release the resource associated
+ * with the sequence.
+ *
+ * @param seq the sequence object.
+ */
+ public void purgeAll(SourceSequence seq) {
+ purgeCandidates(seq, true);
+ }
+
+ private void purgeCandidates(SourceSequence seq, boolean any) {
Collection<Long> purged = new ArrayList<Long>();
synchronized (this) {
LOG.fine("Start purging resend candidates.");
@@ -139,7 +154,7 @@ public class RetransmissionQueueImpl imp
for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
ResendCandidate candidate = sequenceCandidates.get(i);
long m = candidate.getNumber();
- if (seq.isAcknowledged(m)) {
+ if (any || seq.isAcknowledged(m)) {
sequenceCandidates.remove(i);
candidate.resolved();
unacknowledgedCount--;
@@ -260,7 +275,7 @@ public class RetransmissionQueueImpl imp
}
}
}
-
+
/**
* @return the exponential backoff
*/
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java?rev=1421320&r1=1421319&r2=1421320&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Thu Dec 13 14:49:22 2012
@@ -20,7 +20,9 @@
package org.apache.cxf.ws.rm;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -174,6 +176,72 @@ public class ManagedRMManagerTest extend
}
@Test
+ public void testRemoveSequence() throws Exception {
+ manager = new RMManager();
+ RMEndpoint rme = control.createMock(RMEndpoint.class);
+ EndpointReferenceType ref = RMUtils.createReference(TEST_URI);
+ Source source = new Source(rme);
+ Destination destination = new Destination(rme);
+
+ RetransmissionQueue rq = new TestRetransmissionQueue();
+ manager.setRetransmissionQueue(rq);
+ manager.initialise();
+
+ SourceSequence ss1 = createTestSourceSequence(source, "seq1", ref,
+ ProtocolVariation.RM10WSA200408, new long[]{1L, 1L, 3L, 3L});
+ SourceSequence ss3 = createTestSourceSequence(source, "seq3", ref,
+ ProtocolVariation.RM10WSA200408, new long[]{1L, 5L});
+
+ EasyMock.expect(rme.getManager()).andReturn(manager).anyTimes();
+ EasyMock.expect(rme.getSource()).andReturn(source).anyTimes();
+ EasyMock.expect(rme.getDestination()).andReturn(destination).anyTimes();
+
+ control.replay();
+ setCurrentMessageNumber(ss1, 5L);
+ setCurrentMessageNumber(ss3, 5L);
+ source.addSequence(ss1);
+ source.addSequence(ss3);
+ source.setCurrent(ss3);
+
+ ManagedRMEndpoint managedEndpoint = new ManagedRMEndpoint(rme);
+
+ // for those sequences without any unacknowledged messages
+ CompositeData cd = managedEndpoint.getSourceSequence("seq3");
+ assertNotNull(cd);
+
+ managedEndpoint.removeSourceSequence("seq3");
+ try {
+ cd = managedEndpoint.getSourceSequence("seq3");
+ fail("sequnce not removed");
+ } catch (Exception e) {
+ // ok
+ }
+
+ // for those sequences with some unacknowledged messages
+ cd = managedEndpoint.getSourceSequence("seq1");
+ assertNotNull(cd);
+
+ try {
+ managedEndpoint.removeSourceSequence("seq1");
+ fail("sequnce may not be removed");
+ } catch (Exception e) {
+ // ok
+ }
+ cd = managedEndpoint.getSourceSequence("seq1");
+ assertNotNull(cd);
+
+ managedEndpoint.purgeUnAcknowledgedMessages("seq1");
+ managedEndpoint.removeSourceSequence("seq1");
+
+ try {
+ cd = managedEndpoint.getSourceSequence("seq1");
+ fail("sequnce not removed");
+ } catch (Exception e) {
+ // ok
+ }
+ }
+
+ @Test
public void testGetSourceSequenceAcknowledgedRange() throws Exception {
ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
@@ -400,15 +468,19 @@ public class ManagedRMManagerTest extend
private class TestRetransmissionQueue implements RetransmissionQueue {
private Set<String> suspended = new HashSet<String>();
private RetryStatus status = new TestRetransmissionStatus();
+ private Map<String, List<Long>> numlists = new HashMap<String, List<Long>>();
+
+ public TestRetransmissionQueue() {
+ numlists.put("seq1", new ArrayList<Long>());
+ numlists.put("seq2", new ArrayList<Long>());
+ Collections.addAll(numlists.get("seq1"), 2L, 4L);
+ Collections.addAll(numlists.get("seq2"), 3L);
+ }
public int countUnacknowledged(SourceSequence seq) {
- final String key = seq.getIdentifier().getValue();
- if ("seq1".equals(key)) {
- return 2;
- } else if ("seq2".equals(key)) {
- return 1;
- }
- return 0;
+ final String key = seq.getIdentifier().getValue();
+ List<Long> list = numlists.get(key);
+ return list != null ? list.size() : 0;
}
public boolean isEmpty() {
@@ -423,25 +495,24 @@ public class ManagedRMManagerTest extend
// TODO Auto-generated method stub
}
- public List<Long> getUnacknowledgedMessageNumbers(SourceSequence seq) {
- List<Long> list = new ArrayList<Long>();
+ public void purgeAll(SourceSequence seq) {
final String key = seq.getIdentifier().getValue();
- if ("seq1".equals(key)) {
- list.add(2L);
- list.add(4L);
- } else if ("seq2".equals(key)) {
- list.add(3L);
+ List<Long> list = numlists.get(key);
+ if (list != null) {
+ list.clear();
}
- return list;
+ }
+
+ public List<Long> getUnacknowledgedMessageNumbers(SourceSequence seq) {
+ final String key = seq.getIdentifier().getValue();
+ List<Long> list = numlists.get(key);
+ return list != null ? list : new ArrayList<Long>();
}
public RetryStatus getRetransmissionStatus(SourceSequence seq, long num) {
final String key = seq.getIdentifier().getValue();
- if (("seq1".equals(key) && (2L == num || 4L == num))
- || ("seq2".equals(key) && (2L == num))) {
- return status;
- }
- return null;
+ List<Long> list = numlists.get(key);
+ return list.contains(num) ? status : null;
}
public Map<Long, RetryStatus> getRetransmissionStatuses(SourceSequence seq) {
@@ -474,7 +545,7 @@ public class ManagedRMManagerTest extend
}
public int countUnacknowledged() {
- return 3;
+ return numlists.get("seq1").size() + numlists.get("seq2").size();
}
}