You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2007/05/01 10:49:31 UTC
svn commit: r533968 - in /incubator/cxf/trunk:
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/...
Author: andreasmyth
Date: Tue May 1 01:49:30 2007
New Revision: 533968
URL: http://svn.apache.org/viewvc?view=rev&rev=533968
Log:
[JIRA CXF-347] Implementation of InactivityTimeout:
* Recorded arrival time of last application and RM protocol messages.
* Terminate sequence if no protocol or application messages have been sent to an endpoint for more than the time specified in the inactivity timeout parameter of the RMAssertion.
* Override Assertion.equal and AssertionBuilder.buildCompatible for RMAssertion and use these in refactored PolicyUtils.
Added:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java (with props)
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceMonitor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Tue May 1 01:49:30 2007
@@ -38,6 +38,8 @@
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.policy.PolicyUtils;
import org.apache.cxf.ws.rm.policy.RMAssertion;
+import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.InactivityTimeout;
public class DestinationSequence extends AbstractSequence {
@@ -49,6 +51,7 @@
private SequenceMonitor monitor;
private boolean acknowledgeOnNextOccasion;
private List<DeferredAcknowledgment> deferredAcknowledgments;
+ private SequenceTermination scheduledTermination;
private String correlationID;
public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d) {
@@ -153,7 +156,27 @@
purgeAcknowledged(messageNumber);
- scheduleAcknowledgement(message);
+ RMAssertion rma = PolicyUtils.getRMAssertion(destination.getManager().getRMAssertion(), message);
+ long acknowledgementInterval = 0;
+ AcknowledgementInterval ai = rma.getAcknowledgementInterval();
+ if (null != ai) {
+ BigInteger val = ai.getMilliseconds();
+ if (null != val) {
+ acknowledgementInterval = val.longValue();
+ }
+ }
+
+ scheduleAcknowledgement(acknowledgementInterval);
+
+ long inactivityTimeout = 0;
+ InactivityTimeout iat = rma.getInactivityTimeout();
+ if (null != iat) {
+ BigInteger val = iat.getMilliseconds();
+ if (null != val) {
+ inactivityTimeout = val.longValue();
+ }
+ }
+ scheduleSequenceTermination(inactivityTimeout);
}
@@ -273,24 +296,12 @@
return correlationID;
}
- void scheduleAcknowledgement(Message message) {
- BigInteger interval = PolicyUtils.getAcknowledgmentInterval(message);
- if (null == interval) {
- RMAssertion rma = destination.getManager().getRMAssertion();
- if (null != rma.getAcknowledgementInterval()) {
- interval = rma.getAcknowledgementInterval().getMilliseconds();
- }
- }
-
- long delay = 0;
- if (null != interval) {
- delay = interval.longValue();
- }
+ void scheduleAcknowledgement(long acknowledgementInterval) {
AcksPolicyType ap = destination.getManager().getDestinationPolicy().getAcksPolicy();
- if (delay > 0 && getMonitor().getMPM() >= ap.getIntraMessageThreshold()) {
+ if (acknowledgementInterval > 0 && getMonitor().getMPM() >= ap.getIntraMessageThreshold()) {
LOG.fine("Schedule deferred acknowledgment");
- scheduleDeferredAcknowledgement(delay);
+ scheduleDeferredAcknowledgement(acknowledgementInterval);
} else {
LOG.fine("Schedule immediate acknowledgment");
scheduleImmediateAcknowledgement();
@@ -301,6 +312,20 @@
void scheduleImmediateAcknowledgement() {
acknowledgeOnNextOccasion = true;
}
+
+ synchronized void scheduleSequenceTermination(long inactivityTimeout) {
+ if (inactivityTimeout <= 0) {
+ return;
+ }
+ boolean scheduled = null != scheduledTermination;
+ if (null == scheduledTermination) {
+ scheduledTermination = new SequenceTermination();
+ }
+ scheduledTermination.updateInactivityTimeout(inactivityTimeout);
+ if (!scheduled) {
+ destination.getManager().getTimer().schedule(scheduledTermination, inactivityTimeout);
+ }
+ }
synchronized void scheduleDeferredAcknowledgement(long delay) {
@@ -349,6 +374,43 @@
}
+ }
+ }
+
+ final class SequenceTermination extends TimerTask {
+
+ private long maxInactivityTimeout;
+
+ void updateInactivityTimeout(long timeout) {
+ maxInactivityTimeout = Math.max(maxInactivityTimeout, timeout);
+ }
+
+ public void run() {
+ synchronized (DestinationSequence.this) {
+ DestinationSequence.this.scheduledTermination = null;
+ RMEndpoint rme = destination.getReliableEndpoint();
+ long lat = Math.max(rme.getLastControlMessage(), rme.getLastApplicationMessage());
+ if (0 == lat) {
+ return;
+ }
+ long now = System.currentTimeMillis();
+ if (now - lat >= maxInactivityTimeout) {
+
+ // terminate regardless of outstanding acknowledgments - as we assume that the client is
+ // gone there is no point in sending a SequenceAcknowledgment
+
+ LogUtils.log(LOG, Level.WARNING, "TERMINATING_INACTIVE_SEQ_MSG",
+ DestinationSequence.this.getIdentifier().getValue());
+ DestinationSequence.this.destination.removeSequence(DestinationSequence.this);
+
+ } else {
+ // reschedule
+ SequenceTermination st = new SequenceTermination();
+ st.updateInactivityTimeout(maxInactivityTimeout);
+ DestinationSequence.this.destination.getManager().getTimer()
+ .schedule(st, maxInactivityTimeout);
+ }
+ }
}
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties Tue May 1 01:49:30 2007
@@ -18,6 +18,8 @@
# under the License.
#
#
+TERMINATING_INACTIVE_SEQ_MSG = Terminating sequence {0} due to inactivity.
+
RM_INVOCATION_FAILED = Invocation of RM protocol operation failed.
SEQ_TERMINATION_FAILURE = Failed to terminate sequence {0}.
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Tue May 1 01:49:30 2007
@@ -78,8 +78,9 @@
private Endpoint endpoint;
private Proxy proxy;
private Servant servant;
-
-
+ private long lastApplicationMessage;
+ private long lastControlMessage;
+
public RMEndpoint(RMManager m, Endpoint ae) {
manager = m;
applicationEndpoint = ae;
@@ -448,6 +449,24 @@
void setManager(RMManager m) {
manager = m;
}
+
+ public long getLastApplicationMessage() {
+ return lastApplicationMessage;
+ }
+
+ public void receivedApplicationMessage() {
+ lastApplicationMessage = System.currentTimeMillis();
+ }
+
+ public long getLastControlMessage() {
+ return lastControlMessage;
+ }
+
+ public void receivedControlMessage() {
+ lastControlMessage = System.currentTimeMillis();
+ }
+
+
class EffectivePolicyImpl implements EffectivePolicy {
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Tue May 1 01:49:30 2007
@@ -78,7 +78,9 @@
// for application AND out of band messages
- if (isApplicationMessage) {
+ RMEndpoint rme = getManager().getReliableEndpoint(message);
+
+ if (isApplicationMessage) {
Destination destination = getManager().getDestination(message);
if (null != rmps) {
processAcknowledgments(rmps);
@@ -86,16 +88,19 @@
processSequence(destination, message);
processDeliveryAssurance(rmps);
}
- } else if (RMConstants.getSequenceAckAction().equals(action)) {
- processAcknowledgments(rmps);
- } else if (RMConstants.getCreateSequenceAction().equals(action) && !isServer) {
- LOG.fine("Processing inbound CreateSequence on client side.");
- RMEndpoint rme = getManager().getReliableEndpoint(message);
- Servant servant = rme.getServant();
- CreateSequenceResponseType csr = servant.createSequence(message);
- Proxy proxy = rme.getProxy();
- proxy.createSequenceResponse(csr);
- return;
+ rme.receivedApplicationMessage();
+ } else {
+ rme.receivedControlMessage();
+ if (RMConstants.getSequenceAckAction().equals(action)) {
+ processAcknowledgments(rmps);
+ } else if (RMConstants.getCreateSequenceAction().equals(action) && !isServer) {
+ LOG.fine("Processing inbound CreateSequence on client side.");
+ Servant servant = rme.getServant();
+ CreateSequenceResponseType csr = servant.createSequence(message);
+ Proxy proxy = rme.getProxy();
+ proxy.createSequenceResponse(csr);
+ return;
+ }
}
assertReliability(message);
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Tue May 1 01:49:30 2007
@@ -239,14 +239,7 @@
@PostConstruct
void initialise() {
if (!isSetRMAssertion()) {
- org.apache.cxf.ws.rm.policy.ObjectFactory factory =
- new org.apache.cxf.ws.rm.policy.ObjectFactory();
- RMAssertion rma = factory.createRMAssertion();
- BaseRetransmissionInterval bri = factory.createRMAssertionBaseRetransmissionInterval();
- bri.setMilliseconds(new BigInteger(RetransmissionQueue.DEFAULT_BASE_RETRANSMISSION_INTERVAL));
- rma.setBaseRetransmissionInterval(bri);
- rma.setExponentialBackoff(factory.createRMAssertionExponentialBackoff());
- setRMAssertion(rma);
+ setRMAssertion(null);
}
org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
if (!isSetDeliveryAssurance()) {
@@ -275,7 +268,28 @@
idGenerator = new DefaultSequenceIdentifierGenerator();
}
}
-
+
+ @Override
+ public void setRMAssertion(RMAssertion rma) {
+
+ org.apache.cxf.ws.rm.policy.ObjectFactory factory =
+ new org.apache.cxf.ws.rm.policy.ObjectFactory();
+ if (null == rma) {
+ rma = factory.createRMAssertion();
+ rma.setExponentialBackoff(factory.createRMAssertionExponentialBackoff());
+ }
+ BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
+ if (null == bri) {
+ bri = factory.createRMAssertionBaseRetransmissionInterval();
+ rma.setBaseRetransmissionInterval(bri);
+ }
+ if (null == bri.getMilliseconds()) {
+ bri.setMilliseconds(new BigInteger(RetransmissionQueue.DEFAULT_BASE_RETRANSMISSION_INTERVAL));
+ }
+
+ super.setRMAssertion(rma);
+ }
+
void addSourceSequence(SourceSequence ss) {
if (null == sourceSequences) {
sourceSequences = new HashMap<String, SourceSequence>();
@@ -298,4 +312,5 @@
return sid;
}
}
+
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceMonitor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceMonitor.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceMonitor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceMonitor.java Tue May 1 01:49:30 2007
@@ -58,6 +58,13 @@
return mpm;
}
+
+ public synchronized long getLastArrivalTime() {
+ if (receiveTimes.size() > 0) {
+ return receiveTimes.get(receiveTimes.size() - 1).longValue();
+ }
+ return 0;
+ }
protected void setMonitorInterval(long i) {
if (receiveTimes.size() == 0) {
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java Tue May 1 01:49:30 2007
@@ -27,6 +27,9 @@
import org.apache.cxf.ws.policy.AssertionInfoMap;
import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
import org.apache.cxf.ws.rm.RMConstants;
+import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.InactivityTimeout;
/**
*
@@ -40,94 +43,168 @@
private PolicyUtils() {
}
-
/**
- * Returns the base retransmission interval for the specified message.
- * This is obtained as the minimum base retransmission interval in all RMAssertions pertaining
- * to the message, or null if there are no such policy assertions.
+ * Returns an RMAssertion that is compatible with the default value
+ * and all RMAssertions pertaining to the message (can never be null).
+ *
+ * @param defaultValue the default value
* @param message the message
- * @return the base retransmission interval for the message
+ * @return the compatible RMAssertion
*/
- public static BigInteger getBaseRetransmissionInterval(Message message) {
+ public static RMAssertion getRMAssertion(RMAssertion defaultValue, Message message) {
+ RMAssertion compatible = defaultValue;
AssertionInfoMap amap = message.get(AssertionInfoMap.class);
- BigInteger result = null;
if (null != amap) {
Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
if (null != ais) {
+
for (AssertionInfo ai : ais) {
JaxbAssertion<RMAssertion> ja = getAssertion(ai);
RMAssertion rma = ja.getData();
- RMAssertion.BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
- if (null == bri) {
- continue;
- }
- BigInteger val = bri.getMilliseconds();
- if (null == result || val.compareTo(result) < 0) {
- result = val;
- }
+ compatible = null == defaultValue ? rma
+ : intersect(compatible, rma);
}
}
}
- return result;
+ return compatible;
}
- /**
- * Determines if exponential backoff should be used in repeated attempts to
- * resend the specified message. Returns false if there is at least one
- * RMAssertion for this message indicating that no exponential backoff
- * algorithm should be used, or true otherwise.
- *
- * @param message the message
- * @return true iff the exponential backoff algorithm should be used for the
- * message
- */
- public static boolean useExponentialBackoff(Message message) {
- AssertionInfoMap amap = message.get(AssertionInfoMap.class);
- if (null != amap) {
- Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
- if (null != ais) {
- for (AssertionInfo ai : ais) {
- JaxbAssertion<RMAssertion> ja = getAssertion(ai);
- RMAssertion rma = ja.getData();
- if (null == rma.getExponentialBackoff()) {
- return false;
- }
- }
+ public static RMAssertion intersect(RMAssertion a, RMAssertion b) {
+ if (equals(a, b)) {
+ return a;
+ }
+ RMAssertion compatible = new RMAssertion();
+
+ // use maximum of inactivity timeout
+
+ BigInteger aval = null;
+ if (null != a.getInactivityTimeout()) {
+ aval = a.getInactivityTimeout().getMilliseconds();
+ }
+ BigInteger bval = null;
+ if (null != b.getInactivityTimeout()) {
+ bval = b.getInactivityTimeout().getMilliseconds();
+ }
+ if (null != aval || null != bval) {
+ InactivityTimeout ia = new RMAssertion.InactivityTimeout();
+ if (null != aval && null != bval) {
+ ia.setMilliseconds(aval.max(bval));
+ } else {
+ ia.setMilliseconds(aval != null ? aval : bval);
}
+ compatible.setInactivityTimeout(ia);
}
- return true;
+
+ // use minimum of base retransmission interval
+
+ aval = null;
+ if (null != a.getBaseRetransmissionInterval()) {
+ aval = a.getBaseRetransmissionInterval().getMilliseconds();
+ }
+ bval = null;
+ if (null != b.getBaseRetransmissionInterval()) {
+ bval = b.getBaseRetransmissionInterval().getMilliseconds();
+ }
+ if (null != aval || null != bval) {
+ BaseRetransmissionInterval bri = new RMAssertion.BaseRetransmissionInterval();
+ if (null != aval && null != bval) {
+ bri.setMilliseconds(aval.min(bval));
+ } else {
+ bri.setMilliseconds(aval != null ? aval : bval);
+ }
+ compatible.setBaseRetransmissionInterval(bri);
+ }
+
+ // use minimum of acknowledgement interval
+
+ aval = null;
+ if (null != a.getAcknowledgementInterval()) {
+ aval = a.getAcknowledgementInterval().getMilliseconds();
+ }
+ bval = null;
+ if (null != b.getAcknowledgementInterval()) {
+ bval = b.getAcknowledgementInterval().getMilliseconds();
+ }
+ if (null != aval || null != bval) {
+ AcknowledgementInterval ai = new RMAssertion.AcknowledgementInterval();
+ if (null != aval && null != bval) {
+ ai.setMilliseconds(aval.min(bval));
+ } else {
+ ai.setMilliseconds(aval != null ? aval : bval);
+ }
+ compatible.setAcknowledgementInterval(ai);
+ }
+
+ // backoff parameter
+ if (null != a.getExponentialBackoff() || null != b.getExponentialBackoff()) {
+ compatible.setExponentialBackoff(new RMAssertion.ExponentialBackoff());
+ }
+ return compatible;
}
- /**
- * Returns the acknowledgment interval for the specified message.
- * This is obtained as the minimum acknowledgment interval in all RMAssertions pertaining
- * to the message, or null of if there are no such policy assertions.
- * @param message the message
- * @return the base retransmission interval for the message
- */
- public static BigInteger getAcknowledgmentInterval(Message message) {
- AssertionInfoMap amap = message.get(AssertionInfoMap.class);
- BigInteger result = null;
- if (null != amap) {
- Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
- if (null != ais) {
- for (AssertionInfo ai : ais) {
- JaxbAssertion<RMAssertion> ja = getAssertion(ai);
- RMAssertion rma = ja.getData();
- RMAssertion.AcknowledgementInterval interval = rma.getAcknowledgementInterval();
- if (null == interval) {
- continue;
- }
- BigInteger val = interval.getMilliseconds();
- if (null == result || val.compareTo(result) < 0) {
- result = val;
- }
+ public static boolean equals(RMAssertion a, RMAssertion b) {
+ if (a == b) {
+ return true;
+ }
+
+ BigInteger aval = null;
+ if (null != a.getInactivityTimeout()) {
+ aval = a.getInactivityTimeout().getMilliseconds();
+ }
+ BigInteger bval = null;
+ if (null != b.getInactivityTimeout()) {
+ bval = b.getInactivityTimeout().getMilliseconds();
+ }
+ if (!equals(aval, bval)) {
+ return false;
+ }
+
+ aval = null;
+ if (null != a.getBaseRetransmissionInterval()) {
+ aval = a.getBaseRetransmissionInterval().getMilliseconds();
+ }
+ bval = null;
+ if (null != b.getBaseRetransmissionInterval()) {
+ bval = b.getBaseRetransmissionInterval().getMilliseconds();
+ }
+ if (!equals(aval, bval)) {
+ return false;
+ }
+
+ aval = null;
+ if (null != a.getAcknowledgementInterval()) {
+ aval = a.getAcknowledgementInterval().getMilliseconds();
+ }
+ bval = null;
+ if (null != b.getAcknowledgementInterval()) {
+ bval = b.getAcknowledgementInterval().getMilliseconds();
+ }
+ if (!equals(aval, bval)) {
+ return false;
+ }
+
+ return null == a.getExponentialBackoff()
+ ? null == b.getExponentialBackoff()
+ : null != b.getExponentialBackoff();
+ }
+
+ private static boolean equals(BigInteger aval, BigInteger bval) {
+ if (null != aval) {
+ if (null != bval) {
+ if (!aval.equals(bval)) {
+ return false;
}
+ } else {
+ return false;
}
+ } else {
+ if (null != bval) {
+ return false;
+ }
+ return true;
}
- return result;
+ return true;
}
-
@SuppressWarnings("unchecked")
private static JaxbAssertion<RMAssertion> getAssertion(AssertionInfo ai) {
Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java?view=auto&rev=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java Tue May 1 01:49:30 2007
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.policy;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
+import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertionBuilder;
+import org.apache.cxf.ws.rm.RMConstants;
+import org.apache.neethi.Assertion;
+import org.apache.neethi.Constants;
+import org.apache.neethi.PolicyComponent;
+
+/**
+ *
+ */
+public class RMAssertionBuilder extends JaxbAssertionBuilder<RMAssertion> {
+
+ public RMAssertionBuilder() throws JAXBException {
+ super(RMAssertion.class, RMConstants.getRMAssertionQName());
+ }
+
+ @Override
+ public Assertion buildCompatible(Assertion a, Assertion b) {
+ if (RMConstants.getRMAssertionQName().equals(a.getName())
+ && RMConstants.getRMAssertionQName().equals(b.getName())) {
+
+ RMAssertion compatible = PolicyUtils.intersect(
+ JaxbAssertion.cast(a, RMAssertion.class).getData(),
+ JaxbAssertion.cast(b, RMAssertion.class).getData());
+ if (null == compatible) {
+ return null;
+ }
+ JaxbAssertion<RMAssertion> ca =
+ new JaxbAssertion<RMAssertion>(RMConstants.getRMAssertionQName(),
+ a.isOptional() && b.isOptional());
+ ca.setData(compatible);
+ return ca;
+ }
+ return null;
+ }
+
+ @Override
+ protected JaxbAssertion<RMAssertion> buildAssertion() {
+ return new RMPolicyAssertion();
+ }
+
+ class RMPolicyAssertion extends JaxbAssertion<RMAssertion> {
+ RMPolicyAssertion() {
+ super(RMConstants.getRMAssertionQName(), false);
+ }
+
+ @Override
+ public boolean equal(PolicyComponent policyComponent) {
+ if (policyComponent.getType() != Constants.TYPE_ASSERTION
+ || !getName().equals(((Assertion)policyComponent).getName())) {
+ return false;
+ }
+ JaxbAssertion<RMAssertion> other = JaxbAssertion.cast((Assertion)policyComponent);
+ return PolicyUtils.equals(this.getData(), other.getData());
+ }
+
+ @Override
+ protected Assertion cloneMandatory() {
+ RMPolicyAssertion a = new RMPolicyAssertion();
+ a.setData(getData());
+ return a;
+ }
+ }
+
+
+
+
+}
+
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMAssertionBuilder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Tue May 1 01:49:30 2007
@@ -81,54 +81,6 @@
manager = m;
}
- /**
- * Returns the base retransmission interval for the specified message. This
- * is obtained as the minimum base retransmission interval in all
- * RMAssertions pertaining to the message, or the default configured for the
- * RMManager if there are no such policy assertions.
- *
- * @param message the message
- * @return the base retransmission interval for the message
- */
- public long getBaseRetransmissionInterval(Message message) {
- BigInteger val = PolicyUtils.getBaseRetransmissionInterval(message);
- if (null != val) {
- return val.longValue();
- } else {
-
- RMAssertion rma = manager.getRMAssertion();
- RMAssertion.BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
- if (null != bri) {
- val = bri.getMilliseconds();
- }
- }
- if (null != val) {
- return val.longValue();
- }
- return 0;
- }
-
- /**
- * Determines if exponential backoff should be used in repeated attemprs to
- * resend the specified message. Returns false if there is at least one
- * RMAssertion for this message indicating that no exponential backoff
- * algorithm should be used, or true otherwise.
- *
- * @param message the message
- * @return true iff the exponential backoff algorithm should be used for the
- * message
- */
- public boolean useExponentialBackoff(Message message) {
- if (!PolicyUtils.useExponentialBackoff(message)) {
- return false;
- }
- RMAssertion rma = manager.getRMAssertion();
- if (null == rma.getExponentialBackoff()) {
- return false;
- }
- return true;
- }
-
public void addUnacknowledged(Message message) {
cacheUnacknowledged(message);
}
@@ -335,8 +287,11 @@
protected ResendCandidate(Message m) {
message = m;
resends = 0;
- long baseRetransmissionInterval = getBaseRetransmissionInterval(m);
- backoff = useExponentialBackoff(m) ? RetransmissionQueue.DEFAULT_EXPONENTIAL_BACKOFF : 1;
+ RMAssertion rma = PolicyUtils.getRMAssertion(manager.getRMAssertion(), message);
+ long baseRetransmissionInterval =
+ rma.getBaseRetransmissionInterval().getMilliseconds().longValue();
+ backoff = null != rma.getExponentialBackoff()
+ ? RetransmissionQueue.DEFAULT_EXPONENTIAL_BACKOFF : 1;
next = new Date(System.currentTimeMillis() + baseRetransmissionInterval);
nextInterval = baseRetransmissionInterval * backoff;
if (null != manager.getTimer()) {
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java Tue May 1 01:49:30 2007
@@ -45,6 +45,7 @@
public class AbstractRMInterceptorTest extends Assert {
private IMocksControl control;
+
@Before
public void setUp() {
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java Tue May 1 01:49:30 2007
@@ -36,6 +36,7 @@
import org.apache.cxf.ws.rm.policy.RMAssertion;
import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.InactivityTimeout;
import org.easymock.classextension.EasyMock;
import org.easymock.classextension.IMocksControl;
import org.junit.After;
@@ -614,6 +615,38 @@
EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges);
control.replay();
assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
+ control.verify();
+ }
+
+ @Test
+ public void testScheduleSequenceTermination() throws SequenceFault, IOException {
+ Timer timer = new Timer();
+ setUpDestination(timer);
+
+ DestinationSequence seq = new DestinationSequence(id, ref, destination);
+ destination.removeSequence(seq);
+ EasyMock.expectLastCall();
+
+ Message message = setUpMessage("1");
+
+ RMEndpoint rme = control.createMock(RMEndpoint.class);
+ EasyMock.expect(destination.getReliableEndpoint()).andReturn(rme);
+ long arrival = System.currentTimeMillis();
+ EasyMock.expect(rme.getLastApplicationMessage()).andReturn(arrival);
+
+ control.replay();
+ InactivityTimeout iat = new RMAssertion.InactivityTimeout();
+ iat.setMilliseconds(new BigInteger("200"));
+ rma.setInactivityTimeout(iat);
+
+ seq.acknowledge(message);
+
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+
control.verify();
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java Tue May 1 01:49:30 2007
@@ -44,8 +44,12 @@
public class RMInInterceptorTest extends Assert {
private IMocksControl control;
+ private RMInInterceptor interceptor;
+ private RMManager manager;
+ private RMEndpoint rme;
private RMProperties rmps;
+
@Before
public void setUp() {
control = EasyMock.createNiceControl();
@@ -76,8 +80,10 @@
@Test
public void testHandleCreateSequenceOnServer() throws SequenceFault, RMException {
- RMInInterceptor interceptor = new RMInInterceptor();
- Message message = setupInboundMessage(RMConstants.getCreateSequenceAction(), true);
+ interceptor = new RMInInterceptor();
+ Message message = setupInboundMessage(RMConstants.getCreateSequenceAction(), true);
+ rme.receivedControlMessage();
+ EasyMock.expectLastCall();
EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
control.replay();
@@ -86,12 +92,10 @@
@Test
public void testHandleCreateSequenceOnClient() throws SequenceFault, RMException {
- RMInInterceptor interceptor = new RMInInterceptor();
- Message message = setupInboundMessage(RMConstants.getCreateSequenceAction(), false);
- RMManager manager = control.createMock(RMManager.class);
- interceptor.setManager(manager);
- RMEndpoint rme = control.createMock(RMEndpoint.class);
- EasyMock.expect(manager.getReliableEndpoint(message)).andReturn(rme);
+ interceptor = new RMInInterceptor();
+ Message message = setupInboundMessage(RMConstants.getCreateSequenceAction(), false);
+ rme.receivedControlMessage();
+ EasyMock.expectLastCall();
Servant servant = control.createMock(Servant.class);
EasyMock.expect(rme.getServant()).andReturn(servant);
CreateSequenceResponseType csr = control.createMock(CreateSequenceResponseType.class);
@@ -119,8 +123,10 @@
NoSuchMethodException {
Method m = RMInInterceptor.class.getDeclaredMethod("processAcknowledgments",
new Class[] {RMProperties.class});
- RMInInterceptor interceptor = control.createMock(RMInInterceptor.class, new Method[] {m});
+ interceptor = control.createMock(RMInInterceptor.class, new Method[] {m});
Message message = setupInboundMessage(RMConstants.getSequenceAckAction(), onServer);
+ rme.receivedControlMessage();
+ EasyMock.expectLastCall();
interceptor.processAcknowledgments(rmps);
EasyMock.expectLastCall();
EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
@@ -140,8 +146,9 @@
}
private void testHandleTerminateSequence(boolean onServer) throws SequenceFault, RMException {
- RMInInterceptor interceptor = new RMInInterceptor();
+ interceptor = new RMInInterceptor();
Message message = setupInboundMessage(RMConstants.getTerminateSequenceAction(), onServer);
+ rme.receivedControlMessage();
EasyMock.expectLastCall();
EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
@@ -168,11 +175,9 @@
new Class[] {Destination.class, Message.class});
Method m4 = RMInInterceptor.class.getDeclaredMethod("processDeliveryAssurance",
new Class[] {RMProperties.class});
- RMInInterceptor interceptor = control
+ interceptor = control
.createMock(RMInInterceptor.class, new Method[] {m1, m2, m3, m4});
Message message = setupInboundMessage("greetMe", true);
- RMManager manager = control.createMock(RMManager.class);
- interceptor.setManager(manager);
Destination d = control.createMock(Destination.class);
EasyMock.expect(manager.getDestination(message)).andReturn(d);
interceptor.processAcknowledgments(rmps);
@@ -184,6 +189,8 @@
interceptor.processDeliveryAssurance(rmps);
EasyMock.expectLastCall();
EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+ rme.receivedApplicationMessage();
+ EasyMock.expectLastCall();
control.replay();
interceptor.handle(message);
@@ -191,8 +198,8 @@
@Test
public void testProcessAcknowledgments() {
- RMInInterceptor interceptor = new RMInInterceptor();
- RMManager manager = control.createMock(RMManager.class);
+ interceptor = new RMInInterceptor();
+ manager = control.createMock(RMManager.class);
interceptor.setManager(manager);
SequenceAcknowledgement ack1 = control.createMock(SequenceAcknowledgement.class);
SequenceAcknowledgement ack2 = control.createMock(SequenceAcknowledgement.class);
@@ -232,7 +239,7 @@
destination.acknowledge(message);
EasyMock.expectLastCall();
control.replay();
- RMInInterceptor interceptor = new RMInInterceptor();
+ interceptor = new RMInInterceptor();
interceptor.processSequence(destination, message);
}
@@ -267,6 +274,11 @@
org.apache.cxf.transport.Destination td =
serverSide ? control.createMock(org.apache.cxf.transport.Destination.class) : null;
EasyMock.expect(exchange.getDestination()).andReturn(td);
+
+ manager = control.createMock(RMManager.class);
+ interceptor.setManager(manager);
+ rme = control.createMock(RMEndpoint.class);
+ EasyMock.expect(manager.getReliableEndpoint(message)).andReturn(rme);
return message;
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java Tue May 1 01:49:30 2007
@@ -28,9 +28,12 @@
import org.apache.cxf.ws.policy.AssertionInfoMap;
import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
import org.apache.cxf.ws.rm.RMConstants;
+import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
+import org.apache.cxf.ws.rm.policy.RMAssertion.ExponentialBackoff;
+import org.apache.cxf.ws.rm.policy.RMAssertion.InactivityTimeout;
import org.easymock.IMocksControl;
import org.easymock.classextension.EasyMock;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -47,133 +50,120 @@
control = EasyMock.createNiceControl();
}
- @After
- public void tearDown() {
- control.verify();
+ @Test
+ public void testRMAssertionEquals() {
+ RMAssertion a = new RMAssertion();
+ assertTrue(PolicyUtils.equals(a, a));
+
+ RMAssertion b = new RMAssertion();
+ assertTrue(PolicyUtils.equals(a, b));
+
+ InactivityTimeout iat = new RMAssertion.InactivityTimeout();
+ iat.setMilliseconds(BigInteger.TEN);
+ a.setInactivityTimeout(iat);
+ assertTrue(!PolicyUtils.equals(a, b));
+ b.setInactivityTimeout(iat);
+ assertTrue(PolicyUtils.equals(a, b));
+
+ ExponentialBackoff eb = new RMAssertion.ExponentialBackoff();
+ a.setExponentialBackoff(eb);
+ assertTrue(!PolicyUtils.equals(a, b));
+ b.setExponentialBackoff(eb);
+ assertTrue(PolicyUtils.equals(a, b));
}
@Test
- public void testGetBaseRetranmissionInterval() {
- Message message = control.createMock(Message.class);
- AssertionInfoMap aim = control.createMock(AssertionInfoMap.class);
- EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
- AssertionInfo ai1 = control.createMock(AssertionInfo.class);
- AssertionInfo ai2 = control.createMock(AssertionInfo.class);
- AssertionInfo ai3 = control.createMock(AssertionInfo.class);
- AssertionInfo ai4 = control.createMock(AssertionInfo.class);
- Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
- ais.add(ai1);
- ais.add(ai2);
- ais.add(ai3);
- ais.add(ai4);
- EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
- JaxbAssertion ja1 = control.createMock(JaxbAssertion.class);
- EasyMock.expect(ai1.getAssertion()).andReturn(ja1);
- RMAssertion rma1 = control.createMock(RMAssertion.class);
- EasyMock.expect(ja1.getData()).andReturn(rma1);
- EasyMock.expect(rma1.getBaseRetransmissionInterval()).andReturn(null);
- JaxbAssertion ja2 = control.createMock(JaxbAssertion.class);
- EasyMock.expect(ai2.getAssertion()).andReturn(ja2);
- RMAssertion rma2 = control.createMock(RMAssertion.class);
- EasyMock.expect(ja2.getData()).andReturn(rma2);
- RMAssertion.BaseRetransmissionInterval bri2 =
- control.createMock(RMAssertion.BaseRetransmissionInterval.class);
- EasyMock.expect(rma2.getBaseRetransmissionInterval()).andReturn(bri2);
- EasyMock.expect(bri2.getMilliseconds()).andReturn(null);
- JaxbAssertion ja3 = control.createMock(JaxbAssertion.class);
- EasyMock.expect(ai3.getAssertion()).andReturn(ja3);
- RMAssertion rma3 = control.createMock(RMAssertion.class);
- EasyMock.expect(ja3.getData()).andReturn(rma3);
- RMAssertion.BaseRetransmissionInterval bri3 =
- control.createMock(RMAssertion.BaseRetransmissionInterval.class);
- EasyMock.expect(rma3.getBaseRetransmissionInterval()).andReturn(bri3);
- EasyMock.expect(bri3.getMilliseconds()).andReturn(new BigInteger("10000"));
- JaxbAssertion ja4 = control.createMock(JaxbAssertion.class);
- EasyMock.expect(ai4.getAssertion()).andReturn(ja4);
- RMAssertion rma4 = control.createMock(RMAssertion.class);
- EasyMock.expect(ja4.getData()).andReturn(rma4);
- RMAssertion.BaseRetransmissionInterval bri4 =
- control.createMock(RMAssertion.BaseRetransmissionInterval.class);
- EasyMock.expect(rma4.getBaseRetransmissionInterval()).andReturn(bri4);
- EasyMock.expect(bri4.getMilliseconds()).andReturn(new BigInteger("5000"));
+ public void testIntersect() {
+ RMAssertion a = new RMAssertion();
+ RMAssertion b = new RMAssertion();
+ assertSame(a, PolicyUtils.intersect(a, b));
- control.replay();
- assertEquals("Unexpected value for base retransmission interval",
- 5000, PolicyUtils.getBaseRetransmissionInterval(message).intValue());
+ InactivityTimeout aiat = new RMAssertion.InactivityTimeout();
+ aiat.setMilliseconds(new BigInteger("3600000"));
+ a.setInactivityTimeout(aiat);
+ InactivityTimeout biat = new RMAssertion.InactivityTimeout();
+ biat.setMilliseconds(new BigInteger("7200000"));
+ b.setInactivityTimeout(biat);
+
+ RMAssertion c = PolicyUtils.intersect(a, b);
+ assertEquals(7200000L, c.getInactivityTimeout().getMilliseconds().longValue());
+ assertNull(c.getBaseRetransmissionInterval());
+ assertNull(c.getAcknowledgementInterval());
+ assertNull(c.getExponentialBackoff());
+
+ BaseRetransmissionInterval abri = new RMAssertion.BaseRetransmissionInterval();
+ abri.setMilliseconds(new BigInteger("10000"));
+ a.setBaseRetransmissionInterval(abri);
+ BaseRetransmissionInterval bbri = new RMAssertion.BaseRetransmissionInterval();
+ bbri.setMilliseconds(new BigInteger("20000"));
+ b.setBaseRetransmissionInterval(bbri);
+
+ c = PolicyUtils.intersect(a, b);
+ assertEquals(7200000L, c.getInactivityTimeout().getMilliseconds().longValue());
+ assertEquals(10000L, c.getBaseRetransmissionInterval().getMilliseconds().longValue());
+ assertNull(c.getAcknowledgementInterval());
+ assertNull(c.getExponentialBackoff());
+
+ AcknowledgementInterval aai = new RMAssertion.AcknowledgementInterval();
+ aai.setMilliseconds(new BigInteger("2000"));
+ a.setAcknowledgementInterval(aai);
+
+ c = PolicyUtils.intersect(a, b);
+ assertEquals(7200000L, c.getInactivityTimeout().getMilliseconds().longValue());
+ assertEquals(10000L, c.getBaseRetransmissionInterval().getMilliseconds().longValue());
+ assertEquals(2000L, c.getAcknowledgementInterval().getMilliseconds().longValue());
+ assertNull(c.getExponentialBackoff());
+
+ b.setExponentialBackoff(new RMAssertion.ExponentialBackoff());
+ c = PolicyUtils.intersect(a, b);
+ assertEquals(7200000L, c.getInactivityTimeout().getMilliseconds().longValue());
+ assertEquals(10000L, c.getBaseRetransmissionInterval().getMilliseconds().longValue());
+ assertEquals(2000L, c.getAcknowledgementInterval().getMilliseconds().longValue());
+ assertNotNull(c.getExponentialBackoff());
}
@Test
- public void testUseExponentialBackoff() {
+ public void testGetRMAssertion() {
+ RMAssertion a = new RMAssertion();
+ BaseRetransmissionInterval abri = new RMAssertion.BaseRetransmissionInterval();
+ abri.setMilliseconds(new BigInteger("3000"));
+ a.setBaseRetransmissionInterval(abri);
+ a.setExponentialBackoff(new RMAssertion.ExponentialBackoff());
+
Message message = control.createMock(Message.class);
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
+ control.replay();
+ assertSame(a, PolicyUtils.getRMAssertion(a, message));
+ control.verify();
+
+ control.reset();
AssertionInfoMap aim = control.createMock(AssertionInfoMap.class);
EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
- AssertionInfo ai = control.createMock(AssertionInfo.class);
Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
- ais.add(ai);
- JaxbAssertion ja = control.createMock(JaxbAssertion.class);
- EasyMock.expect(ai.getAssertion()).andReturn(ja);
- RMAssertion rma = control.createMock(RMAssertion.class);
- EasyMock.expect(ja.getData()).andReturn(rma);
- EasyMock.expect(rma.getExponentialBackoff()).andReturn(null);
control.replay();
- assertTrue("Should not use exponential backoff", !PolicyUtils.useExponentialBackoff(message));
+ assertSame(a, PolicyUtils.getRMAssertion(a, message));
control.verify();
+
control.reset();
- EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
- control.replay();
- assertTrue("Should use exponential backoff", PolicyUtils.useExponentialBackoff(message));
- }
-
- @Test
- public void testGetAcknowledgmentInterval() {
- Message message = control.createMock(Message.class);
- AssertionInfoMap aim = control.createMock(AssertionInfoMap.class);
+ RMAssertion b = new RMAssertion();
+ BaseRetransmissionInterval bbri = new RMAssertion.BaseRetransmissionInterval();
+ bbri.setMilliseconds(new BigInteger("2000"));
+ b.setBaseRetransmissionInterval(bbri);
+ JaxbAssertion<RMAssertion> assertion = new JaxbAssertion<RMAssertion>();
+ assertion.setName(RMConstants.getRMAssertionQName());
+ assertion.setData(b);
+ AssertionInfo ai = new AssertionInfo(assertion);
+ ais.add(ai);
EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
- AssertionInfo ai1 = control.createMock(AssertionInfo.class);
- AssertionInfo ai2 = control.createMock(AssertionInfo.class);
- AssertionInfo ai3 = control.createMock(AssertionInfo.class);
- AssertionInfo ai4 = control.createMock(AssertionInfo.class);
- Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
- ais.add(ai1);
- ais.add(ai2);
- ais.add(ai3);
- ais.add(ai4);
EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
- JaxbAssertion ja1 = control.createMock(JaxbAssertion.class);
- EasyMock.expect(ai1.getAssertion()).andReturn(ja1);
- RMAssertion rma1 = control.createMock(RMAssertion.class);
- EasyMock.expect(ja1.getData()).andReturn(rma1);
- EasyMock.expect(rma1.getAcknowledgementInterval()).andReturn(null);
- JaxbAssertion ja2 = control.createMock(JaxbAssertion.class);
- EasyMock.expect(ai2.getAssertion()).andReturn(ja2);
- RMAssertion rma2 = control.createMock(RMAssertion.class);
- EasyMock.expect(ja2.getData()).andReturn(rma2);
- RMAssertion.AcknowledgementInterval aint2 =
- control.createMock(RMAssertion.AcknowledgementInterval.class);
- EasyMock.expect(rma2.getAcknowledgementInterval()).andReturn(aint2);
- EasyMock.expect(aint2.getMilliseconds()).andReturn(null);
- JaxbAssertion ja3 = control.createMock(JaxbAssertion.class);
- EasyMock.expect(ai3.getAssertion()).andReturn(ja3);
- RMAssertion rma3 = control.createMock(RMAssertion.class);
- EasyMock.expect(ja3.getData()).andReturn(rma3);
- RMAssertion.AcknowledgementInterval aint3 =
- control.createMock(RMAssertion.AcknowledgementInterval.class);
- EasyMock.expect(rma3.getAcknowledgementInterval()).andReturn(aint3);
- EasyMock.expect(aint3.getMilliseconds()).andReturn(new BigInteger("10000"));
- JaxbAssertion ja4 = control.createMock(JaxbAssertion.class);
- EasyMock.expect(ai4.getAssertion()).andReturn(ja4);
- RMAssertion rma4 = control.createMock(RMAssertion.class);
- EasyMock.expect(ja4.getData()).andReturn(rma4);
- RMAssertion.AcknowledgementInterval aint4 =
- control.createMock(RMAssertion.AcknowledgementInterval.class);
- EasyMock.expect(rma4.getAcknowledgementInterval()).andReturn(aint4);
- EasyMock.expect(aint4.getMilliseconds()).andReturn(new BigInteger("5000"));
-
control.replay();
- assertEquals("Unexpected value for acknowledgment interval",
- 5000, PolicyUtils.getAcknowledgmentInterval(message).intValue());
+ RMAssertion c = PolicyUtils.getRMAssertion(a, message);
+ assertNull(c.getAcknowledgementInterval());
+ assertNull(c.getInactivityTimeout());
+ assertEquals(2000L, c.getBaseRetransmissionInterval().getMilliseconds().longValue());
+ assertNotNull(c.getExponentialBackoff());
+ control.verify();
}
-
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Tue May 1 01:49:30 2007
@@ -22,17 +22,13 @@
import java.math.BigInteger;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.cxf.message.Message;
-import org.apache.cxf.ws.policy.AssertionInfo;
import org.apache.cxf.ws.policy.AssertionInfoMap;
-import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
import org.apache.cxf.ws.rm.Identifier;
-import org.apache.cxf.ws.rm.RMConstants;
import org.apache.cxf.ws.rm.RMManager;
import org.apache.cxf.ws.rm.RMMessageConstants;
import org.apache.cxf.ws.rm.RMProperties;
@@ -108,68 +104,6 @@
}
@Test
- public void testGetBaseRetransmissionIntervalFromManager() {
- Message message = createMock(Message.class);
- EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
- EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
- EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(null);
- control.replay();
- assertEquals("Unexpected value for base retransmission interval",
- 0L, queue.getBaseRetransmissionInterval(message));
- control.verify();
- control.reset();
- EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
- EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
- RMAssertion.BaseRetransmissionInterval bri = createMock(RMAssertion.BaseRetransmissionInterval.class);
- EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);
- EasyMock.expect(bri.getMilliseconds()).andReturn(null);
- control.replay();
- assertEquals("Unexpected value for base retransmission interval",
- 0L, queue.getBaseRetransmissionInterval(message));
- control.verify();
- control.reset();
- EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
- EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
- EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);
- EasyMock.expect(bri.getMilliseconds()).andReturn(new BigInteger("7000"));
- control.replay();
- assertEquals("Unexpected value for base retransmission interval",
- 7000L, queue.getBaseRetransmissionInterval(message));
- }
-
- @Test
- public void testUseExponentialBackoff() {
- Message message = createMock(Message.class);
- AssertionInfoMap aim = createMock(AssertionInfoMap.class);
- EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
- AssertionInfo ai = createMock(AssertionInfo.class);
- Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
- EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
- ais.add(ai);
- JaxbAssertion ja = createMock(JaxbAssertion.class);
- EasyMock.expect(ai.getAssertion()).andReturn(ja);
- EasyMock.expect(ja.getData()).andReturn(rma);
- EasyMock.expect(rma.getExponentialBackoff()).andReturn(null);
- control.replay();
- assertTrue("Should not use exponential backoff", !queue.useExponentialBackoff(message));
- control.verify();
- control.reset();
- EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
- EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
- EasyMock.expect(rma.getExponentialBackoff()).andReturn(null);
- control.replay();
- assertTrue("Should not use exponential backoff", !queue.useExponentialBackoff(message));
- control.verify();
- control.reset();
- EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
- EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
- RMAssertion.ExponentialBackoff eb = createMock(RMAssertion.ExponentialBackoff.class);
- EasyMock.expect(rma.getExponentialBackoff()).andReturn(eb);
- control.replay();
- assertTrue("Should use exponential backoff", queue.useExponentialBackoff(message));
- }
-
- @Test
public void testResendCandidateCtor() {
Message message = createMock(Message.class);
setupMessagePolicies(message);
@@ -329,7 +263,7 @@
setupMessagePolicies(message1);
Message message2 =
setUpMessage("sequence1", messageNumbers[1], false);
- setupMessagePolicies(message1);
+ setupMessagePolicies(message2);
ready(false);
sequenceList.add(queue.createResendCandidate(message1));
@@ -385,7 +319,7 @@
private void setupMessagePolicies(Message message) {
EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
- EasyMock.expect(manager.getRMAssertion()).andReturn(rma).times(2);
+ EasyMock.expect(manager.getRMAssertion()).andReturn(rma);
RMAssertion.BaseRetransmissionInterval bri =
createMock(RMAssertion.BaseRetransmissionInterval.class);
EasyMock.expect(rma.getBaseRetransmissionInterval()).andReturn(bri);
Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=533968&r1=533967&r2=533968
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Tue May 1 01:49:30 2007
@@ -103,7 +103,8 @@
private boolean doTestTwowayNonAnonymousDeferred = testAll;
private boolean doTestTwowayNonAnonymousMaximumSequenceLength2 = testAll;
private boolean doTestTwowayAtMostOnce = testAll;
- private boolean doTestInvalidSequence = testAll;
+ private boolean doTestUnknownSequence = testAll;
+ private boolean doTestInactivityTimeout = testAll;
private boolean doTestOnewayMessageLoss = testAll;
private boolean doTestOnewayMessageLossAsyncExecutor = testAll;
private boolean doTestTwowayMessageLoss = testAll;
@@ -698,8 +699,8 @@
}
@Test
- public void testInvalidSequence() throws Exception {
- if (!doTestInvalidSequence) {
+ public void testUnknownSequence() throws Exception {
+ if (!doTestUnknownSequence) {
return;
}
@@ -734,6 +735,69 @@
// the third inbound message has a SequenceFault header
MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
mf.verifySequenceFault(RMConstants.getUnknownSequenceFaultCode(), false, 1);
+ }
+
+ @Test
+ public void testInactivityTimeout() throws Exception {
+ if (!doTestInactivityTimeout) {
+ return;
+ }
+
+ setupGreeter("org/apache/cxf/systest/ws/rm/inactivity-timeout.xml");
+
+ greeter.greetMe("one");
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+
+ try {
+ greeter.greetMe("two");
+ fail("Expected fault.");
+ } catch (WebServiceException ex) {
+ SoapFault sf = (SoapFault)ex.getCause();
+ assertEquals("Unexpected fault code.", Soap11.getInstance().getSender(), sf.getFaultCode());
+ assertNull("Unexpected sub code.", sf.getSubCode());
+ assertTrue("Unexpected reason.", sf.getReason().endsWith("is not a known Sequence identifier."));
+ }
+
+ awaitMessages(3, 3, 5000);
+
+ MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+ // Expected outbound:
+ // CreateSequence
+ // + two requests (second request does not include acknowledgement for first response as
+ // in the meantime the client has terminated the sequence)
+
+ String[] expectedActions = new String[3];
+ expectedActions[0] = RMConstants.getCreateSequenceAction();
+ for (int i = 1; i < expectedActions.length; i++) {
+ expectedActions[i] = GREETME_ACTION;
+ }
+ mf.verifyActions(expectedActions, true);
+ mf.verifyMessageNumbers(new String[] {null, "1", "2"}, true);
+ mf.verifyLastMessage(new boolean[3], true);
+ mf.verifyAcknowledgements(new boolean[] {false, false, false}, true);
+
+ // Expected inbound:
+ // createSequenceResponse
+ // + 1 response with acknowledgement
+ // + 1 fault without acknowledgement
+
+ mf.verifyMessages(3, false);
+ expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+ null, null};
+ mf.verifyActions(expectedActions, false);
+ mf.verifyMessageNumbers(new String[] {null, "1", null}, false);
+ mf.verifyAcknowledgements(new boolean[] {false, true, false} , false);
+
+ // the third inbound message has a SequenceFault header
+
+ mf.verifySequenceFault(RMConstants.getUnknownSequenceFaultCode(), false, 2);
+
}
@Test