You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/05/21 10:50:16 UTC
svn commit: r1340937 - in /cxf/trunk/rt/ws/rm/src:
main/java/org/apache/cxf/ws/rm/ main/java/org/apache/cxf/ws/rm/soap/
main/resources/schemas/configuration/ test/java/org/apache/cxf/ws/rm/
test/java/org/apache/cxf/ws/rm/soap/
Author: ay
Date: Mon May 21 08:50:15 2012
New Revision: 1340937
URL: http://svn.apache.org/viewvc?rev=1340937&view=rev
Log:
[CXF-4261] Add maxRetries option to WS-RM's retry logic
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java Mon May 21 08:50:15 2012
@@ -19,16 +19,25 @@
package org.apache.cxf.ws.rm;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.ws.rm.v200702.Identifier;
public class AbstractEndpoint {
+ /* the number of sequences currently being processed */
+ protected AtomicInteger processingSequenceCount;
+ /* the number of completed sequences since last started */
+ protected AtomicInteger completedSequenceCount;
+
private final RMEndpoint reliableEndpoint;
protected AbstractEndpoint(RMEndpoint rme) {
reliableEndpoint = rme;
+ processingSequenceCount = new AtomicInteger();
+ completedSequenceCount = new AtomicInteger();
}
public String getName() {
@@ -64,6 +73,14 @@ public class AbstractEndpoint {
public Identifier generateSequenceIdentifier() {
return reliableEndpoint.getManager().getIdGenerator().generateSequenceIdentifier();
}
+
+ int getProcessingSequenceCount() {
+ return processingSequenceCount.get();
+ }
+
+ int getCompletedSequenceCount() {
+ return completedSequenceCount.get();
+ }
private Bus getBus() {
return reliableEndpoint.getManager().getBus();
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Mon May 21 08:50:15 2012
@@ -65,21 +65,31 @@ public class Destination extends Abstrac
public void addSequence(DestinationSequence seq, boolean persist) {
seq.setDestination(this);
- map.put(seq.getIdentifier().getValue(), seq);
+ synchronized (map) {
+ map.put(seq.getIdentifier().getValue(), seq);
+ }
if (persist) {
RMStore store = getReliableEndpoint().getManager().getStore();
if (null != store) {
store.createDestinationSequence(seq);
}
}
+ processingSequenceCount.incrementAndGet();
}
public void removeSequence(DestinationSequence seq) {
- map.remove(seq.getIdentifier().getValue());
+ DestinationSequence o;
+ synchronized (map) {
+ o = map.remove(seq.getIdentifier().getValue());
+ }
RMStore store = getReliableEndpoint().getManager().getStore();
if (null != store) {
store.removeDestinationSequence(seq.getIdentifier());
}
+ if (o != null) {
+ processingSequenceCount.decrementAndGet();
+ completedSequenceCount.incrementAndGet();
+ }
}
/**
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Mon May 21 08:50:15 2012
@@ -70,11 +70,13 @@ public class ManagedRMEndpoint implement
SimpleType.STRING, SimpleType.STRING, SimpleType.STRING};
private static final String[] RETRY_STATUS_NAMES =
- {"messageNumber", "retries", "previous", "next", "nextInterval", "backOff", "pending", "suspended"};
+ {"messageNumber", "retries", "maxRetries", "previous", "next", "nextInterval",
+ "backOff", "pending", "suspended"};
private static final String[] RETRY_STATUS_DESCRIPTIONS = RETRY_STATUS_NAMES;
@SuppressWarnings("rawtypes") // needed as OpenType isn't generic on Java5
private static final OpenType[] RETRY_STATUS_TYPES =
- {SimpleType.LONG, SimpleType.INTEGER, SimpleType.DATE, SimpleType.DATE, SimpleType.LONG, SimpleType.LONG,
+ {SimpleType.LONG, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.DATE,
+ SimpleType.DATE, SimpleType.LONG, SimpleType.LONG,
SimpleType.BOOLEAN, SimpleType.BOOLEAN};
private static CompositeType sourceSequenceType;
@@ -126,22 +128,12 @@ public class ManagedRMEndpoint implement
@ManagedOperationParameter(name = "outbound", description = "The outbound direction")
})
public int getQueuedMessageTotalCount(boolean outbound) {
- int count = 0;
if (outbound) {
- Source source = endpoint.getSource();
- RetransmissionQueue queue = endpoint.getManager().getRetransmissionQueue();
- for (SourceSequence ss : source.getAllSequences()) {
- count += queue.countUnacknowledged(ss);
- }
+ return endpoint.getManager().getRetransmissionQueue().countUnacknowledged();
} else {
-// Destination destination = endpoint.getDestination();
-// RedeliveryQueue queue = endpoint.getManager().getRedeliveryQueue();
-// for (DestinationSequence ds : destination.getAllSequences()) {
-// count += queue.countUndelivered(ds);
-// }
+// return endpoint.getManager().getRedeliveryQueue().countUndelivered();
+ return 0;
}
-
- return count;
}
@ManagedOperation(description = "Number of Queued Messages")
@@ -151,21 +143,20 @@ public class ManagedRMEndpoint implement
})
public int getQueuedMessageCount(String sid, boolean outbound) {
RMManager manager = endpoint.getManager();
- int count = 0;
if (outbound) {
SourceSequence ss = getSourceSeq(sid);
if (null == ss) {
throw new IllegalArgumentException("no sequence");
}
- count = manager.getRetransmissionQueue().countUnacknowledged(ss);
+ return manager.getRetransmissionQueue().countUnacknowledged(ss);
} else {
// DestinationSequence ds = getDestinationSeq(sid);
// if (null == ds) {
// throw new IllegalArgumentException("no sequence");
// }
-// count = manager.getRedeliveryQueue().countUndelivered(ds);
+// return manager.getRedeliveryQueue().countUndelivered(ds);
+ return 0;
}
- return count;
}
@ManagedOperation(description = "List of UnAcknowledged Message Numbers")
@@ -568,7 +559,7 @@ public class ManagedRMEndpoint implement
private CompositeData getRetryStatusProperties(long num, RetryStatus rs) throws JMException {
CompositeData rsps = null;
if (null != rs) {
- Object[] rsv = new Object[] {num, rs.getRetries(), rs.getPrevious(),
+ Object[] rsv = new Object[] {num, rs.getRetries(), rs.getMaxRetries(), rs.getPrevious(),
rs.getNext(), rs.getNextInterval(),
rs.getBackoff(), rs.isPending(), rs.isSuspended()};
rsps = new CompositeDataSupport(retryStatusType, RETRY_STATUS_NAMES, rsv);
@@ -598,5 +589,44 @@ public class ManagedRMEndpoint implement
return endpoint.getLastControlMessage() == 0L ? null
: new Date(endpoint.getLastControlMessage());
}
+
+ @ManagedAttribute(description = "Number of Outbound Queued Messages", currencyTimeLimit = 10)
+ public int getQueuedMessagesOutboundCount() {
+ return endpoint.getManager().getRetransmissionQueue().countUnacknowledged();
+ }
+
+// @ManagedAttribute(description = "Number of Outbound Completed Messages", currencyTimeLimit = 10)
+// public int getCompletedMessagesOutboundCount() {
+// return endpoint.getManager().countCompleted();
+// }
+
+// @ManagedAttribute(description = "Number of Inbound Queued Messages", currencyTimeLimit = 10)
+// public int getQueuedMessagesInboundCount() {
+// return endpoint.getManager().getRedeliveryQueue().countUndelivered();
+// }
+
+// @ManagedAttribute(description = "Number of Inbound Completed Messages", currencyTimeLimit = 10)
+// public int getCompletedMessagesInboundCount() {
+// return endpoint.getManager().countCompleted();
+// }
+
+ @ManagedAttribute(description = "Number of Processing Source Sequences", currencyTimeLimit = 10)
+ public int getProcessingSourceSequenceCount() {
+ return endpoint.getProcessingSourceSequenceCount();
+ }
+
+ @ManagedAttribute(description = "Number of Completed Source Sequences", currencyTimeLimit = 10)
+ public int getCompletedSourceSequenceCount() {
+ return endpoint.getCompletedSourceSequenceCount();
+ }
+
+ @ManagedAttribute(description = "Number of Processing Destination Sequences", currencyTimeLimit = 10)
+ public int getProcessingDestinationSequenceCount() {
+ return endpoint.getProcessingDestinationSequenceCount();
+ }
+ @ManagedAttribute(description = "Number of Completed Destination Sequences", currencyTimeLimit = 10)
+ public int getCompletedDestinationSequenceCount() {
+ return endpoint.getCompletedDestinationSequenceCount();
+ }
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java Mon May 21 08:50:15 2012
@@ -72,4 +72,15 @@ public class ManagedRMManager implements
public boolean isUsingStore() {
return manager.getStore() != null;
}
+
+ @ManagedAttribute(description = "Total Number of Outbound Queued Messages", currencyTimeLimit = 10)
+ public int getQueuedMessagesOutboundCount() {
+ return manager.getRetransmissionQueue().countUnacknowledged();
+ }
+
+
+// @ManagedAttribute(description = "Total Number of Inbound Queued Messages", currencyTimeLimit = 10)
+// public int getQueuedMessagesInboundCount() {
+// return manager.getRedeliveryQueue().countUndelivered();
+// }
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Mon May 21 08:50:15 2012
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -97,6 +98,8 @@ public class RMEndpoint {
private Servant servant;
private long lastApplicationMessage;
private long lastControlMessage;
+ private AtomicInteger applicationMessageCount;
+ private AtomicInteger controlMessageCount;
private InstrumentationManager instrumentationManager;
private ManagedRMEndpoint managedEndpoint;
@@ -116,6 +119,8 @@ public class RMEndpoint {
servant = new Servant(this);
services = new HashMap<ProtocolVariation, WrappedService>();
endpoints = new HashMap<ProtocolVariation, Endpoint>();
+ applicationMessageCount = new AtomicInteger();
+ controlMessageCount = new AtomicInteger();
}
/**
@@ -204,10 +209,18 @@ public class RMEndpoint {
}
/**
+ * @return The number of application messages that have been received.
+ */
+ public int getApplicationMessageCount() {
+ return applicationMessageCount.get();
+ }
+
+ /**
* Indicates that an application message has been received.
*/
public void receivedApplicationMessage() {
lastApplicationMessage = System.currentTimeMillis();
+ applicationMessageCount.incrementAndGet();
}
/**
@@ -218,10 +231,18 @@ public class RMEndpoint {
}
/**
+ * @return The number of RM protocol messages that have been received.
+ */
+ public int getControlMessageCount() {
+ return controlMessageCount.get();
+ }
+
+ /**
* Indicates that an RM protocol message has been received.
*/
public void receivedControlMessage() {
lastControlMessage = System.currentTimeMillis();
+ controlMessageCount.incrementAndGet();
}
/**
@@ -687,6 +708,22 @@ public class RMEndpoint {
// unregistering of this managed bean from the server is done by the bus itself
}
+ int getProcessingSourceSequenceCount() {
+ return source != null ? source.getProcessingSequenceCount() : 0;
+ }
+
+ int getCompletedSourceSequenceCount() {
+ return source != null ? source.getCompletedSequenceCount() : 0;
+ }
+
+ int getProcessingDestinationSequenceCount() {
+ return destination != null ? destination.getProcessingSequenceCount() : 0;
+ }
+
+ int getCompletedDestinationSequenceCount() {
+ return destination != null ? destination.getCompletedSequenceCount() : 0;
+ }
+
class EffectivePolicyImpl implements EffectivePolicy {
private EndpointPolicy endpointPolicy;
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Mon May 21 08:50:15 2012
@@ -522,72 +522,81 @@ public class RMManager {
RMEndpoint rme = createReliableEndpoint(endpoint);
rme.initialise(conduit, null, null);
reliableEndpoints.put(endpoint, rme);
- SourceSequence css = null;
for (SourceSequence ss : sss) {
-
- Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
- if (null == ms || 0 == ms.size()) {
- continue;
- }
- LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
-
- rme.getSource().addSequence(ss, false);
- // choosing an arbitrary valid source sequence as the current source sequence
- if (css == null && !ss.isExpired() && !ss.isLastMessage()) {
- css = ss;
- rme.getSource().setCurrent(css);
- }
- for (RMMessage m : ms) {
-
- Message message = new MessageImpl();
- Exchange exchange = new ExchangeImpl();
- message.setExchange(exchange);
- if (null != conduit) {
- exchange.setConduit(conduit);
- message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
- }
- exchange.put(Endpoint.class, endpoint);
- exchange.put(Service.class, endpoint.getService());
- if (endpoint.getEndpointInfo().getService() != null) {
- exchange.put(ServiceInfo.class, endpoint.getEndpointInfo().getService());
- exchange.put(InterfaceInfo.class, endpoint.getEndpointInfo().getService().getInterface());
- }
- exchange.put(Binding.class, endpoint.getBinding());
- exchange.put(BindingInfo.class, endpoint.getEndpointInfo().getBinding());
- exchange.put(Bus.class, bus);
-
- SequenceType st = new SequenceType();
- st.setIdentifier(ss.getIdentifier());
- st.setMessageNumber(m.getMessageNumber());
- RMProperties rmps = new RMProperties();
- rmps.setSequence(st);
- if (ss.isLastMessage() && ss.getCurrentMessageNr() == m.getMessageNumber()) {
- CloseSequenceType close = new CloseSequenceType();
- close.setIdentifier(ss.getIdentifier());
- rmps.setCloseSequence(close);
- }
- RMContextUtils.storeRMProperties(message, rmps, true);
- if (null == conduit) {
- String to = m.getTo();
- AddressingProperties maps = new AddressingPropertiesImpl();
- maps.setTo(RMUtils.createReference(to));
- RMContextUtils.storeMAPs(maps, message, true, false);
- }
-
- message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream());
- RMContextUtils.setProtocolVariation(message, ss.getProtocol());
-
- retransmissionQueue.addUnacknowledged(message);
- }
+ recoverSourceSequence(endpoint, conduit, rme.getSource(), ss);
}
for (DestinationSequence ds : dss) {
- rme.getDestination().addSequence(ds, false);
+ reconverDestinationSequence(endpoint, conduit, rme.getDestination(), ds);
}
retransmissionQueue.start();
}
+ private void recoverSourceSequence(Endpoint endpoint, Conduit conduit, Source s,
+ SourceSequence ss) {
+ Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
+ if (null == ms || 0 == ms.size()) {
+ store.removeSourceSequence(ss.getIdentifier());
+ return;
+ }
+ LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
+
+ s.addSequence(ss, false);
+ // choosing an arbitrary valid source sequence as the current source sequence
+ if (s.getAssociatedSequence(null) == null && !ss.isExpired() && !ss.isLastMessage()) {
+ s.setCurrent(ss);
+ }
+ for (RMMessage m : ms) {
+
+ Message message = new MessageImpl();
+ Exchange exchange = new ExchangeImpl();
+ message.setExchange(exchange);
+ if (null != conduit) {
+ exchange.setConduit(conduit);
+ message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
+ }
+ exchange.put(Endpoint.class, endpoint);
+ exchange.put(Service.class, endpoint.getService());
+ if (endpoint.getEndpointInfo().getService() != null) {
+ exchange.put(ServiceInfo.class, endpoint.getEndpointInfo().getService());
+ exchange.put(InterfaceInfo.class, endpoint.getEndpointInfo().getService().getInterface());
+ }
+ exchange.put(Binding.class, endpoint.getBinding());
+ exchange.put(BindingInfo.class, endpoint.getEndpointInfo().getBinding());
+ exchange.put(Bus.class, bus);
+
+ SequenceType st = new SequenceType();
+ st.setIdentifier(ss.getIdentifier());
+ st.setMessageNumber(m.getMessageNumber());
+ RMProperties rmps = new RMProperties();
+ rmps.setSequence(st);
+ if (ss.isLastMessage() && ss.getCurrentMessageNr() == m.getMessageNumber()) {
+ CloseSequenceType close = new CloseSequenceType();
+ close.setIdentifier(ss.getIdentifier());
+ rmps.setCloseSequence(close);
+ }
+ RMContextUtils.storeRMProperties(message, rmps, true);
+ if (null == conduit) {
+ String to = m.getTo();
+ AddressingProperties maps = new AddressingPropertiesImpl();
+ maps.setTo(RMUtils.createReference(to));
+ RMContextUtils.storeMAPs(maps, message, true, false);
+ }
+
+ message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream());
+ RMContextUtils.setProtocolVariation(message, ss.getProtocol());
+
+ retransmissionQueue.addUnacknowledged(message);
+ }
+ }
+
+ private void reconverDestinationSequence(Endpoint endpoint, Conduit conduit, Destination d,
+ DestinationSequence ds) {
+ d.addSequence(ds, false);
+ //TODO add the redelivery code
+ }
+
RMEndpoint createReliableEndpoint(Endpoint endpoint) {
return new RMEndpoint(this, endpoint);
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Mon May 21 08:50:15 2012
@@ -37,6 +37,11 @@ public interface RetransmissionQueue {
int countUnacknowledged(SourceSequence seq);
/**
+ * @return the total number of unacknowledged messages in this queue
+ */
+ int countUnacknowledged();
+
+ /**
* @return true if there are no unacknowledged messages in the queue
*/
boolean isEmpty();
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java Mon May 21 08:50:15 2012
@@ -64,21 +64,31 @@ public class Source extends AbstractEndp
public void addSequence(SourceSequence seq, boolean persist) {
seq.setSource(this);
- map.put(seq.getIdentifier().getValue(), seq);
+ synchronized (map) {
+ map.put(seq.getIdentifier().getValue(), seq);
+ }
if (persist) {
RMStore store = getReliableEndpoint().getManager().getStore();
if (null != store) {
store.createSourceSequence(seq);
}
}
+ processingSequenceCount.incrementAndGet();
}
- public void removeSequence(SourceSequence seq) {
- map.remove(seq.getIdentifier().getValue());
+ public void removeSequence(SourceSequence seq) {
+ SourceSequence o;
+ synchronized (map) {
+ o = map.remove(seq.getIdentifier().getValue());
+ }
RMStore store = getReliableEndpoint().getManager().getStore();
if (null != store) {
store.removeSourceSequence(seq.getIdentifier());
}
+ if (o != null) {
+ processingSequenceCount.decrementAndGet();
+ completedSequenceCount.incrementAndGet();
+ }
}
/**
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Mon May 21 08:50:15 2012
@@ -66,6 +66,7 @@ import org.apache.cxf.ws.rm.Retransmissi
import org.apache.cxf.ws.rm.RetransmissionQueue;
import org.apache.cxf.ws.rm.RetryStatus;
import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.manager.RetryPolicyType;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.policy.RM10PolicyUtils;
import org.apache.cxf.ws.rm.v200702.Identifier;
@@ -86,6 +87,8 @@ public class RetransmissionQueueImpl imp
private Resender resender;
private RMManager manager;
+ private int unacknowledgedCount;
+
public RetransmissionQueueImpl(RMManager m) {
manager = m;
}
@@ -111,6 +114,10 @@ public class RetransmissionQueueImpl imp
return sequenceCandidates == null ? 0 : sequenceCandidates.size();
}
+ public int countUnacknowledged() {
+ return unacknowledgedCount;
+ }
+
/**
* @return true if there are no unacknowledged messages in the queue
*/
@@ -131,13 +138,11 @@ public class RetransmissionQueueImpl imp
if (null != sequenceCandidates) {
for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
ResendCandidate candidate = sequenceCandidates.get(i);
- RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
- true);
- SequenceType st = properties.getSequence();
- long m = st.getMessageNumber().longValue();
+ long m = candidate.getNumber();
if (seq.isAcknowledged(m)) {
sequenceCandidates.remove(i);
candidate.resolved();
+ unacknowledgedCount--;
purged.add(m);
}
}
@@ -161,10 +166,7 @@ public class RetransmissionQueueImpl imp
if (null != sequenceCandidates) {
for (int i = 0; i < sequenceCandidates.size(); i++) {
ResendCandidate candidate = sequenceCandidates.get(i);
- RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
- true);
- SequenceType st = properties.getSequence();
- unacknowledged.add(st.getMessageNumber());
+ unacknowledged.add(candidate.getNumber());
}
}
return unacknowledged;
@@ -175,10 +177,7 @@ public class RetransmissionQueueImpl imp
if (null != sequenceCandidates) {
for (int i = 0; i < sequenceCandidates.size(); i++) {
ResendCandidate candidate = sequenceCandidates.get(i);
- RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
- true);
- SequenceType st = properties.getSequence();
- if (num == st.getMessageNumber()) {
+ if (num == candidate.getNumber()) {
return candidate;
}
}
@@ -192,10 +191,7 @@ public class RetransmissionQueueImpl imp
if (null != sequenceCandidates) {
for (int i = 0; i < sequenceCandidates.size(); i++) {
ResendCandidate candidate = sequenceCandidates.get(i);
- RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
- true);
- SequenceType st = properties.getSequence();
- cp.put(st.getMessageNumber(), candidate);
+ cp.put(candidate.getNumber(), candidate);
}
}
return cp;
@@ -305,6 +301,7 @@ public class RetransmissionQueueImpl imp
candidate.suspend();
}
sequenceCandidates.add(candidate);
+ unacknowledgedCount++;
}
LOG.fine("Cached unacknowledged message.");
return candidate;
@@ -473,10 +470,12 @@ public class RetransmissionQueueImpl imp
*/
protected class ResendCandidate implements Runnable, RetryStatus {
private Message message;
+ private long number;
private OutputStream out;
private Date next;
private TimerTask nextTask;
private int retries;
+ private int maxRetries;
private long nextInterval;
private long backoff;
private boolean pending;
@@ -498,7 +497,9 @@ public class RetransmissionQueueImpl imp
? RetransmissionQueue.DEFAULT_EXPONENTIAL_BACKOFF : 1;
next = new Date(System.currentTimeMillis() + baseRetransmissionInterval);
nextInterval = baseRetransmissionInterval * backoff;
-
+ RetryPolicyType rmrp = null != manager.getSourcePolicy()
+ ? manager.getSourcePolicy().getRetryPolicy() : null;
+ maxRetries = null != rmrp ? rmrp.getMaxRetries() : 0;
AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, true);
AttributedURIType to = null;
@@ -510,6 +511,10 @@ public class RetransmissionQueueImpl imp
LOG.log(Level.INFO, "Cannot resend to anonymous target. Not scheduling a resend.");
return;
}
+ RMProperties rmprops = RMContextUtils.retrieveRMProperties(message, true);
+ if (null != rmprops) {
+ number = rmprops.getSequence().getMessageNumber();
+ }
if (null != manager.getTimer()) {
schedule();
}
@@ -558,6 +563,10 @@ public class RetransmissionQueueImpl imp
}
}
+ public long getNumber() {
+ return number;
+ }
+
/**
* @return number of resend attempts
*/
@@ -569,7 +578,7 @@ public class RetransmissionQueueImpl imp
* @return number of max resend attempts
*/
public int getMaxRetries() {
- return 0;
+ return maxRetries;
}
/**
@@ -666,7 +675,7 @@ public class RetransmissionQueueImpl imp
protected synchronized void attempted() {
pending = false;
retries++;
- if (null != next) {
+ if (null != next && maxRetries != retries) {
next = new Date(next.getTime() + nextInterval);
nextInterval *= backoff;
schedule();
Modified: cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd (original)
+++ cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd Mon May 21 08:50:15 2012
@@ -42,6 +42,7 @@
<xs:sequence>
<xs:element name="sequenceTerminationPolicy" type="tns:SequenceTerminationPolicyType" minOccurs="0"/>
+ <xs:element name="retryPolicy" type="tns:RetryPolicyType" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="sequenceExpiration" type="xs:duration" use="optional" default="PT0S">
@@ -88,6 +89,7 @@
<xs:sequence>
<xs:element name="acksPolicy" type="tns:AcksPolicyType" minOccurs="0"/>
+ <xs:element name="retryPolicy" type="tns:RetryPolicyType" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="sequenceExpiration" type="xs:duration" use="optional" default="PT0S">
@@ -177,6 +179,17 @@
</xs:attribute>
</xs:complexType>
+
+ <xs:complexType name="RetryPolicyType">
+ <xs:attribute name="maxRetries" type="xs:int" use="optional" default="0">
+ <xs:annotation>
+ <xs:documentation>
+ The maximum number of retries of a message.
+ A value of 0 means the number is unbounded.
+ </xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
+ </xs:complexType>
<xs:complexType name="DeliveryAssuranceType">
<xs:sequence>
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Mon May 21 08:50:15 2012
@@ -472,6 +472,10 @@ public class ManagedRMManagerTest extend
RetryStatus getRetransmissionStatus() {
return status;
}
+
+ public int countUnacknowledged() {
+ return 3;
+ }
}
private static class TestRetransmissionStatus implements RetryStatus {
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?rev=1340937&r1=1340936&r2=1340937&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Mon May 21 08:50:15 2012
@@ -31,6 +31,8 @@ import org.apache.cxf.ws.rm.RMManager;
import org.apache.cxf.ws.rm.RMMessageConstants;
import org.apache.cxf.ws.rm.RMProperties;
import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.manager.RetryPolicyType;
+import org.apache.cxf.ws.rm.manager.SourcePolicyType;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.v200702.Identifier;
import org.apache.cxf.ws.rm.v200702.SequenceType;
@@ -127,7 +129,33 @@ public class RetransmissionQueueImplTest
assertTrue(!candidate.getNext().before(refDate));
refDate = new Date(now + 17000);
assertTrue(!candidate.getNext().after(refDate));
- assertTrue(!candidate.isPending());
+ assertTrue(!candidate.isPending());
+ }
+
+ @Test
+ public void testResendCandidateMaxRetries() {
+ Message message = createMock(Message.class);
+ setupMessagePolicies(message);
+ setupRetryPolicy(message);
+
+ ready(true);
+ RetransmissionQueueImpl.ResendCandidate candidate = queue.createResendCandidate(message);
+
+ assertEquals(3, candidate.getMaxRetries());
+ Date next = null;
+ for (int i = 1; i < 3; i++) {
+ next = candidate.getNext();
+ candidate.attempted();
+ assertEquals(i, candidate.getRetries());
+ // the next time must advance
+ assertTrue(candidate.getNext().after(next));
+ }
+ next = candidate.getNext();
+ candidate.attempted();
+ // reaches the max retries
+ assertEquals(3, candidate.getRetries());
+ // the next time must not advance
+ assertFalse(candidate.getNext().after(next));
}
@Test
@@ -349,7 +377,15 @@ public class RetransmissionQueueImplTest
EasyMock.expect(rma.getExponentialBackoff()).andReturn(eb);
}
+ private void setupRetryPolicy(Message message) {
+ SourcePolicyType spt = control.createMock(SourcePolicyType.class);
+ EasyMock.expect(manager.getSourcePolicy()).andReturn(spt).anyTimes();
+ RetryPolicyType rpt = control.createMock(RetryPolicyType.class);
+ EasyMock.expect(spt.getRetryPolicy()).andReturn(rpt);
+ EasyMock.expect(rpt.getMaxRetries()).andReturn(3);
+ }
+
private void ready(boolean doStart) {
control.replay();
if (doStart) {