You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/03/06 08:51:03 UTC

svn commit: r1297370 - in /cxf/branches/2.5.x-fixes: ./ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/

Author: ay
Date: Tue Mar  6 07:51:02 2012
New Revision: 1297370

URL: http://svn.apache.org/viewvc?rev=1297370&view=rev
Log:
Merged revisions 1297296 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/trunk

........
  r1297296 | ay | 2012-03-06 00:57:14 +0100 (Tue, 06 Mar 2012) | 1 line
  
  [CXF-4164] Robust-InOnly processing with WS-RM must delay updating the sequence until message delivery
........

Added:
    cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
      - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
    cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
      - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
    cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
      - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
Modified:
    cxf/branches/2.5.x-fixes/   (props changed)
    cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
    cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
    cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java

Propchange: cxf/branches/2.5.x-fixes/
------------------------------------------------------------------------------
    svn:mergeinfo = /cxf/trunk:1297296

Propchange: cxf/branches/2.5.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java Tue Mar  6 07:51:02 2012
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
 
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.phase.Phase;
 
 /**
@@ -42,6 +43,12 @@ public class RMDeliveryInterceptor exten
     
     public void handle(Message message) throws SequenceFault, RMException {
         LOG.entering(getClass().getName(), "handleMessage");
-        getManager().getDestination(message).processingComplete(message);
+        Destination dest = getManager().getDestination(message);
+        final boolean robust =
+            MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
+        if (robust) {
+            dest.acknowledge(message);
+        }
+        dest.processingComplete(message);
     }
 }

Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Tue Mar  6 07:51:02 2012
@@ -25,6 +25,7 @@ import java.util.logging.Logger;
 
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
 import org.apache.cxf.ws.addressing.ContextUtils;
 import org.apache.cxf.ws.addressing.MAPAggregator;
@@ -150,7 +151,11 @@ public class RMInInterceptor extends Abs
     
     void processSequence(Destination destination, Message message) 
         throws SequenceFault, RMException {
-        destination.acknowledge(message);
+        final boolean robust =
+            MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
+        if (!robust) {
+            destination.acknowledge(message);
+        }
     }
     
     void processDeliveryAssurance(RMProperties rmps) {

Modified: cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java?rev=1297370&r1=1297369&r2=1297370&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java (original)
+++ cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java Tue Mar  6 07:51:02 2012
@@ -18,186 +18,12 @@
  */
 package org.apache.cxf.systest.ws.rm;
 
-import java.net.MalformedURLException;
-import java.util.logging.Logger;
-
-import javax.xml.ws.Endpoint;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.BusFactory;
-import org.apache.cxf.bus.spring.SpringBusFactory;
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.greeter_control.Control;
-import org.apache.cxf.greeter_control.ControlService;
-import org.apache.cxf.greeter_control.Greeter;
-import org.apache.cxf.greeter_control.GreeterService;
-import org.apache.cxf.greeter_control.types.FaultLocation;
-import org.apache.cxf.interceptor.ServiceInvokerInterceptor;
-import org.apache.cxf.phase.Phase;
-import org.apache.cxf.test.TestUtilities;
-import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
-import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.ws.rm.RMManager;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
 /**
  * Tests the acknowledgement delivery back to the non-decoupled port when there is some
  * error at the provider side and how its behavior is affected by the robust in-only mode setting.
  */
