You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2006/11/10 13:01:09 UTC

svn commit: r473319 - in /incubator/cxf/trunk: rt/core/src/main/java/org/apache/cxf/endpoint/ rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ systests/src/...

Author: andreasmyth
Date: Fri Nov 10 04:01:06 2006
New Revision: 473319

URL: http://svn.apache.org/viewvc?view=rev&rev=473319
Log:
[JIRA CXF-130, CXF-140] Support for stand-alone SequenceAcknowledgment messages.

Added:
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml   (with props)
Modified:
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
    incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/AddressingPropertiesImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java

Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java Fri Nov 10 04:01:06 2006
@@ -30,6 +30,7 @@
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusException;
 import org.apache.cxf.binding.Binding;
+import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.interceptor.AbstractBasicInterceptorProvider;
 import org.apache.cxf.interceptor.ClientOutFaultObserver;
 import org.apache.cxf.interceptor.Fault;
@@ -56,7 +57,7 @@
 
 public class ClientImpl extends AbstractBasicInterceptorProvider implements Client, MessageObserver {
     
-    private static final Logger LOG = Logger.getLogger(ClientImpl.class.getName());
+    private static final Logger LOG = LogUtils.getL7dLogger(ClientImpl.class);
 
     private static final String FINISHED = "exchange.finished";
     
@@ -64,12 +65,19 @@
     private Endpoint endpoint;
     private Conduit initedConduit;
     private ClientOutFaultObserver outFaultObserver; 
-    private int synchronousTimeout = 100000; // default 10 second timeout
+    private int synchronousTimeout = 10000; // default 10 second timeout
 
     public ClientImpl(Bus b, Endpoint e) {
+        this(b, e, null);
+    }
+
+    public ClientImpl(Bus b, Endpoint e, Conduit c) {
         bus = b;
         endpoint = e;
         outFaultObserver = new ClientOutFaultObserver(bus);
+        if (null != c) {
+            initedConduit = c;
+        }
     }
 
     public Endpoint getEndpoint() {
@@ -184,7 +192,7 @@
             remaining -= (int)(end - start);
         }
         if (!Boolean.TRUE.equals(exchange.get(FINISHED))) {
-            LOG.info("RESPONSE_TIMEOUT");
+            LogUtils.log(LOG, Level.WARNING, "RESPONSE_TIMEOUT");
         }
     }
 

Modified: incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/AddressingPropertiesImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/AddressingPropertiesImpl.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/AddressingPropertiesImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/AddressingPropertiesImpl.java Fri Nov 10 04:01:06 2006
@@ -161,4 +161,53 @@
     public void exposeAs(String uri) {
         namespaceURI = uri;
     }
+
+    public String toString() {
+        StringBuffer buf = new StringBuffer();
+        buf.append("[");
+        if (null != messageID) {
+            if (buf.length() > 1) {
+                buf.append(", ");
+            }
+            buf.append("MessageId: ");
+            buf.append(messageID.getValue());
+        }
+        if (null != action) {
+            if (buf.length() > 1) {
+                buf.append(", ");
+            }
+            buf.append("Action: ");
+            buf.append(action.getValue());
+        }
+        if (null != to) {
+            if (buf.length() > 1) {
+                buf.append(", ");
+            }
+            buf.append("To: ");
+            buf.append(to.getValue()); 
+        }
+        if (null != replyTo) {
+            AttributedURIType address = replyTo.getAddress();
+            if (null != address) {
+                if (buf.length() > 1) {
+                    buf.append(", ");
+                }
+                buf.append("ReplyTo: ");
+                buf.append(address.getValue()); 
+            }
+        }
+        if (null != faultTo) {
+            AttributedURIType address = faultTo.getAddress();
+            if (null != address) {
+                if (buf.length() > 1) {
+                    buf.append(", ");
+                }
+                buf.append("FaultTo: ");
+                buf.append(address.getValue()); 
+            }
+        }
+        buf.append("]");
+        return buf.toString();
+        
+    }
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java Fri Nov 10 04:01:06 2006
@@ -119,184 +119,4 @@
         }
         return null;
     }
