You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/03/21 09:15:13 UTC
svn commit: r1303305 - in /cxf/branches/2.5.x-fixes: ./
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/
Author: ay
Date: Wed Mar 21 08:15:13 2012
New Revision: 1303305
URL: http://svn.apache.org/viewvc?rev=1303305&view=rev
Log:
Merged revisions 1303303 via svn merge from
https://svn.apache.org/repos/asf/cxf/trunk
........
r1303303 | ay | 2012-03-21 09:09:28 +0100 (Wed, 21 Mar 2012) | 1 line
[CXF-4188] Robust-InOnly processing with WS-RM to perform AtMostOnce check
........
Added:
cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
- copied unchanged from r1303303, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
- copied unchanged from r1303303, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
Modified:
cxf/branches/2.5.x-fixes/ (props changed)
cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
Propchange: cxf/branches/2.5.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1303305&r1=1303304&r2=1303305&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Wed Mar 21 08:15:13 2012
@@ -113,6 +113,9 @@ public class Destination extends Abstrac
if (null != seq) {
if (seq.applyDeliveryAssurance(sequenceType.getMessageNumber(), message)) {
+ if (MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY))) {
+ return;
+ }
seq.acknowledge(message);
if (null != rmps.getCloseSequence()) {
Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1303305&r1=1303304&r2=1303305&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Wed Mar 21 08:15:13 2012
@@ -21,8 +21,10 @@ package org.apache.cxf.ws.rm;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -32,6 +34,7 @@ import org.apache.cxf.continuations.Cont
import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.rm.manager.AcksPolicyType;
import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
@@ -60,6 +63,7 @@ public class DestinationSequence extends
private long inProcessNumber;
private long highNumberCompleted;
private List<Continuation> continuations = new LinkedList<Continuation>();
+ private Set<Long> deliveringMessageNumbers = new HashSet<Long>();
public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d, ProtocolVariation pv) {
this(i, a, 0, null, pv);
@@ -238,10 +242,26 @@ public class DestinationSequence extends
Continuation cont = getContinuation(message);
DeliveryAssuranceType da = destination.getManager().getDeliveryAssurance();
boolean canSkip = !da.isSetAtLeastOnce() && !da.isSetExactlyOnce();
+ boolean robust = false;
+ boolean robustDelivering = false;
+ if (message != null) {
+ robust = MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
+ if (robust) {
+ robustDelivering =
+ MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY));
+ }
+ }
+ if (robust && !robustDelivering) {
+ // no check performed if in robust and not in delivering
+ deliveringMessageNumbers.remove(mn);
+ return true;
+ }
if (cont != null && da.isSetInOrder() && !cont.isNew()) {
return waitInQueue(mn, canSkip, message, cont);
}
- if ((da.isSetExactlyOnce() || da.isSetAtMostOnce()) && isAcknowledged(mn)) {
+ if ((da.isSetExactlyOnce() || da.isSetAtMostOnce())
+ && (isAcknowledged(mn)
+ || (robustDelivering && deliveringMessageNumbers.contains(mn)))) {
// acknowledge at first opportunity following duplicate message
scheduleImmediateAcknowledgement();
@@ -251,6 +271,9 @@ public class DestinationSequence extends
throw new RMException(msg);
}
+ if (robustDelivering) {
+ deliveringMessageNumbers.add(mn);
+ }
if (da.isSetInOrder()) {
return waitInQueue(mn, canSkip, message, cont);
}
Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1303305&r1=1303304&r2=1303305&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java Wed Mar 21 08:15:13 2012
@@ -47,6 +47,7 @@ public class RMDeliveryInterceptor exten
final boolean robust =
MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
if (robust) {
+ message.remove(RMMessageConstants.DELIVERING_ROBUST_ONEWAY);
dest.acknowledge(message);
}
dest.processingComplete(message);
Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1303305&r1=1303304&r2=1303305&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Wed Mar 21 08:15:13 2012
@@ -153,9 +153,11 @@ public class RMInInterceptor extends Abs
throws SequenceFault, RMException {
final boolean robust =
MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
- if (!robust) {
- destination.acknowledge(message);
- }
+ if (robust) {
+ // set this property to change the acknlowledging behavior
+ message.put(RMMessageConstants.DELIVERING_ROBUST_ONEWAY, Boolean.TRUE);
+ }
+ destination.acknowledge(message);
}
void processDeliveryAssurance(RMProperties rmps) {
Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java?rev=1303305&r1=1303304&r2=1303305&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java Wed Mar 21 08:15:13 2012
@@ -42,6 +42,10 @@ public final class RMMessageConstants {
public static final String SAVED_CONTENT =
"org.apache.cxf.ws.rm.content";
+ // keep this constant in the ws-rm package until it finds a general use outside of ws-rm
+ static final String DELIVERING_ROBUST_ONEWAY =
+ "org.apache.cxf.oneway.robust.delivering";
+
/**
* Prevents instantiation.
*/