-public class ServiceInvocationAckTest extends AbstractBusClientServerTestBase {
-    public static final String PORT = allocatePort(Server.class);
-    
-    private static final Logger LOG = LogUtils.getLogger(ServiceInvocationAckTest.class);
-
-    private static final String CONTROL_PORT_ADDRESS = 
-        "http://localhost:" + PORT + "/SoapContext/ControlPort";
-
-    public static class Server extends AbstractBusTestServerBase {
-
-        protected void run() {
-            SpringBusFactory factory = new SpringBusFactory();
-            Bus bus = factory.createBus();
-            BusFactory.setDefaultBus(bus);
-            setBus(bus);
-
-            ControlImpl implementor = new ControlImpl();
-            implementor.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort");
-            GreeterImpl greeterImplementor = new GreeterImpl();
-            implementor.setImplementor(greeterImplementor);
-            Endpoint.publish(CONTROL_PORT_ADDRESS, implementor);
-            LOG.fine("Published control endpoint.");
-        }
-
-        public static void main(String[] args) {
-            try {
-                Server s = new Server();
-                s.start();
-            } catch (Exception ex) {
-                ex.printStackTrace();
-                System.exit(-1);
-            } finally {
-                System.out.println("done!");
-            }
-        }
-    }
-    
-    private Bus controlBus;
-    private Control control;
-    private Bus greeterBus;
-    private Greeter greeter;
-    
-
-    @BeforeClass
-    public static void startServers() throws Exception {
-        TestUtilities.setKeepAliveSystemProperty(false);
-        assertTrue("server did not launch correctly", launchServer(Server.class, true));
-    }
-    
-    @AfterClass
-    public static void cleanup() {
-        TestUtilities.recoverKeepAliveSystemProperty();
-    }
-    
-    @After
-    public void tearDown() {
-        if (null != greeter) {
-            assertTrue("Failed to stop greeter.", control.stopGreeter(null));
-            greeterBus.shutdown(true);
-            greeterBus = null;
-        }
-        if (null != control) {  
-            assertTrue("Failed to stop greeter", control.stopGreeter(null));
-            controlBus.shutdown(true);
-        }
-    }
-
-    @Test
-    public void testDefaultInvocationHandling() throws Exception {
+public class ServiceInvocationAckTest extends ServiceInvocationAckBase {
+    protected void setupGreeter() throws Exception {
         setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
-
-        control.setRobustInOnlyMode(false);
-        
-        FaultLocation location = new org.apache.cxf.greeter_control.types.ObjectFactory()
-            .createFaultLocation();
-        location.setPhase(Phase.INVOKE);
-        location.setBefore(ServiceInvokerInterceptor.class.getName());
-        
-        RMManager manager = greeterBus.getExtension(RMManager.class);
-
-        // the message is acked and the invocation takes place
-        greeter.greetMeOneWay("one");
-        Thread.sleep(6000L);
-        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
-    
-        control.setFaultLocation(location);
-
-        // the invocation fails but the message is acked because the delivery succeeds
-        greeter.greetMeOneWay("two");
-        Thread.sleep(6000L);
-        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
-    }
-
-    @Test
-    public void testRobustInvocationHandling() throws Exception {
-        setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
-
-        control.setRobustInOnlyMode(true);
-        
-        FaultLocation location = new org.apache.cxf.greeter_control.types.ObjectFactory()
-            .createFaultLocation();
-        location.setPhase(Phase.INVOKE);
-        location.setBefore(ServiceInvokerInterceptor.class.getName());
-        
-        RMManager manager = greeterBus.getExtension(RMManager.class);
-
-        
-        // the message is acked and the invocation takes place
-        greeter.greetMeOneWay("one");
-        Thread.sleep(6000L);
-        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
-
-        control.setFaultLocation(location);
-
-        // the invocation fails but the message is acked because the delivery succeeds
-        greeter.greetMeOneWay("two");
-        Thread.sleep(6000L);
-        assertFalse("RetransmissionQueue must not be empty", manager.getRetransmissionQueue().isEmpty());
-        
-        location.setPhase(null);
-        control.setFaultLocation(location);
-
-        // the retransmission succeeds and the invocation succeeds, the message is acked
-        Thread.sleep(6000L);
-        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
-        
-    }
-
-    private void setupGreeter(String cfgResource) throws NumberFormatException, MalformedURLException {
-        
-        SpringBusFactory bf = new SpringBusFactory();
-        
-        controlBus = bf.createBus();
-        BusFactory.setDefaultBus(controlBus);
-
-        ControlService cs = new ControlService();
-        control = cs.getControlPort();
-        updateAddressPort(control, PORT);
-        
-        assertTrue("Failed to start greeter", control.startGreeter(cfgResource));
-        
-        greeterBus = bf.createBus(cfgResource);
-        BusFactory.setDefaultBus(greeterBus);
-        LOG.fine("Initialised greeter bus with configuration: " + cfgResource);
-        
-        GreeterService gs = new GreeterService();
-
-        greeter = gs.getGreeterPort();
-        updateAddressPort(greeter, PORT);
-        LOG.fine("Created greeter client.");
-
     }
 }



Re: svn commit: r1297370 - in /cxf/branches/2.5.x-fixes: ./ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/

Posted by Aki Yoshida <el...@googlemail.com>.
2012/3/6 Dennis Sosnoski <dm...@sosnoski.com>:
> Hi Aki,
>
> On reviewing the code, I think you'll also need to make a change to
> Destination.acknowledge() for this to work correctly. Right now
> Destination.acknowledge() is what persists the received message to the
> store, so if it's not called until processing is complete messages will
> never be persisted. This should be moved out to a separate method which
> can be called by RMInInterceptor.

Hi Dennis,

In the robust mode, the message does not need to be persisted but only
its sequence needs to be updated after the call completes. I am using
this robust mode, which was introduced for normal oneway robust-inonly
calls, as a workaround for the ws-rm's missing retry mechanism for the
failed inbound service invocation.

This is the reason why we skip the persistence of the inbound message
for the robust-oneway mode at RMInInterceptor. If we persisted the
message and updated the sequence in that phase, and later when the
service invocation fails, we would need to revert/rollback the
sequence in memory as well as in persistence. So, to avoid this
situation, my change delays updating the sequence (i.e., calling
dest.acknowledge()) until the call succeeds, and that is when the
DeliveryInterceptor reports that the call is completed. So I think it
is correct in this context.

>
> Separately, it looks like we need to change the code to handle passing
> persisted messages on to the application when recovering from the store.
> It looks to me like at present messages will be acknowledged by the RM
> layer but never delivered to the application if there's a crash or
> shutdown while they're waiting to be processed. What do you think?

Yes. There are a few things we need to do. Currently, the original
transported wire message is persisted, which is similar to the
restriction which we have in the outbound side. For the inbound side,
I think we can change this so that we capture the post ws-rm handling
(that implies automatically post ws-sec handling) with the endpoint
info so that we can restart this message from that step on. Here, we
should introduce an automatic retry mechanism.

To ensure the at-least-once invocation of the service, we can go
without transaction and let the service check the duplicates in their
domain. For the exactly-once invocation, we will need a transaction.

I would like to work on this part if that is okay with you. We can
discuss further on this topic. But a bigger remaining thing is the
desired change in the persistence/retry mechanism for the outbound so
that messages can be persisted even when a sequence is not yet
available and the retransmitted messages can be retransmitted with
the updated information. I only thought about it a little bit. Are you
or will you be working on it?

Thanks.
Regards, aki
>
>  - Dennis
>
>
> On 03/06/2012 08:51 PM, ay@apache.org wrote:
>> Author: ay
>> Date: Tue Mar  6 07:51:02 2012
>> New Revision: 1297370
>>
>> URL: http://svn.apache.org/viewvc?rev=1297370&view=rev
>> Log:
>> Merged revisions 1297296 via svnmerge from
>> https://svn.apache.org/repos/asf/cxf/trunk
>>
>> ........
>>   r1297296 | ay | 2012-03-06 00:57:14 +0100 (Tue, 06 Mar 2012) | 1 line
>>
>>   [CXF-4164] Robust-InOnly processing with WS-RM must delay updating the sequence until message delivery
>> ........
>>
>> Added:
>>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
>>       - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
>>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
>>       - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
>>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
>>       - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
>> Modified:
>>     cxf/branches/2.5.x-fixes/   (props changed)
>>     cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
>>     cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
>>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
>>
>> Propchange: cxf/branches/2.5.x-fixes/
>> ------------------------------------------------------------------------------
>>     svn:mergeinfo = /cxf/trunk:1297296
>>
>> Propchange: cxf/branches/2.5.x-fixes/
>> ------------------------------------------------------------------------------
>> Binary property 'svnmerge-integrated' - no diff available.
>>
>> Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
>> URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
>> ==============================================================================
>> --- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java (original)
>> +++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java Tue Mar  6 07:51:02 2012
>> @@ -23,6 +23,7 @@ import java.util.logging.Logger;
>>
>>  import org.apache.cxf.common.logging.LogUtils;
>>  import org.apache.cxf.message.Message;
>> +import org.apache.cxf.message.MessageUtils;
>>  import org.apache.cxf.phase.Phase;
>>
>>  /**
>> @@ -42,6 +43,12 @@ public class RMDeliveryInterceptor exten
>>
>>      public void handle(Message message) throws SequenceFault, RMException {
>>          LOG.entering(getClass().getName(), "handleMessage");
>> -        getManager().getDestination(message).processingComplete(message);
>> +        Destination dest = getManager().getDestination(message);
>> +        final boolean robust =
>> +            MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
>> +        if (robust) {
>> +            dest.acknowledge(message);
>> +        }
>> +        dest.processingComplete(message);
>>      }
>>  }
>>
>> Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
>> URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
>> ==============================================================================
>> --- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
>> +++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Tue Mar  6 07:51:02 2012
>> @@ -25,6 +25,7 @@ import java.util.logging.Logger;
>>
>>  import org.apache.cxf.common.logging.LogUtils;
>>  import org.apache.cxf.message.Message;
>> +import org.apache.cxf.message.MessageUtils;
>>  import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
>>  import org.apache.cxf.ws.addressing.ContextUtils;
>>  import org.apache.cxf.ws.addressing.MAPAggregator;
>> @@ -150,7 +151,11 @@ public class RMInInterceptor extends Abs
>>
>>      void processSequence(Destination destination, Message message)
>>          throws SequenceFault, RMException {
>> -        destination.acknowledge(message);
>> +        final boolean robust =
>> +            MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
>> +        if (!robust) {
>> +            destination.acknowledge(message);
>> +        }
>>      }
>>
>>      void processDeliveryAssurance(RMProperties rmps) {
>>
>> Modified: cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
>> URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java?rev=1297370&r1=1297369&r2=1297370&view=diff
>> ==============================================================================
>> --- cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java (original)
>> +++ cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java Tue Mar  6 07:51:02 2012
>> @@ -18,186 +18,12 @@
>>   */
>>  package org.apache.cxf.systest.ws.rm;
>>
>> -import java.net.MalformedURLException;
>> -import java.util.logging.Logger;
>> -
>> -import javax.xml.ws.Endpoint;
>> -
>> -import org.apache.cxf.Bus;
>> -import org.apache.cxf.BusFactory;
>> -import org.apache.cxf.bus.spring.SpringBusFactory;
>> -import org.apache.cxf.common.logging.LogUtils;
>> -import org.apache.cxf.greeter_control.Control;
>> -import org.apache.cxf.greeter_control.ControlService;
>> -import org.apache.cxf.greeter_control.Greeter;
>> -import org.apache.cxf.greeter_control.GreeterService;
>> -import org.apache.cxf.greeter_control.types.FaultLocation;
>> -import org.apache.cxf.interceptor.ServiceInvokerInterceptor;
>> -import org.apache.cxf.phase.Phase;
>> -import org.apache.cxf.test.TestUtilities;
>> -import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
>> -import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
>> -import org.apache.cxf.ws.rm.RMManager;
>> -
>> -import org.junit.After;
>> -import org.junit.AfterClass;
>> -import org.junit.BeforeClass;
>> -import org.junit.Test;
>> -
>>  /**
>>   * Tests the acknowledgement delivery back to the non-decoupled port when there is some
>>   * error at the provider side and how its behavior is affected by the robust in-only mode setting.
>>   */
>> -public class ServiceInvocationAckTest extends AbstractBusClientServerTestBase {
>> -    public static final String PORT = allocatePort(Server.class);
>> -
>> -    private static final Logger LOG = LogUtils.getLogger(ServiceInvocationAckTest.class);
>> -
>> -    private static final String CONTROL_PORT_ADDRESS =
>> -        "http://localhost:" + PORT + "/SoapContext/ControlPort";
>> -
>> -    public static class Server extends AbstractBusTestServerBase {
>> -
>> -        protected void run() {
>> -            SpringBusFactory factory = new SpringBusFactory();
>> -            Bus bus = factory.createBus();
>> -            BusFactory.setDefaultBus(bus);
>> -            setBus(bus);
>> -
>> -            ControlImpl implementor = new ControlImpl();
>> -            implementor.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort");
>> -            GreeterImpl greeterImplementor = new GreeterImpl();
>> -            implementor.setImplementor(greeterImplementor);
>> -            Endpoint.publish(CONTROL_PORT_ADDRESS, implementor);
>> -            LOG.fine("Published control endpoint.");
>> -        }
>> -
>> -        public static void main(String[] args) {
>> -            try {
>> -                Server s = new Server();
>> -                s.start();
>> -            } catch (Exception ex) {
>> -                ex.printStackTrace();
>> -                System.exit(-1);
>> -            } finally {
>> -                System.out.println("done!");
>> -            }
>> -        }
>> -    }
>> -
>> -    private Bus controlBus;
>> -    private Control control;
>> -    private Bus greeterBus;
>> -    private Greeter greeter;
>> -
>> -
>> -    @BeforeClass
>> -    public static void startServers() throws Exception {
>> -        TestUtilities.setKeepAliveSystemProperty(false);
>> -        assertTrue("server did not launch correctly", launchServer(Server.class, true));
>> -    }
>> -
>> -    @AfterClass
>> -    public static void cleanup() {
>> -        TestUtilities.recoverKeepAliveSystemProperty();
>> -    }
>> -
>> -    @After
>> -    public void tearDown() {
>> -        if (null != greeter) {
>> -            assertTrue("Failed to stop greeter.", control.stopGreeter(null));
>> -            greeterBus.shutdown(true);
>> -            greeterBus = null;
>> -        }
>> -        if (null != control) {
>> -            assertTrue("Failed to stop greeter", control.stopGreeter(null));
>> -            controlBus.shutdown(true);
>> -        }
>> -    }
>> -
>> -    @Test
>> -    public void testDefaultInvocationHandling() throws Exception {
>> +public class ServiceInvocationAckTest extends ServiceInvocationAckBase {
>> +    protected void setupGreeter() throws Exception {
>>          setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
>> -
>> -        control.setRobustInOnlyMode(false);
>> -
>> -        FaultLocation location = new org.apache.cxf.greeter_control.types.ObjectFactory()
>> -            .createFaultLocation();
>> -        location.setPhase(Phase.INVOKE);
>> -        location.setBefore(ServiceInvokerInterceptor.class.getName());
>> -
>> -        RMManager manager = greeterBus.getExtension(RMManager.class);
>> -
>> -        // the message is acked and the invocation takes place
>> -        greeter.greetMeOneWay("one");
>> -        Thread.sleep(6000L);
>> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
>> -
>> -        control.setFaultLocation(location);
>> -
>> -        // the invocation fails but the message is acked because the delivery succeeds
>> -        greeter.greetMeOneWay("two");
>> -        Thread.sleep(6000L);
>> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
>> -    }
>> -
>> -    @Test
>> -    public void testRobustInvocationHandling() throws Exception {
>> -        setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
>> -
>> -        control.setRobustInOnlyMode(true);
>> -
>> -        FaultLocation location = new org.apache.cxf.greeter_control.types.ObjectFactory()
>> -            .createFaultLocation();
>> -        location.setPhase(Phase.INVOKE);
>> -        location.setBefore(ServiceInvokerInterceptor.class.getName());
>> -
>> -        RMManager manager = greeterBus.getExtension(RMManager.class);
>> -
>> -
>> -        // the message is acked and the invocation takes place
>> -        greeter.greetMeOneWay("one");
>> -        Thread.sleep(6000L);
>> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
>> -
>> -        control.setFaultLocation(location);
>> -
>> -        // the invocation fails but the message is acked because the delivery succeeds
>> -        greeter.greetMeOneWay("two");
>> -        Thread.sleep(6000L);
>> -        assertFalse("RetransmissionQueue must not be empty", manager.getRetransmissionQueue().isEmpty());
>> -
>> -        location.setPhase(null);
>> -        control.setFaultLocation(location);
>> -
>> -        // the retransmission succeeds and the invocation succeeds, the message is acked
>> -        Thread.sleep(6000L);
>> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
>> -
>> -    }
>> -
>> -    private void setupGreeter(String cfgResource) throws NumberFormatException, MalformedURLException {
>> -
>> -        SpringBusFactory bf = new SpringBusFactory();
>> -
>> -        controlBus = bf.createBus();
>> -        BusFactory.setDefaultBus(controlBus);
>> -
>> -        ControlService cs = new ControlService();
>> -        control = cs.getControlPort();
>> -        updateAddressPort(control, PORT);
>> -
>> -        assertTrue("Failed to start greeter", control.startGreeter(cfgResource));
>> -
>> -        greeterBus = bf.createBus(cfgResource);
>> -        BusFactory.setDefaultBus(greeterBus);
>> -        LOG.fine("Initialised greeter bus with configuration: " + cfgResource);
>> -
>> -        GreeterService gs = new GreeterService();
>> -
>> -        greeter = gs.getGreeterPort();
>> -        updateAddressPort(greeter, PORT);
>> -        LOG.fine("Created greeter client.");
>> -
>>      }
>>  }
>>
>>

Re: svn commit: r1297370 - in /cxf/branches/2.5.x-fixes: ./ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/

Posted by Dennis Sosnoski <dm...@sosnoski.com>.
Hi Aki,

On reviewing the code, I think you'll also need to make a change to
Destination.acknowledge() for this to work correctly. Right now
Destination.acknowledge() is what persists the received message to the
store, so if it's not called until processing is complete messages will
never be persisted. This should be moved out to a separate method which
can be called by RMInInterceptor.

Separately, it looks like we need to change the code to handle passing
persisted messages on to the application when recovering from the store.
It looks to me like at present messages will be acknowledged by the RM
layer but never delivered to the application if there's a crash or
shutdown while they're waiting to be processed. What do you think?

  - Dennis


On 03/06/2012 08:51 PM, ay@apache.org wrote:
> Author: ay
> Date: Tue Mar  6 07:51:02 2012
> New Revision: 1297370
>
> URL: http://svn.apache.org/viewvc?rev=1297370&view=rev
> Log:
> Merged revisions 1297296 via svnmerge from 
> https://svn.apache.org/repos/asf/cxf/trunk
>
> ........
>   r1297296 | ay | 2012-03-06 00:57:14 +0100 (Tue, 06 Mar 2012) | 1 line
>   
>   [CXF-4164] Robust-InOnly processing with WS-RM must delay updating the sequence until message delivery
> ........
>
> Added:
>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
>       - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
>       - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
>       - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
> Modified:
>     cxf/branches/2.5.x-fixes/   (props changed)
>     cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
>     cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
>
> Propchange: cxf/branches/2.5.x-fixes/
> ------------------------------------------------------------------------------
>     svn:mergeinfo = /cxf/trunk:1297296
>
> Propchange: cxf/branches/2.5.x-fixes/
> ------------------------------------------------------------------------------
> Binary property 'svnmerge-integrated' - no diff available.
>
> Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
> URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
> ==============================================================================
> --- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java (original)
> +++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java Tue Mar  6 07:51:02 2012
> @@ -23,6 +23,7 @@ import java.util.logging.Logger;
>  
>  import org.apache.cxf.common.logging.LogUtils;
>  import org.apache.cxf.message.Message;
> +import org.apache.cxf.message.MessageUtils;
>  import org.apache.cxf.phase.Phase;
>  
>  /**
> @@ -42,6 +43,12 @@ public class RMDeliveryInterceptor exten
>      
>      public void handle(Message message) throws SequenceFault, RMException {
>          LOG.entering(getClass().getName(), "handleMessage");
> -        getManager().getDestination(message).processingComplete(message);
> +        Destination dest = getManager().getDestination(message);
> +        final boolean robust =
> +            MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
> +        if (robust) {
> +            dest.acknowledge(message);
> +        }
> +        dest.processingComplete(message);
>      }
>  }
>
> Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
> URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
> ==============================================================================
> --- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
> +++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Tue Mar  6 07:51:02 2012
> @@ -25,6 +25,7 @@ import java.util.logging.Logger;
>  
>  import org.apache.cxf.common.logging.LogUtils;
>  import org.apache.cxf.message.Message;
> +import org.apache.cxf.message.MessageUtils;
>  import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
>  import org.apache.cxf.ws.addressing.ContextUtils;
>  import org.apache.cxf.ws.addressing.MAPAggregator;
> @@ -150,7 +151,11 @@ public class RMInInterceptor extends Abs
>      
>      void processSequence(Destination destination, Message message) 
>          throws SequenceFault, RMException {
> -        destination.acknowledge(message);
> +        final boolean robust =
> +            MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
> +        if (!robust) {
> +            destination.acknowledge(message);
> +        }
>      }
>      
>      void processDeliveryAssurance(RMProperties rmps) {
>
> Modified: cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
> URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java?rev=1297370&r1=1297369&r2=1297370&view=diff
> ==============================================================================
> --- cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java (original)
> +++ cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java Tue Mar  6 07:51:02 2012
> @@ -18,186 +18,12 @@
>   */
>  package org.apache.cxf.systest.ws.rm;
>  
> -import java.net.MalformedURLException;
> -import java.util.logging.Logger;
> -
> -import javax.xml.ws.Endpoint;
> -
> -import org.apache.cxf.Bus;
> -import org.apache.cxf.BusFactory;
> -import org.apache.cxf.bus.spring.SpringBusFactory;
> -import org.apache.cxf.common.logging.LogUtils;
> -import org.apache.cxf.greeter_control.Control;
> -import org.apache.cxf.greeter_control.ControlService;
> -import org.apache.cxf.greeter_control.Greeter;
> -import org.apache.cxf.greeter_control.GreeterService;
> -import org.apache.cxf.greeter_control.types.FaultLocation;
> -import org.apache.cxf.interceptor.ServiceInvokerInterceptor;
> -import org.apache.cxf.phase.Phase;
> -import org.apache.cxf.test.TestUtilities;
> -import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
> -import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
> -import org.apache.cxf.ws.rm.RMManager;
> -
> -import org.junit.After;
> -import org.junit.AfterClass;
> -import org.junit.BeforeClass;
> -import org.junit.Test;
> -
>  /**
>   * Tests the acknowledgement delivery back to the non-decoupled port when there is some
>   * error at the provider side and how its behavior is affected by the robust in-only mode setting.
>   */
> -public class ServiceInvocationAckTest extends AbstractBusClientServerTestBase {
> -    public static final String PORT = allocatePort(Server.class);
> -    
> -    private static final Logger LOG = LogUtils.getLogger(ServiceInvocationAckTest.class);
> -
> -    private static final String CONTROL_PORT_ADDRESS = 
> -        "http://localhost:" + PORT + "/SoapContext/ControlPort";
> -
> -    public static class Server extends AbstractBusTestServerBase {
> -
> -        protected void run() {
> -            SpringBusFactory factory = new SpringBusFactory();
> -            Bus bus = factory.createBus();
> -            BusFactory.setDefaultBus(bus);
> -            setBus(bus);
> -
> -            ControlImpl implementor = new ControlImpl();
> -            implementor.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort");
> -            GreeterImpl greeterImplementor = new GreeterImpl();
> -            implementor.setImplementor(greeterImplementor);
> -            Endpoint.publish(CONTROL_PORT_ADDRESS, implementor);
> -            LOG.fine("Published control endpoint.");
> -        }
> -
> -        public static void main(String[] args) {
> -            try {
> -                Server s = new Server();
> -                s.start();
> -            } catch (Exception ex) {
> -                ex.printStackTrace();
> -                System.exit(-1);
> -            } finally {
> -                System.out.println("done!");
> -            }
> -        }
> -    }
> -    
> -    private Bus controlBus;
> -    private Control control;
> -    private Bus greeterBus;
> -    private Greeter greeter;
> -    
> -
> -    @BeforeClass
> -    public static void startServers() throws Exception {
> -        TestUtilities.setKeepAliveSystemProperty(false);
> -        assertTrue("server did not launch correctly", launchServer(Server.class, true));
> -    }
> -    
> -    @AfterClass
> -    public static void cleanup() {
> -        TestUtilities.recoverKeepAliveSystemProperty();
> -    }
> -    
> -    @After
> -    public void tearDown() {
> -        if (null != greeter) {
> -            assertTrue("Failed to stop greeter.", control.stopGreeter(null));
> -            greeterBus.shutdown(true);
> -            greeterBus = null;
> -        }
> -        if (null != control) {  
> -            assertTrue("Failed to stop greeter", control.stopGreeter(null));
> -            controlBus.shutdown(true);
> -        }
> -    }
> -
> -    @Test
> -    public void testDefaultInvocationHandling() throws Exception {
> +public class ServiceInvocationAckTest extends ServiceInvocationAckBase {
> +    protected void setupGreeter() throws Exception {
>          setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
> -
> -        control.setRobustInOnlyMode(false);
> -        
> -        FaultLocation location = new org.apache.cxf.greeter_control.types.ObjectFactory()
> -            .createFaultLocation();
> -        location.setPhase(Phase.INVOKE);
> -        location.setBefore(ServiceInvokerInterceptor.class.getName());
> -        
> -        RMManager manager = greeterBus.getExtension(RMManager.class);
> -
> -        // the message is acked and the invocation takes place
> -        greeter.greetMeOneWay("one");
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
> -    
> -        control.setFaultLocation(location);
> -
> -        // the invocation fails but the message is acked because the delivery succeeds
> -        greeter.greetMeOneWay("two");
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
> -    }
> -
> -    @Test
> -    public void testRobustInvocationHandling() throws Exception {
> -        setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
> -
> -        control.setRobustInOnlyMode(true);
> -        
> -        FaultLocation location = new org.apache.cxf.greeter_control.types.ObjectFactory()
> -            .createFaultLocation();
> -        location.setPhase(Phase.INVOKE);
> -        location.setBefore(ServiceInvokerInterceptor.class.getName());
> -        
> -        RMManager manager = greeterBus.getExtension(RMManager.class);
> -
> -        
> -        // the message is acked and the invocation takes place
> -        greeter.greetMeOneWay("one");
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
> -
> -        control.setFaultLocation(location);
> -
> -        // the invocation fails but the message is acked because the delivery succeeds
> -        greeter.greetMeOneWay("two");
> -        Thread.sleep(6000L);
> -        assertFalse("RetransmissionQueue must not be empty", manager.getRetransmissionQueue().isEmpty());
> -        
> -        location.setPhase(null);
> -        control.setFaultLocation(location);
> -
> -        // the retransmission succeeds and the invocation succeeds, the message is acked
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
> -        
> -    }
> -
> -    private void setupGreeter(String cfgResource) throws NumberFormatException, MalformedURLException {
> -        
> -        SpringBusFactory bf = new SpringBusFactory();
> -        
> -        controlBus = bf.createBus();
> -        BusFactory.setDefaultBus(controlBus);
> -
> -        ControlService cs = new ControlService();
> -        control = cs.getControlPort();
> -        updateAddressPort(control, PORT);
> -        
> -        assertTrue("Failed to start greeter", control.startGreeter(cfgResource));
> -        
> -        greeterBus = bf.createBus(cfgResource);
> -        BusFactory.setDefaultBus(greeterBus);
> -        LOG.fine("Initialised greeter bus with configuration: " + cfgResource);
> -        
> -        GreeterService gs = new GreeterService();
> -
> -        greeter = gs.getGreeterPort();
> -        updateAddressPort(greeter, PORT);
> -        LOG.fine("Created greeter client.");
> -
>      }
>  }
>
>