-    
-    
-    
-    
-    
-   
-
-
-    /*
-     * private static final Logger LOG = LogUtils.getL7dLogger(RMHandler.class);
-     * private static Map<BindingBase, RMHandler> handlers; private RMSource
-     * source; private RMDestination destination; private RMProxy proxy; private
-     * RMServant servant; private ConfigurationHelper configurationHelper;
-     * private PersistenceManager persistenceManager; private Timer timer;
-     * private boolean busLifeCycleListenesRegistered; @Resource(name =
-     * JAXWSConstants.BUS_PROPERTY) private Bus bus; @Resource(name =
-     * JAXWSConstants.CLIENT_BINDING_PROPERTY) private ClientBinding
-     * clientBinding; @Resource(name = JAXWSConstants.SERVER_BINDING_PROPERTY)
-     * private ServerBinding serverBinding; @Resource(name =
-     * JAXWSConstants.CLIENT_TRANSPORT_PROPERTY) private ClientTransport
-     * clientTransport; @Resource(name =
-     * JAXWSConstants.SERVER_TRANSPORT_PROPERTY) private ServerTransport
-     * serverTransport; public RMHandler() { proxy = new RMProxy(this); servant =
-     * new RMServant(); } @PostConstruct protected synchronized void
-     * initialise() { /* getHandlerMap().put(getBinding(), this); if (null ==
-     * configurationHelper) { configurationHelper = new
-     * ConfigurationHelper(getBinding(), null == clientBinding); } if (null ==
-     * getSource()) { source = new RMSource(this); } if (null == destination) {
-     * destination = new RMDestination(this); } if (null == timer) { timer = new
-     * Timer(); } if (!busLifeCycleListenerRegistered) {
-     * getBinding().getBus().getLifeCycleManager()
-     * .registerLifeCycleListener(new RMBusLifeCycleListener(getSource()));
-     * busLifeCycleListenerRegistered = true; } } public static Map<BindingBase,
-     * RMHandler> getHandlerMap() { if (null == handlers) { handlers = new
-     * HashMap<BindingBase, RMHandler>(); } return handlers; } public void
-     * close(MessageContext context) { // TODO commit transaction } public
-     * boolean handleFault(LogicalMessageContext context) { return
-     * handle(context); } public boolean handleMessage(LogicalMessageContext
-     * context) { return handle(context); } public PersistenceManager
-     * getPersistenceManager() { return persistenceManager; } public void
-     * setPersistenceManager(PersistenceManager pm) { persistenceManager = pm; }
-     * public RMStore getStore() { if (null != persistenceManager) { return
-     * persistenceManager.getStore(); } return null; } public Timer getTimer() {
-     * return timer; } public Bus getBus() { return bus; } public Transport
-     * getTransport() { return null == clientTransport ? serverTransport :
-     * clientTransport; } public ClientTransport getClientTransport() { return
-     * clientTransport; } public ServerTransport getServerTransport() { return
-     * serverTransport; } public ClientBinding getClientBinding() { return
-     * clientBinding; } public ServerBinding getServerBinding() { return
-     * serverBinding; } public boolean isServerSide() { return null !=
-     * serverBinding; } public AbstractBindingBase getBinding() { if (null !=
-     * clientBinding) { return (AbstractBindingBase)clientBinding; } return
-     * (AbstractBindingBase)serverBinding; } public RMProxy getProxy() { return
-     * proxy; } public RMServant getServant() { return servant; } public
-     * RMSource getSource() { return source; } public RMDestination
-     * getDestination() { return destination; } protected void
-     * open(LogicalMessageContext context) { // TODO begin transaction }
-     * protected boolean handle(LogicalMessageContext context) { try { if
-     * (ContextUtils.isOutbound(context)) { handleOutbound(context); } else {
-     * handleInbound(context); } } catch (SequenceFault sf) {
-     * LOG.log(Level.SEVERE, "SequenceFault", sf); } return true; } protected
-     * void handleOutbound(LogicalMessageContext context) throws SequenceFault {
-     * LOG.entering(getClass().getName(), "handleOutbound");
-     * AddressingPropertiesImpl maps = ContextUtils.retrieveMAPs(context, false,
-     * true); // ensure the appropriate version of WS-Addressing is used
-     * maps.exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME); String
-     * action = null; if (maps != null && null != maps.getAction()) { action =
-     * maps.getAction().getValue(); } // nothing to do if this is a
-     * CreateSequence, TerminateSequence or // SequenceInfo request if
-     * (LOG.isLoggable(Level.FINE)) { LOG.fine("Action: " + action); } boolean
-     * isApplicationMessage = true; if
-     * (RMUtils.getRMConstants().getCreateSequenceAction().equals(action) ||
-     * RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action) ||
-     * RMUtils.getRMConstants().getTerminateSequenceAction().equals(action) ||
-     * RMUtils.getRMConstants().getLastMessageAction().equals(action) ||
-     * RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action) ||
-     * RMUtils.getRMConstants().getSequenceInfoAction().equals(action)) {
-     * isApplicationMessage = false; } RMPropertiesImpl rmpsOut =
-     * (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(context, true); if
-     * (null == rmpsOut) { rmpsOut = new RMPropertiesImpl();
-     * RMContextUtils.storeRMProperties(context, rmpsOut, true); }
-     * RMPropertiesImpl rmpsIn = null; Identifier inSeqId = null; BigInteger
-     * inMessageNumber = null; if (isApplicationMessage) { rmpsIn =
-     * (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(context, false); if
-     * (null != rmpsIn && null != rmpsIn.getSequence()) { inSeqId =
-     * rmpsIn.getSequence().getIdentifier(); inMessageNumber =
-     * rmpsIn.getSequence().getMessageNumber(); } if
-     * (LOG.isLoggable(Level.FINE)) { LOG.fine("inbound sequence: " + (null ==
-     * inSeqId ? "null" : inSeqId.getValue())); } // not for partial responses
-     * to oneway requests if (!(isServerSide() &&
-     * BindingContextUtils.isOnewayTransport(context))) { if
-     * (!ContextUtils.isRequestor(context)) { assert null != inSeqId; } // get
-     * the current sequence, requesting the creation of a new one if necessary
-     * SourceSequence seq = getSequence(inSeqId, context, maps); assert null !=
-     * seq; // increase message number and store a sequence type object in //
-     * context seq.nextMessageNumber(inSeqId, inMessageNumber);
-     * rmpsOut.setSequence(seq); // if this was the last message in the
-     * sequence, reset the // current sequence so that a new one will be created
-     * next // time the handler is invoked if (seq.isLastMessage()) {
-     * source.setCurrent(null); } } } // add Acknowledgements (to application
-     * messages or explicitly // created Acknowledgement messages only) if
-     * (isApplicationMessage ||
-     * RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action)) {
-     * AttributedURI to = VersionTransformer.convert(maps.getTo()); assert null !=
-     * to; addAcknowledgements(rmpsOut, inSeqId, to); } // indicate to the
-     * binding that a response is expected from the transport although // the
-     * web method is a oneway method if
-     * (BindingContextUtils.isOnewayMethod(context) ||
-     * RMUtils.getRMConstants().getLastMessageAction().equals(action)) {
-     * context.put(OutputStreamMessageContext.ONEWAY_MESSAGE_TF, Boolean.FALSE); } }
-     * protected void handleInbound(LogicalMessageContext context) throws
-     * SequenceFault { LOG.entering(getClass().getName(), "handleInbound");
-     * RMProperties rmps = RMContextUtils.retrieveRMProperties(context, false);
-     * final AddressingPropertiesImpl maps = ContextUtils.retrieveMAPs(context,
-     * false, false); assert null != maps; String action = null; if (null !=
-     * maps.getAction()) { action = maps.getAction().getValue(); } if
-     * (LOG.isLoggable(Level.FINE)) { LOG.fine("Action: " + action); } if
-     * (RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action)) {
-     * Object[] parameters =
-     * (Object[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
-     * CreateSequenceResponseType csr =
-     * (CreateSequenceResponseType)parameters[0];
-     * getServant().createSequenceResponse(getSource(), csr,
-     * getProxy().getOfferedIdentifier()); return; } else if
-     * (RMUtils.getRMConstants().getCreateSequenceAction().equals(action)) {
-     * Object[] parameters =
-     * (Object[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
-     * CreateSequenceType cs = (CreateSequenceType)parameters[0]; final
-     * CreateSequenceResponseType csr =
-     * getServant().createSequence(getDestination(), cs, maps); Runnable
-     * response = new Runnable() { public void run() { try {
-     * getProxy().createSequenceResponse(maps, csr); } catch (IOException ex) {
-     * ex.printStackTrace(); } catch (SequenceFault sf) { sf.printStackTrace(); } } };
-     * getBinding().getBus().getWorkQueueManager().getAutomaticWorkQueue().execute(response);
-     * return; } else if
-     * (RMUtils.getRMConstants().getTerminateSequenceAction().equals(action)) {
-     * Object[] parameters =
-     * (Object[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
-     * TerminateSequenceType cs = (TerminateSequenceType)parameters[0];
-     * getServant().terminateSequence(getDestination(), cs.getIdentifier()); } //
-     * for application AND out of band messages if (null != rmps) {
-     * processAcknowledgments(rmps); processAcknowledgmentRequests(rmps);
-     * processSequence(rmps, maps); processDeliveryAssurance(rmps); } } void
-     * processAcknowledgments(RMProperties rmps) { Collection<SequenceAcknowledgement>
-     * acks = rmps.getAcks(); if (null != acks) { for (SequenceAcknowledgement
-     * ack : acks) { getSource().setAcknowledged(ack); } } } void
-     * processSequence(RMProperties rmps, AddressingProperties maps) throws
-     * SequenceFault { SequenceType s = rmps.getSequence(); if (null == s) {
-     * return; } getDestination().acknowledge(s, null == maps.getReplyTo() ?
-     * null : maps.getReplyTo().getAddress().getValue()); } void
-     * processAcknowledgmentRequests(RMProperties rmps) { Collection<AckRequestedType>
-     * requested = rmps.getAcksRequested(); if (null != requested) { for
-     * (AckRequestedType ar : requested) { DestinationSequence seq =
-     * getDestination().getSequence(ar.getIdentifier()); if (null != seq) {
-     * seq.scheduleImmediateAcknowledgement(); } else { LOG.severe("No such
-     * sequence."); } } } } boolean processDeliveryAssurance(RMProperties rmps) {
-     * SequenceType s = rmps.getSequence(); if (null == s) { return true; }
-     * DestinationSequence ds = destination.getSequence(s.getIdentifier());
-     * return ds.applyDeliveryAssurance(s.getMessageNumber()); }
-     */
-
-    /*
-   
-    protected void setInitialised(ConfigurationHelper ch,
-                                  RMSource s,
-                                  RMDestination d,
-                                  Timer t,
-                                  boolean registered
-                                  ) {
-        configurationHelper = ch;
-        source = s;
-        destination = d;
-        timer = t;
-        busLifeCycleListenerRegistered = registered;
-        initialise();
-    }
-    */
-    
-    
-    
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Fri Nov 10 04:01:06 2006
@@ -180,7 +180,7 @@
     boolean canPiggybackAckOnPartialResponse() {
         // TODO: should also check if we allow breaking the WI Profile rule by which no headers
         // can be included in a HTTP response
-        return getAcksTo().getAddress().getValue().equals(RMConstants.getAnonympusAddress());
+        return getAcksTo().getAddress().getValue().equals(RMConstants.getAnonymousAddress());
     }
        
     /**
@@ -277,6 +277,7 @@
     }
 
     synchronized void scheduleDeferredAcknowledgement(int delay) {
+        
         if (null == deferredAcknowledgments) {
             deferredAcknowledgments = new ArrayList<DeferredAcknowledgment>();
         }
@@ -290,11 +291,13 @@
         DeferredAcknowledgment da = new DeferredAcknowledgment();
         deferredAcknowledgments.add(da);
         destination.getManager().getTimer().schedule(da, delay);
+        LOG.fine("Scheduled acknowledgment to be sent in " + delay + " ms");
     }
 
     final class DeferredAcknowledgment extends TimerTask {
 
         public void run() {
+            LOG.fine("timer task: send acknowledgment.");
             DestinationSequence.this.scheduleImmediateAcknowledgement();
 
             try {                

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Fri Nov 10 04:01:06 2006
@@ -20,20 +20,29 @@
 package org.apache.cxf.ws.rm;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.xml.datatype.Duration;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.endpoint.ClientImpl;
 import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.phase.PhaseInterceptorChain;
 import org.apache.cxf.service.model.BindingInfo;
 import org.apache.cxf.service.model.BindingOperationInfo;
 import org.apache.cxf.service.model.OperationInfo;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
+import org.apache.cxf.ws.addressing.JAXWSAConstants;
 import org.apache.cxf.ws.addressing.RelatesToType;
+import org.apache.cxf.ws.addressing.VersionTransformer;
 import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
 
@@ -43,7 +52,7 @@
  */
 public class Proxy {
 
-    private static final Logger LOG = Logger.getLogger(Proxy.class.getName());
+    private static final Logger LOG = LogUtils.getL7dLogger(Proxy.class);
 
     private RMEndpoint reliableEndpoint;
     
@@ -58,7 +67,17 @@
     }
 
     void acknowledge(DestinationSequence ds) throws IOException {
-
+        OperationInfo oi = reliableEndpoint.getService().getServiceInfo().getInterface()
+            .getOperation(RMConstants.getSequenceAckOperationName());
+        Map<String, Object> requestContext = new HashMap<String, Object>();
+        AddressingPropertiesImpl maps = new AddressingPropertiesImpl();
+        maps.setTo(VersionTransformer.convert(ds.getAcksTo()).getAddress());        
+        maps.setReplyTo(reliableEndpoint.getTransportDestination().getAddress());
+        requestContext.put(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES, maps);
+        Map<String, Object> context = CastUtils.cast(
+            Collections.singletonMap(Client.REQUEST_CONTEXT, requestContext),
+            String.class,  Object.class);
+        invoke(oi, new Object[] {}, context);
     }
 
     public CreateSequenceResponseType createSequence(org.apache.cxf.ws.addressing.EndpointReferenceType to, 
@@ -98,20 +117,16 @@
         
         OperationInfo oi = reliableEndpoint.getService().getServiceInfo().getInterface()
             .getOperation(RMConstants.getCreateSequenceOperationName());
-        Object result = invoke(oi, new Object[] {create});
-        LOG.info("result: " + result);
-        return (CreateSequenceResponseType)result;
-        
+        return (CreateSequenceResponseType)invoke(oi, new Object[] {create}, null);
     }
     
     void lastMessage(SourceSequence s) throws IOException {
         // TODO
     }
        
