You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/03/21 09:15:13 UTC

svn commit: r1303305 - in /cxf/branches/2.5.x-fixes: ./ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/

Author: ay
Date: Wed Mar 21 08:15:13 2012
New Revision: 1303305

URL: http://svn.apache.org/viewvc?rev=1303305&view=rev
Log:
Merged revisions 1303303 via  svn merge from
https://svn.apache.org/repos/asf/cxf/trunk

........
  r1303303 | ay | 2012-03-21 09:09:28 +0100 (Wed, 21 Mar 2012) | 1 line
  
  [CXF-4188] Robust-InOnly processing with WS-RM to perform AtMostOnce check
........

Added:
    cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
      - copied unchanged from r1303303, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
    cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
      - copied unchanged from r1303303, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
Modified:
    cxf/branches/2.5.x-fixes/   (props changed)
    cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
    cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
    cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
    cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java

Propchange: cxf/branches/2.5.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1303305&r1=1303304&r2=1303305&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Wed Mar 21 08:15:13 2012
@@ -113,6 +113,9 @@ public class Destination extends Abstrac
 
         if (null != seq) {
             if (seq.applyDeliveryAssurance(sequenceType.getMessageNumber(), message)) {
+                if (MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY))) {
+                    return;
+                }
                 seq.acknowledge(message);
     
                 if (null != rmps.getCloseSequence()) {

Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1303305&r1=1303304&r2=1303305&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Wed Mar 21 08:15:13 2012
@@ -21,8 +21,10 @@ package org.apache.cxf.ws.rm;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.TimerTask;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -32,6 +34,7 @@ import org.apache.cxf.continuations.Cont
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.rm.manager.AcksPolicyType;
 import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
@@ -60,6 +63,7 @@ public class DestinationSequence extends
     private long inProcessNumber;
     private long highNumberCompleted;
     private List<Continuation> continuations = new LinkedList<Continuation>();
+    private Set<Long> deliveringMessageNumbers = new HashSet<Long>();
     
     public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d, ProtocolVariation pv) {
         this(i, a, 0, null, pv);
@@ -238,10 +242,26 @@ public class DestinationSequence extends
         Continuation cont = getContinuation(message);
         DeliveryAssuranceType da = destination.getManager().getDeliveryAssurance();
         boolean canSkip = !da.isSetAtLeastOnce() && !da.isSetExactlyOnce();
+        boolean robust = false;
+        boolean robustDelivering = false;
+        if (message != null) {
+            robust = MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
+            if (robust) {
+                robustDelivering = 
+                    MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY));
+            }
+        }
+        if (robust && !robustDelivering) {
+            // robust message not flagged as delivering: skip the checks and clear its entry
+            deliveringMessageNumbers.remove(mn);
+            return true;
+        }
         if (cont != null && da.isSetInOrder() && !cont.isNew()) {
             return waitInQueue(mn, canSkip, message, cont);
         }
-        if ((da.isSetExactlyOnce() || da.isSetAtMostOnce()) && isAcknowledged(mn)) {            
+        if ((da.isSetExactlyOnce() || da.isSetAtMostOnce()) 
+            && (isAcknowledged(mn) 
+                || (robustDelivering && deliveringMessageNumbers.contains(mn)))) {            
             
             // acknowledge at first opportunity following duplicate message
             scheduleImmediateAcknowledgement();
@@ -251,6 +271,9 @@ public class DestinationSequence extends
             throw new RMException(msg);
             
         } 
+        if (robustDelivering) {
+            deliveringMessageNumbers.add(mn);
+        }
         if (da.isSetInOrder()) {
             return waitInQueue(mn, canSkip, message, cont);
         }

Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1303305&r1=1303304&r2=1303305&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java Wed Mar 21 08:15:13 2012
@@ -47,6 +47,7 @@ public class RMDeliveryInterceptor exten
         final boolean robust =
             MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
         if (robust) {
+            message.remove(RMMessageConstants.DELIVERING_ROBUST_ONEWAY);
             dest.acknowledge(message);
         }
         dest.processingComplete(message);

Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1303305&r1=1303304&r2=1303305&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Wed Mar 21 08:15:13 2012
@@ -153,9 +153,11 @@ public class RMInInterceptor extends Abs
         throws SequenceFault, RMException {
         final boolean robust =
             MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
-        if (!robust) {
-            destination.acknowledge(message);
-        }
+        if (robust) {
+            // set this property to change the acknowledging behavior
+            message.put(RMMessageConstants.DELIVERING_ROBUST_ONEWAY, Boolean.TRUE);
+        } 
+        destination.acknowledge(message);
     }
     
     void processDeliveryAssurance(RMProperties rmps) {

Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java?rev=1303305&r1=1303304&r2=1303305&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java Wed Mar 21 08:15:13 2012
@@ -42,6 +42,10 @@ public final class RMMessageConstants {
     public static final String SAVED_CONTENT =
         "org.apache.cxf.ws.rm.content";
     
+    // keep this constant in the ws-rm package until it finds a general use outside of ws-rm
+    static final String DELIVERING_ROBUST_ONEWAY = 
+        "org.apache.cxf.oneway.robust.delivering";
+    
     /**
      * Prevents instantiation. 
      */