-    Object invoke(OperationInfo oi, Object[] params) {
+    Object invoke(OperationInfo oi, Object[] params, Map<String, Object> context) {
         LOG.log(Level.INFO, "Invoking out-of-band RM protocol message {0}.", 
                 oi == null ? null : oi.getName());
-        LOG.log(Level.INFO, "params: " + params);
         
         // assuming we are on the client side
         
@@ -120,11 +135,15 @@
         Endpoint endpoint = reliableEndpoint.getEndpoint();
         BindingInfo bi = reliableEndpoint.getBindingInfo();
         
-                
-        Client client = new RMClient(bus, endpoint);
+        if (null == reliableEndpoint.getConduit()) {
+            LOG.severe("No conduit to available.");
+            return null;
+        }
+        Client client = new RMClient(bus, endpoint, reliableEndpoint.getConduit());
+       
         BindingOperationInfo boi = bi.getOperation(oi);
         try {
-            Object[] result = client.invoke(boi, params, null);
+            Object[] result = client.invoke(boi, params, context);
             if (result != null && result.length > 0) {
                 return result[0];
             }
@@ -136,8 +155,8 @@
     }
     
     class RMClient extends ClientImpl {
-        RMClient(Bus bus, Endpoint endpoint) {
-            super(bus, endpoint);
+        RMClient(Bus bus, Endpoint endpoint, Conduit conduit) {
+            super(bus, endpoint, conduit);
         }
 
         @Override

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java Fri Nov 10 04:01:06 2006
@@ -93,6 +93,9 @@
     private static final String WSRM_TERMINATE_SEQUENCE_ACTION =
         WSRM_NAMESPACE_NAME + "/TerminateSequence";
     
+    private static final String WSRM_SEQUENCE_ACK_ACTION =
+        WSRM_NAMESPACE_NAME + "/SequenceAcknowledgement";
+    
     private static final String WSRM_LAST_MESSAGE_ACTION =
         WSRM_NAMESPACE_NAME + "/LastMessage";
     
@@ -179,7 +182,7 @@
         return WSRM_ACK_REQUESTED_QNAME;
     }
     
-    public static String getAnonympusAddress() {
+    public static String getAnonymousAddress() {
         return WSA_ANONYMOUS_ADDRESS; 
     }
     
@@ -199,6 +202,10 @@
         return WSRM_TERMINATE_SEQUENCE_QNAME;
     }
     
+    public static QName getSequenceAckOperationName() {
+        return WSRM_SEQUENCE_ACK_QNAME;
+    }
+    
     public static String getCreateSequenceAction() {
         return WSRM_CREATE_SEQUENCE_ACTION;
     }
@@ -209,6 +216,10 @@
     
     public static String getTerminateSequenceAction() {
         return WSRM_TERMINATE_SEQUENCE_ACTION;
+    }
+    
+    public static String getSequenceAckAction() {
+        return WSRM_SEQUENCE_ACK_ACTION;
     }
     
     public static String getLastMessageAction() {

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java Fri Nov 10 04:01:06 2006
@@ -73,8 +73,12 @@
      * @return true iff message is currently being processed on server side
      */
     public static boolean isServerSide(Message message) {
-        // TODO
-        return false;
+        if (isOutbound(message)) {
+            return message.getExchange().getInMessage() != null;
+        } else {
+            return message.getExchange().getOutMessage() == null 
+                && message.getExchange().getFaultMessage() == null;
+        }
     }
     
     /**

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Fri Nov 10 04:01:06 2006
@@ -41,6 +41,7 @@
 import org.apache.cxf.service.model.OperationInfo;
 import org.apache.cxf.service.model.ServiceInfo;
 import org.apache.cxf.service.model.UnwrappedOperationInfo;
+import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.wsdl.UsingAddressing;
 
 public class RMEndpoint {
@@ -56,6 +57,8 @@
         
     private final RMManager manager;
     private final Endpoint applicationEndpoint;
+    private Conduit conduit;
+    private org.apache.cxf.transport.Destination transportDestination;
     private Source source;
     private Destination destination;
     private Service service;
@@ -156,7 +159,23 @@
         this.source = source;
     } 
     
-    void initialise() {
+    /** 
+     * @return Returns the conduit.
+     */
+    public Conduit getConduit() {
+        return conduit;
+    }
+    
+    /** 
+     * @return Returns the conduit.
+     */
+    public org.apache.cxf.transport.Destination getTransportDestination() {
+        return transportDestination;
+    }
+    
+    void initialise(Conduit c, org.apache.cxf.transport.Destination d) {
+        conduit = c;
+        transportDestination = d;
         createService();
         createEndpoint();
     }
@@ -223,7 +242,6 @@
         partInfo.setElementQName(RMConstants.getCreateSequenceOperationName());
         partInfo.setElement(true);
         partInfo.setTypeClass(CreateSequenceType.class);
-        
         unwrappedMessageInfo = new MessageInfo(operationInfo, messageInfo.getName());
         unwrappedOperationInfo = new UnwrappedOperationInfo(operationInfo);
         operationInfo.setUnwrappedOperation(unwrappedOperationInfo);
@@ -239,7 +257,6 @@
         partInfo.setElementQName(RMConstants.getCreateSequenceResponseOperationName());
         partInfo.setElement(true);
         partInfo.setTypeClass(CreateSequenceResponseType.class);
-
         unwrappedMessageInfo = new MessageInfo(operationInfo, messageInfo.getName());
         unwrappedOperationInfo.setOutput(operationInfo.getOutputName(), unwrappedMessageInfo);
         partInfo = unwrappedMessageInfo.addMessagePart("createResponse");
@@ -247,20 +264,6 @@
         partInfo.setElement(true);
         partInfo.setTypeClass(CreateSequenceResponseType.class);
         
-        /*
-        oi = ii.addOperation(RMConstants.getCreateSequenceResponseOperationName());
-        mi = oi.createMessage(RMConstants.getCreateSequenceResponseOperationName());
-        oi.setInput(mi.getName().getLocalPart(), mi);
-        pi = mi.addMessagePart("createResponse");
-        pi.setElementQName(RMConstants.getCreateSequenceResponseOperationName());
-        pi.setElement(true);
-        pi.setTypeClass(CreateSequenceResponseType.class);
-        unwrappedInput = new MessageInfo(oi, mi.getName());
-        unwrapped = new UnwrappedOperationInfo(oi);
-        oi.setUnwrappedOperation(unwrapped);
-        unwrapped.setInput(oi.getInputName(), unwrappedInput);
-        */
-        
         operationInfo = ii.addOperation(RMConstants.getTerminateSequenceOperationName());
         messageInfo = operationInfo.createMessage(RMConstants.getTerminateSequenceOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
@@ -272,6 +275,14 @@
         unwrappedOperationInfo = new UnwrappedOperationInfo(operationInfo);
         operationInfo.setUnwrappedOperation(unwrappedOperationInfo);
         unwrappedOperationInfo.setInput(operationInfo.getInputName(), unwrappedMessageInfo);
+        
+        operationInfo = ii.addOperation(RMConstants.getSequenceAckOperationName());
+        messageInfo = operationInfo.createMessage(RMConstants.getSequenceAckOperationName());
+        operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
+        unwrappedMessageInfo = new MessageInfo(operationInfo, messageInfo.getName());
+        unwrappedOperationInfo = new UnwrappedOperationInfo(operationInfo);
+        operationInfo.setUnwrappedOperation(unwrappedOperationInfo);
+        unwrappedOperationInfo.setInput(operationInfo.getInputName(), unwrappedMessageInfo);
     }
 
     void buildBindingInfo(ServiceInfo si) {
@@ -290,19 +301,18 @@
             boi.addExtensor(soi);
             bi.addOperation(boi);
             
-            /*
-            boi = bi.buildOperation(RMConstants.getCreateSequenceResponseOperationName(), 
-                RMConstants.getCreateSequenceResponseOperationName().getLocalPart(), null);
+            boi = bi.buildOperation(RMConstants.getTerminateSequenceOperationName(), 
+                RMConstants.getTerminateSequenceOperationName().getLocalPart(), null);
             soi = new SoapOperationInfo();
-            soi.setAction(RMConstants.getCreateSequenceResponseAction());
+            soi.setAction(RMConstants.getTerminateSequenceAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
-            */
             
-            boi = bi.buildOperation(RMConstants.getTerminateSequenceOperationName(), 
-                RMConstants.getTerminateSequenceOperationName().getLocalPart(), null);
+            boi = bi.buildOperation(RMConstants.getSequenceAckOperationName(), 
+                null, null);
+            assert null != boi;
             soi = new SoapOperationInfo();
-            soi.setAction(RMConstants.getTerminateSequenceAction());
+            soi.setAction(RMConstants.getSequenceAckAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
 

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Fri Nov 10 04:01:06 2006
@@ -109,16 +109,19 @@
             return;
         } else if (RMConstants.getTerminateSequenceAction().equals(action)) {
             // servant.terminateSequence(message);
+        } else if (RMConstants.getSequenceAckAction().equals(action)) {
+            processAcknowledgments(rmps);
+            return;
         }
         
         // for application AND out of band messages
-
+        
         Destination destination = getManager().getDestination(message);
         
         if (null != rmps) {            
-            
-            processAcknowledgments(rmps);
 
+            processAcknowledgments(rmps);
+            
             processAcknowledgmentRequests(rmps);  
             
             processSequence(destination, rmps, maps);

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Fri Nov 10 04:01:06 2006
@@ -31,6 +31,7 @@
 import org.apache.cxf.Bus;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
 import org.apache.cxf.ws.addressing.RelatesToType;
@@ -96,7 +97,20 @@
         RMEndpoint rme = reliableEndpoints.get(endpoint);
         if (null == rme) {
             rme = new RMEndpoint(this, endpoint);
-            rme.initialise();
+            Conduit conduit = null;
+            org.apache.cxf.transport.Destination destination = null;
+            if (RMContextUtils.isServerSide(message)) {
+                AddressingPropertiesImpl maps = RMContextUtils.retrieveMAPs(message, false, false);
+                destination = message.getExchange().getDestination();
+                try {
+                    conduit = destination.getBackChannel(message, null, maps.getReplyTo());
+                } catch (IOException ex) {
+                    ex.printStackTrace();
+                }
+            } else {
+                conduit = message.getExchange().getConduit();
+            }
+            rme.initialise(conduit, destination);
             reliableEndpoints.put(endpoint, rme);
         }
         return rme;
@@ -142,10 +156,16 @@
                     relatesTo.setValue(inSeq != null ? inSeq.getCorrelationID() : null);
 
                 } else {
-                    acksTo = VersionTransformer.convert(maps.getReplyTo());
-                    // for oneways
-                    if (RMConstants.getNoneAddress().equals(acksTo.getAddress().getValue())) {
-                        acksTo = RMUtils.createAnonymousReference2004();
+                    acksTo = VersionTransformer.convert(maps.getReplyTo()); 
+                    if (!RMContextUtils.isServerSide(message)
+                        && RMConstants.getNoneAddress().equals(acksTo.getAddress().getValue())) {
+                        org.apache.cxf.transport.Destination dest = message.getExchange()
+                            .getConduit().getBackChannel();
+                        if (null == dest) {
+                            acksTo = RMUtils.createAnonymousReference2004();
+                        } else {
+                            acksTo = VersionTransformer.convert(dest.getAddress());
+                        }
                     }
                 }
 
@@ -184,8 +204,12 @@
         }
         if (!isSetSourcePolicy()) {
             SourcePolicyType sp = factory.createSourcePolicyType();
-            sp.setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
             setSourcePolicy(sp);
+            
+        }
+        if (!getSourcePolicy().isSetSequenceTerminationPolicy()) {
+            getSourcePolicy().setSequenceTerminationPolicy(
+                factory.createSequenceTerminationPolicyType());            
         }
         if (!isSetDestinationPolicy()) {
             DestinationPolicyType dp = factory.createDestinationPolicyType();

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Fri Nov 10 04:01:06 2006
@@ -130,7 +130,11 @@
             AttributedURI to = VersionTransformer.convert(maps.getTo());
             assert null != to;
             addAcknowledgements(destination, rmpsOut, inSeqId, to);
-        }     
+        } 
+        
+        if (RMConstants.getSequenceAckAction().equals(action)) {
+            maps.setReplyTo(RMUtils.createNoneReference());
+        }
     }
     
     void addAcknowledgements(Destination destination, 
@@ -150,8 +154,8 @@
                     LOG.fine("no need to add an acknowledgements for sequence "
                              + seq.getIdentifier().getValue());
                 } else {
-                    LOG.fine("sequences acksTo (" + seq.getAcksTo().getAddress().getValue()
-                             + ") does not match to (" + to.getValue() + ")");
+                    LOG.fine("sequences acksTo address (" + seq.getAcksTo().getAddress().getValue()
+                             + ") does not match to address (" + to.getValue() + ")");
                 }
             }
         }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java Fri Nov 10 04:01:06 2006
@@ -28,7 +28,6 @@
 import javax.xml.datatype.Duration;
 
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.jaxb.DatatypeFactory;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
@@ -44,7 +43,7 @@
  */
 public class Servant implements Invoker {
 
-    private static final Logger LOG = LogUtils.getL7dLogger(AbstractRMInterceptor.class);
+    private static final Logger LOG = LogUtils.getL7dLogger(Servant.class);
     private RMEndpoint reliableEndpoint;
     // REVISIT assumption there is only a single outstanding unattached Identifier
     private Identifier unattachedIdentifier;
@@ -55,15 +54,6 @@
     
     public Object invoke(Exchange exchange, Object o) {
         LOG.fine("Invoking on RM Endpoint");
-        List<Object> params = CastUtils.cast((List<?>)o);
-        Object param = params.get(0);
-        LOG.fine("param: " + param);
-        if (param instanceof CreateSequenceType) {
-            CreateSequenceType create = (CreateSequenceType)param;
-            LOG.info("CreateSequenceType: " + create);
-            LOG.info("    acksTo: " + create.getAcksTo());
-            LOG.info("    offer: " + create.getOffer());
-        }
         OperationInfo oi = exchange.get(OperationInfo.class);
         if (RMConstants.getCreateSequenceOperationName().equals(oi.getName())) {
             try {
@@ -99,8 +89,6 @@
             supportedDuration = DatatypeFactory.PT0S;
         }
         Expires ex = create.getExpires();
-        LOG.fine("expires: " + ex);
-        LOG.fine("acksTo: " + create.getAcksTo());
         
         if (null != ex || supportedDuration.isShorterThan(DatatypeFactory.PT0S)) {
             Duration effectiveDuration = supportedDuration;
@@ -113,7 +101,6 @@
         }
         
         OfferType offer = create.getOffer();
-        LOG.fine("offer: " + offer);
         if (null != offer) {
             AcceptType accept = RMUtils.getWSRMFactory().createAcceptType();
             if (dp.isAcceptOffers()) {
@@ -147,7 +134,6 @@
         seq.setCorrelationID(maps.getMessageID().getValue());
         destination.addSequence(seq);
         
-        LOG.fine("Returning createResponse: " + createResponse);
         return createResponse;
     }
 

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java Fri Nov 10 04:01:06 2006
@@ -40,10 +40,16 @@
 
         InterfaceInfo intf = si.getInterface();
         
-        assertEquals(2, intf.getOperations().size());
+        assertEquals(3, intf.getOperations().size());
         
         String ns = si.getName().getNamespaceURI();
         OperationInfo oi = intf.getOperation(new QName(ns, "CreateSequence"));
+        assertNotNull("No operation info.", oi);
+        
+        oi = intf.getOperation(new QName(ns, "TerminateSequence"));
+        assertNotNull("No operation info.", oi);
+        
+        oi = intf.getOperation(new QName(ns, "SequenceAcknowledgement"));
         assertNotNull("No operation info.", oi);
         
     }

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Fri Nov 10 04:01:06 2006
@@ -30,6 +30,7 @@
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.addressing.RelatesToType;
@@ -180,6 +181,14 @@
         Method m = RMManager.class.getDeclaredMethod("getSource", new Class[] {Message.class});
         RMManager manager = control.createMock(RMManager.class, new Method[] {m});
         Message message = control.createMock(Message.class);
+        Exchange exchange = control.createMock(Exchange.class);
+        EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
+        EasyMock.expect(exchange.getOutMessage()).andReturn(message).anyTimes();
+        EasyMock.expect(exchange.getInMessage()).andReturn(null).anyTimes();
+        EasyMock.expect(exchange.getFaultMessage()).andReturn(null).anyTimes();
+        Conduit conduit = control.createMock(Conduit.class);
+        EasyMock.expect(exchange.getConduit()).andReturn(conduit).anyTimes();
+        EasyMock.expect(conduit.getBackChannel()).andReturn(null).anyTimes();
         Identifier inSid = control.createMock(Identifier.class);
         AddressingProperties maps = control.createMock(AddressingProperties.class);
         Source source = control.createMock(Source.class);

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java Fri Nov 10 04:01:06 2006
@@ -51,6 +51,9 @@
         greeterBus = bf.createBus(cfgResource);
         bf.setDefaultBus(greeterBus);
         LOG.info("Initialised bus with cfg file resource: " + cfgResource);
+        greeterBus.getOutInterceptors().add(new JaxwsInterceptorRemover());
+        greeterBus.getOutInterceptors().add(new OutMessageRecorder());
+        greeterBus.getInInterceptors().add(new InMessageRecorder());
         
         GreeterImpl implementor = new GreeterImpl();
         String address = "http://localhost:9020/SoapContext/GreeterPort";

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java Fri Nov 10 04:01:06 2006
@@ -45,19 +45,36 @@
     private List<Document> inboundMessages;
       
     public MessageFlow(List<byte[]> out, List<byte[]> in) throws Exception {
+        inboundMessages = new ArrayList<Document>();
+        outboundMessages = new ArrayList<Document>();
+        reset(out, in);
+    }
+    
+    public void clear() throws Exception {
+        inStreams.clear();
+        outStreams.clear();
+    }
+    
+    public final void reset(List<byte[]> out, List<byte[]> in) throws Exception {
+        for (int i = 0; i < inboundMessages.size(); i++) {
+            in.remove(0);
+        }
         inStreams = in;
+        for (int i = 0; i < outboundMessages.size(); i++) {
+            out.remove(0);
+        }
         outStreams = out;
         DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
         factory.setNamespaceAware(true);
         DocumentBuilder parser = factory.newDocumentBuilder();
-        inboundMessages = new ArrayList<Document>();
+        inboundMessages.clear();
         for (int i = 0; i < inStreams.size(); i++) {
             byte[] bytes = inStreams.get(i);
             ByteArrayInputStream is = new ByteArrayInputStream(bytes);
             Document document = parser.parse(is);
             inboundMessages.add(document);
         }
-        outboundMessages = new ArrayList<Document>();
+        outboundMessages.clear();
         for (int i = 0; i < outStreams.size(); i++) {
             byte[] bytes = outStreams.get(i);
             ByteArrayInputStream is = new ByteArrayInputStream(bytes);

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Fri Nov 10 04:01:06 2006
@@ -42,10 +42,11 @@
 public class SequenceTest extends ClientServerTestBase {
 
     private static final Logger LOG = Logger.getLogger(SequenceTest.class.getName());
-    private static final String APP_NAMESPACE = "http://celtix.objectweb.org/greeter_control";
-    private static final String GREETMEONEWAY_ACTION = APP_NAMESPACE + "/types/Greeter/greetMeOneWay";
-    //private static final String GREETME_ACTION = APP_NAMESPACE + "/types/Greeter/greetMe";
-    //private static final String GREETME_RESPONSE_ACTION = GREETME_ACTION + "Response";
+    // private static final String APP_NAMESPACE = "http://celtix.objectweb.org/greeter_control";
+    // private static final String GREETMEONEWAY_ACTION = APP_NAMESPACE + "/types/Greeter/greetMeOneWay";
+    // private static final String GREETME_ACTION = APP_NAMESPACE + "/types/Greeter/greetMe";
+    // private static final String GREETME_RESPONSE_ACTION = GREETME_ACTION + "Response";
+    private static final String GREETMEONEWAY_ACTION = null;
 
     private Bus controlBus;
     private Control control;
@@ -56,6 +57,7 @@
 
     private boolean doTestOnewayAnonymousAcks = true;
     private boolean doTestOnewayDeferredAnonymousAcks = true;
+    private boolean doTestOnewayDeferredNonAnonymousAcks = true;
 
     public static void main(String[] args) {
         junit.textui.TestRunner.run(SequenceTest.class);
@@ -128,7 +130,6 @@
         mf.verifyMessages(4, true);
         String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(), GREETMEONEWAY_ACTION,
                                                  GREETMEONEWAY_ACTION, GREETMEONEWAY_ACTION};
-        expectedActions = new String[] {RMConstants.getCreateSequenceAction(), null, null, null};
         mf.verifyActions(expectedActions, true);
         mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
 
@@ -162,8 +163,8 @@
                 
         // three application messages plus createSequence
         mf.verifyMessages(4, true);
-        String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(), null,
-                                                 null, null};
+        String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(), GREETMEONEWAY_ACTION,
+                                                 GREETMEONEWAY_ACTION, GREETMEONEWAY_ACTION};
         mf.verifyActions(expectedActions, true);
         mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
 
@@ -177,6 +178,57 @@
         mf.verifyMessageNumbers(new String[] {null, null, null, null}, false);
         mf.verifyAcknowledgements(new boolean[] {false, false, false, true}, false);
     }
+    
+    public void testOnewayDeferredNonAnonymousAcks() throws Exception {
+        if (!doTestOnewayDeferredNonAnonymousAcks) {
+            return;
+        }
+        setupGreeter("org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml");
+
+        greeter.greetMeOneWay("once");
+        greeter.greetMeOneWay("twice");
+
+        // CreateSequence plus two greetMeOneWay requests
+
+        MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+        
+        mf.verifyMessages(3, true);
+        String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(), 
+                                                 GREETMEONEWAY_ACTION,
+                                                 GREETMEONEWAY_ACTION};
+        mf.verifyActions(expectedActions, true);
+        mf.verifyMessageNumbers(new String[] {null, "1", "2"}, true);
+
+        // CreateSequenceResponse plus two partial responses, no
+        // acknowledgments included
+        
+        Thread.sleep(2000);
+
+        mf.verifyMessages(4, false);
+        expectedActions = new String[] {null, RMConstants.getCreateSequenceResponseAction(), 
+                                        null, null};
+        mf.verifyActions(expectedActions, false);
+        mf.verifyMessageNumbers(new String[4], false);
+        mf.verifyAcknowledgements(new boolean[4], false);
+
+        // mf.clear();
+
+        try {
+            Thread.sleep(3 * 1000);
+        } catch (InterruptedException ex) {
+            // ignore
+        }
+
+        // a standalone acknowledgement should have been sent from the server
+        // side by now
+        
+        mf.reset(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+        mf.verifyMessages(0, true);
+        mf.verifyMessages(1, false);
+
+    }
+
 
     // --- test utilities ---
 

Added: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml?view=auto&rev=473319
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml (added)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml Fri Nov 10 04:01:06 2006
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:wsrm-mgmt="http://cxf.apache.org/ws/rm/manager"
+       xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+       xmlns:http-conf="http://cxf.apache.org/transports/http/configuration"
+       xsi:schemaLocation="
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+    
+    <bean name="{http://cxf.apache.org/greeter_control}GreeterPort.http-conduit" abstract="true">
+        <property name="client">
+            <value>
+                <http-conf:client DecoupledEndpoint="http://localhost:9999/decoupled_endpoint"/>
+            </value>
+        </property>
+    </bean>
+    
+    <bean id="org.apache.cxf.ws.rm.RMManager" class="org.apache.cxf.ws.rm.RMManager">
+        <property name="bus" ref="cxf"/>  
+        <property name="destinationPolicy">
+            <value>
+                <wsrm-mgmt:destinationPolicy>
+                    <wsrm-mgmt:acksPolicy intraMessageThreshold="0"/>                    
+                </wsrm-mgmt:destinationPolicy>
+            </value>
+        </property>
+        <property name="sourcePolicy">
+            <value>
+                <wsrm-mgmt:sourcePolicy acksTo="http://localhost:9999/decoupled_endpoint"/>
+            </value>
+        </property>
+        
+        <property name="RMAssertion">
+            <value>
+                <wsrm-policy:RMAssertion>         
+                    <wsrm-policy:BaseRetransmissionInterval Milliseconds="10000"/>           
+                    <wsrm-policy:AcknowledgementInterval Milliseconds="2000"/>                                                        
+                </wsrm-policy:RMAssertion>
+            </value>
+        </property>   
+    </bean>   
+
+    <bean id="mapAggregator" class="org.apache.cxf.ws.addressing.MAPAggregator"/>
+    <bean id="mapCodec" class="org.apache.cxf.ws.addressing.soap.MAPCodec"/>
+    <bean id="rmLogicalOut" class="org.apache.cxf.ws.rm.RMOutInterceptor">
+        <property name="bus" ref="cxf"/>
+    </bean>
+    <bean id="rmLogicalIn" class="org.apache.cxf.ws.rm.RMInInterceptor">
+        <property name="bus" ref="cxf"/>
+    </bean>
+    <bean id="rmCodec" class="org.apache.cxf.ws.rm.soap.RMSoapInterceptor"/>
+
+    <!-- We are adding the interceptors to the bus as we will have only one endpoint/service/bus. -->
+
+    <bean id="cxf" class="org.apache.cxf.bus.spring.SpringBusImpl">
+        <property name="inInterceptors">
+            <list>
+                <ref bean="mapAggregator"/>
+                <ref bean="mapCodec"/>
+                <ref bean="rmLogicalIn"/>
+                <ref bean="rmCodec"/>
+            </list>
+        </property>
+        <property name="inFaultInterceptors">
+            <list>
+                <ref bean="mapAggregator"/>
+                <ref bean="mapCodec"/>
+                <ref bean="rmLogicalIn"/>
+                <ref bean="rmCodec"/>
+            </list>
+        </property>
+        <property name="outInterceptors">
+            <list>
+                <ref bean="mapAggregator"/>
+                <ref bean="mapCodec"/>
+                <ref bean="rmLogicalOut"/>
+                <ref bean="rmCodec"/>
+            </list>
+        </property>
+        <property name="outFaultInterceptors">
+            <list>
+                <ref bean="mapAggregator"/>
+                <ref bean="mapCodec"/>
+                <ref bean="rmLogicalOut"/>
+                <ref bean="rmCodec"/>
+            </list>
+        </property>
+    </bean>
+</beans>
\ No newline at end of file

Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml