You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/03/30 22:50:23 UTC

svn commit: r1307604 [1/2] - in /cxf/trunk/rt/ws/rm/src: main/java/org/apache/cxf/ws/rm/ main/java/org/apache/cxf/ws/rm/persistence/ main/java/org/apache/cxf/ws/rm/persistence/jdbc/ main/java/org/apache/cxf/ws/rm/soap/ test/java/org/apache/cxf/ws/rm/ t...

Author: ay
Date: Fri Mar 30 20:50:22 2012
New Revision: 1307604

URL: http://svn.apache.org/viewvc?rev=1307604&view=rev
Log:
[CXF-4218] Change RMEndpoint and RMStore so that the sequences are recovered and monitored correctly

Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ServantTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Fri Mar 30 20:50:22 2012
@@ -66,17 +66,13 @@ public class Destination extends Abstrac
     }
 
     public void addSequence(DestinationSequence seq, boolean persist) {
-        if (seq.getProtocol() == getReliableEndpoint().getProtocol()) {
-            seq.setDestination(this);
-            map.put(seq.getIdentifier().getValue(), seq);
-            if (persist) {
-                RMStore store = getReliableEndpoint().getManager().getStore();
-                if (null != store) {
-                    store.createDestinationSequence(seq);
-                }
+        seq.setDestination(this);
+        map.put(seq.getIdentifier().getValue(), seq);
+        if (persist) {
+            RMStore store = getReliableEndpoint().getManager().getStore();
+            if (null != store) {
+                store.createDestinationSequence(seq);
             }
-        } else {
-            LOG.log(Level.SEVERE, "Incompatible protocol version");
         }
     }
 
@@ -142,7 +138,8 @@ public class Destination extends Abstrac
                 }
             }
         } else {
-            RMConstants consts = getReliableEndpoint().getProtocol().getConstants();
+            ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
+            RMConstants consts = protocol.getConstants();
             SequenceFaultFactory sff = new SequenceFaultFactory(consts);
             throw sff.createUnknownSequenceFault(sequenceType.getIdentifier());
         }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Fri Mar 30 20:50:22 2012
@@ -54,20 +54,20 @@ public class ManagedRMEndpoint implement
 
     private static final String[] SOURCE_SEQUENCE_NAMES = 
     {"sequenceId", "currentMessageNumber", "expires", "lastMessage", "queuedMessageCount", 
-     "target"};
+     "target", "wsrm", "wsa"};
     private static final String[] SOURCE_SEQUENCE_DESCRIPTIONS = SOURCE_SEQUENCE_NAMES;
     @SuppressWarnings("rawtypes") // needed as OpenType isn't generic on Java5
     private static final OpenType[] SOURCE_SEQUENCE_TYPES =  
     {SimpleType.STRING, SimpleType.LONG, SimpleType.DATE, SimpleType.BOOLEAN, SimpleType.INTEGER, 
-     SimpleType.STRING};
+     SimpleType.STRING, SimpleType.STRING, SimpleType.STRING};
     
     private static final String[] DESTINATION_SEQUENCE_NAMES = 
-    {"sequenceId", "lastMessageNumber", "correlationId", "ackTo"};
+    {"sequenceId", "lastMessageNumber", "correlationId", "ackTo", "wsrm", "wsa"};
     private static final String[] DESTINATION_SEQUENCE_DESCRIPTIONS = DESTINATION_SEQUENCE_NAMES;
     @SuppressWarnings("rawtypes") // needed as OpenType isn't generic on Java5
     private static final OpenType[] DESTINATION_SEQUENCE_TYPES =  
     {SimpleType.STRING, SimpleType.LONG, SimpleType.STRING, 
-     SimpleType.STRING};
+     SimpleType.STRING, SimpleType.STRING, SimpleType.STRING};
 
     private static final String[] RETRY_STATUS_NAMES = 
     {"messageNumber", "retries", "previous", "next", "nextInterval", "backOff", "pending", "suspended"};
@@ -541,7 +541,9 @@ public class ManagedRMEndpoint implement
         Object[] ssv = new Object[]{ss.getIdentifier().getValue(), ss.getCurrentMessageNr(), 
                                     ss.getExpires(), ss.isLastMessage(),
                                     manager.getRetransmissionQueue().countUnacknowledged(ss),
-                                    getAddressValue(ss.getTarget())};
+                                    getAddressValue(ss.getTarget()),
+                                    ss.getProtocol().getWSRMNamespace(),
+                                    ss.getProtocol().getWSANamespace()};
         
         CompositeData ssps = new CompositeDataSupport(sourceSequenceType, 
                                                       SOURCE_SEQUENCE_NAMES, ssv);
@@ -554,7 +556,9 @@ public class ManagedRMEndpoint implement
         }
         Object[] dsv = new Object[]{ds.getIdentifier().getValue(), ds.getLastMessageNumber(), 
                                     ds.getCorrelationID(),
-                                    getAddressValue(ds.getAcksTo())};
+                                    getAddressValue(ds.getAcksTo()),
+                                    ds.getProtocol().getWSRMNamespace(),
+                                    ds.getProtocol().getWSANamespace()};
         
         CompositeData dsps = new CompositeDataSupport(destinationSequenceType, 
                                                       DESTINATION_SEQUENCE_NAMES, dsv);

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java Fri Mar 30 20:50:22 2012
@@ -19,9 +19,7 @@
 
 package org.apache.cxf.ws.rm;
 
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 import javax.management.JMException;
@@ -63,22 +61,12 @@ public class ManagedRMManager implements
     @ManagedOperation
     public String[] getEndpointIdentifiers() {
         Set<String> identifiers = new HashSet<String>();
-        //FIXME find this method for 2.5
-//        for (Endpoint ep : manager.getReliableEndpointsMap().keySet()) {
-        for (Endpoint ep : getReliableEndpointsMap().keySet()) {
+        for (Endpoint ep : manager.getReliableEndpointsMap().keySet()) {
             identifiers.add(RMUtils.getEndpointIdentifier(ep));
         }
         return identifiers.toArray(new String[identifiers.size()]);
     }
 
-    //TODO see the comment above
-    private Map<Endpoint, RMEndpoint> getReliableEndpointsMap() {
-        Map<Endpoint, RMEndpoint> epmap = new HashMap<Endpoint, RMEndpoint>();
-        for (ProtocolVariation pv : manager.getEndpointMaps().keySet()) {
-            epmap.putAll(manager.getEndpointMaps().get(pv));
-        }
-        return epmap;
-    }
     
     @ManagedAttribute(description = "Using Store")
     public boolean isUsingStore() {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Fri Mar 30 20:50:22 2012
@@ -73,45 +73,46 @@ public class Proxy {
     }
 
     void acknowledge(DestinationSequence ds) throws RMException {        
+        final ProtocolVariation protocol = ds.getProtocol(); 
         String address = ds.getAcksTo().getAddress().getValue();
         if (RMUtils.getAddressingConstants().getAnonymousURI().equals(address)) {
             LOG.log(Level.WARNING, "STANDALONE_ANON_ACKS_NOT_SUPPORTED");
             return;
         }
-        
-        RMConstants constants = reliableEndpoint.getProtocol().getConstants();
-        OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
-            .getOperation(constants.getSequenceAckOperationName());
-        invoke(oi, new Object[] {ds}, null);
+        RMConstants constants = protocol.getConstants();
+        OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo()
+            .getService().getInterface().getOperation(constants.getSequenceAckOperationName());
+        invoke(oi, protocol, new Object[] {ds}, null);
     }
     
     void terminate(SourceSequence ss) throws RMException {
-        RMConstants constants = reliableEndpoint.getProtocol().getConstants();
-        OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
-            .getOperation(constants.getTerminateSequenceOperationName());
+        final ProtocolVariation protocol = ss.getProtocol(); 
+        RMConstants constants = protocol.getConstants();
+        OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo()
+            .getService().getInterface().getOperation(constants.getTerminateSequenceOperationName());
         
         TerminateSequenceType ts = new TerminateSequenceType();
         ts.setIdentifier(ss.getIdentifier());
-        EncoderDecoder codec = reliableEndpoint.getProtocol().getCodec();
-        invoke(oi, new Object[] {codec.convertToSend(ts)}, null);
+        EncoderDecoder codec = protocol.getCodec();
+        invoke(oi, protocol, new Object[] {codec.convertToSend(ts)}, null);
     }
     
-    void createSequenceResponse(final Object createResponse) throws RMException {
+    void createSequenceResponse(final Object createResponse, ProtocolVariation protocol) throws RMException {
         LOG.fine("sending CreateSequenceResponse from client side");
-        RMConstants constants = reliableEndpoint.getProtocol().getConstants();
-        final OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
-            .getOperation(constants.getCreateSequenceResponseOnewayOperationName());
+        RMConstants constants = protocol.getConstants();
+        final OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo().getService()
+            .getInterface().getOperation(constants.getCreateSequenceResponseOnewayOperationName());
         
         // TODO: need to set relatesTo
 
-        invoke(oi, new Object[] {createResponse}, null);
+        invoke(oi, protocol, new Object[] {createResponse}, null);
        
     }
 
     public CreateSequenceResponseType createSequence(
                         EndpointReferenceType defaultAcksTo,
                         RelatesToType relatesTo,
-                        boolean isServer) throws RMException {
+                        boolean isServer, final ProtocolVariation protocol) throws RMException {
         
         SourcePolicyType sp = reliableEndpoint.getManager().getSourcePolicy();
         CreateSequenceType create = new CreateSequenceType();        
@@ -145,9 +146,10 @@ public class Proxy {
             setOfferedIdentifier(offer);
         }
         
-        InterfaceInfo ii = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface();
+        InterfaceInfo ii = reliableEndpoint.getEndpoint(protocol).getEndpointInfo()
+            .getService().getInterface();
         
-        EncoderDecoder codec = reliableEndpoint.getProtocol().getCodec();
+        EncoderDecoder codec = protocol.getCodec();
         RMConstants constants = codec.getConstants();
         final OperationInfo oi = isServer 
             ? ii.getOperation(constants.getCreateSequenceOnewayOperationName())
@@ -161,7 +163,7 @@ public class Proxy {
             Runnable r = new Runnable() {
                 public void run() {
                     try {
-                        invoke(oi, new Object[] {send}, null);
+                        invoke(oi, protocol, new Object[] {send}, null);
                     } catch (RMException ex) {
                         // already logged
                     }
@@ -172,11 +174,12 @@ public class Proxy {
         }
         
         
-        Object resp = invoke(oi, new Object[] {send}, null);
+        Object resp = invoke(oi, protocol, new Object[] {send}, null);
         return codec.convertReceivedCreateSequenceResponse(resp);
     }
     
     void lastMessage(SourceSequence s) throws RMException {
+        final ProtocolVariation protocol = s.getProtocol();
         EndpointReferenceType target = s.getTarget();
         AttributedURIType uri = null;
         if (null != target) {
@@ -196,19 +199,19 @@ public class Proxy {
             LOG.log(Level.WARNING, "STANDALONE_CLOSE_SEQUENCE_ANON_TARGET_MSG");
             return; 
         }
-        
-        RMConstants constants = reliableEndpoint.getProtocol().getConstants();
-        OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
-            .getOperation(constants.getCloseSequenceOperationName());
+        RMConstants constants = protocol.getConstants();
+        OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo().getService()
+            .getInterface().getOperation(constants.getCloseSequenceOperationName());
         // pass reference to source sequence in invocation context
         Map<String, Object> context = new HashMap<String, Object>(
                 Collections.singletonMap(SourceSequence.class.getName(), 
                                          (Object)s));
 
-        invoke(oi, new Object[] {}, context);
+        invoke(oi, protocol, new Object[] {}, context);
     }
     
     void ackRequested(SourceSequence s) throws RMException {
+        final ProtocolVariation protocol = s.getProtocol();
         EndpointReferenceType target = s.getTarget();
         AttributedURIType uri = null;
         if (null != target) {
@@ -229,10 +232,10 @@ public class Proxy {
             return; 
         }
         
-        RMConstants constants = reliableEndpoint.getProtocol().getConstants();
-        OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
-            .getOperation(constants.getAckRequestedOperationName());
-        invoke(oi, new Object[] {}, null);
+        RMConstants constants = protocol.getConstants();
+        OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo().getService()
+            .getInterface().getOperation(constants.getAckRequestedOperationName());
+        invoke(oi, protocol, new Object[] {}, null);
     }
         
     Identifier getOfferedIdentifier() {
@@ -245,7 +248,8 @@ public class Proxy {
         }
     }
        
-    Object invoke(OperationInfo oi, Object[] params, Map<String, Object> context) throws RMException {
+    Object invoke(OperationInfo oi, ProtocolVariation protocol, 
+                  Object[] params, Map<String, Object> context) throws RMException {
         
         if (LOG.isLoggable(Level.INFO)) {
             LOG.log(Level.INFO, "Sending out-of-band RM protocol message {0}.", 
@@ -254,8 +258,8 @@ public class Proxy {
         
         RMManager manager = reliableEndpoint.getManager();
         Bus bus = manager.getBus();
-        Endpoint endpoint = reliableEndpoint.getEndpoint();
-        BindingInfo bi = reliableEndpoint.getBindingInfo();
+        Endpoint endpoint = reliableEndpoint.getEndpoint(protocol);
+        BindingInfo bi = reliableEndpoint.getBindingInfo(protocol);
         Conduit c = reliableEndpoint.getConduit();
         Client client = null;
         if (params.length > 0 && params[0] instanceof DestinationSequence) {
@@ -265,11 +269,11 @@ public class Proxy {
             attrURIType.setValue(acksAddress);
             EndpointReferenceType acks =  new EndpointReferenceType();
             acks.setAddress(attrURIType);
-            client = createClient(bus, endpoint, c, acks);
+            client = createClient(bus, endpoint, protocol, c, acks);
             params = new Object[] {};
         } else {
             EndpointReferenceType replyTo = reliableEndpoint.getReplyTo();
-            client = createClient(bus, endpoint, c, replyTo);
+            client = createClient(bus, endpoint, protocol, c, replyTo);
         }
         
         BindingOperationInfo boi = bi.getOperation(oi);
@@ -289,8 +293,8 @@ public class Proxy {
         return null;
     }
     
-    protected Client createClient(Bus bus, Endpoint endpoint, Conduit conduit,
-                                  final EndpointReferenceType address) {
+    protected Client createClient(Bus bus, Endpoint endpoint, final ProtocolVariation protocol, 
+                                  Conduit conduit, final EndpointReferenceType address) {
         ConduitSelector cs = new DeferredConduitSelector(conduit) {
             @Override
             public synchronized Conduit selectConduit(Message message) {
@@ -311,7 +315,6 @@ public class Proxy {
         };  
         RMClient client = new RMClient(bus, endpoint, cs);
         Map<String, Object> context = client.getRequestContext();
-        ProtocolVariation protocol = reliableEndpoint.getProtocol();
         context.put(RMManager.WSRM_VERSION_PROPERTY, protocol.getWSRMNamespace());
         context.put(RMManager.WSRM_WSA_VERSION_PROPERTY, protocol.getWSANamespace());
         return client;

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java Fri Mar 30 20:50:22 2012
@@ -138,4 +138,13 @@ public final class RMContextUtils {
         return outbound
             ? RMMessageConstants.RM_PROPERTIES_OUTBOUND : RMMessageConstants.RM_PROPERTIES_INBOUND;
     }
+
+    //TODO put this key to the constant
+    public static ProtocolVariation getProtocolVariation(Message message) {
+        return (ProtocolVariation)message.get(RMMessageConstants.RM_PROTOCOL_VARIATION);
+    }
+
+    public static void setProtocolVariation(Message message, ProtocolVariation protocol) {
+        message.put(RMMessageConstants.RM_PROTOCOL_VARIATION, protocol);
+    }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Fri Mar 30 20:50:22 2012
@@ -20,7 +20,9 @@
 package org.apache.cxf.ws.rm;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -86,17 +88,13 @@ public class RMEndpoint {
     private RMManager manager;
     private Endpoint applicationEndpoint;
     private Conduit conduit;
-    private ProtocolVariation protocol;
     private EndpointReferenceType replyTo;
     private Source source;
     private Destination destination;
-    private WrappedService service;
-    private Endpoint endpoint;
+    private Map<ProtocolVariation, WrappedService> services;
+    private Map<ProtocolVariation, Endpoint> endpoints;
     private Proxy proxy;
     private Servant servant;
-    private QName serviceQName;
-    private QName bindingQName;
-    private QName interfaceQName;
     private long lastApplicationMessage;
     private long lastControlMessage;
     private InstrumentationManager instrumentationManager;
@@ -109,14 +107,15 @@ public class RMEndpoint {
      * @param ae
      * @param pv
      */
-    public RMEndpoint(RMManager m, Endpoint ae, ProtocolVariation pv) {
+    public RMEndpoint(RMManager m, Endpoint ae) {
         manager = m;
         applicationEndpoint = ae;
         source = new Source(this);
         destination = new Destination(this);
         proxy = new Proxy(this);
         servant = new Servant(this);
-        protocol = pv;
+        services = new HashMap<ProtocolVariation, WrappedService>();
+        endpoints = new HashMap<ProtocolVariation, Endpoint>();
     }
 
     /**
@@ -136,36 +135,23 @@ public class RMEndpoint {
     /**
      * @return Returns the RM protocol endpoint.
      */
-    public Endpoint getEndpoint() {
-        return endpoint;
-    }
-
-    public ProtocolVariation getProtocol() {
-        return protocol;
-    }
-
-    /**
-     * Set the protocol used by this endpoint. This method is only intended for use in testing; all normal use
-     * uses the constructor to set the value.
-     * 
-     * @param protocol
-     */
-    public void setProtocol(ProtocolVariation protocol) {
-        this.protocol = protocol;
+    public Endpoint getEndpoint(ProtocolVariation protocol) {
+        return endpoints.get(protocol);
     }
 
     /**
      * @return Returns the RM protocol service.
      */
-    public Service getService() {
-        return service;
+    public Service getService(ProtocolVariation protocol) {
+        return services.get(protocol);
     }
 
     /**
      * @return Returns the RM protocol binding info.
      */
-    public BindingInfo getBindingInfo() {
-        return service.getServiceInfo().getBinding(bindingQName);
+    public BindingInfo getBindingInfo(ProtocolVariation protocol) {
+        final QName bindingQName = new QName(protocol.getWSRMNamespace(), BINDING_NAME);
+        return services.get(protocol).getServiceInfo().getBinding(bindingQName);
     }
 
     /**
@@ -261,8 +247,8 @@ public class RMEndpoint {
                     org.apache.cxf.transport.Destination d) {
         conduit = c;
         replyTo = r;
-        createService();
-        createEndpoint(d);
+        createServices();
+        createEndpoints(d);
         setPolicies();
         if (manager != null && manager.getBus() != null) {
             managedEndpoint = new ManagedRMEndpoint(this);
@@ -277,15 +263,24 @@ public class RMEndpoint {
         }
     }
 
-    void createService() {
+    // internally we keep three services and three endpoints to support three protocol variations
+    // using the specifically generated jaxb classes and operation names etc but this could probably 
+    // be simplified/unified.
+    void createServices() {
+        for (ProtocolVariation protocol : ProtocolVariation.values()) {
+            createService(protocol);
+        }
+    }
+    
+    void createService(ProtocolVariation protocol) {
         ServiceInfo si = new ServiceInfo();
         si.setProperty(Schema.class.getName(), getSchema());
-        serviceQName = new QName(protocol.getWSRMNamespace(), SERVICE_NAME);
+        QName serviceQName = new QName(protocol.getWSRMNamespace(), SERVICE_NAME);
         si.setName(serviceQName);
         
-        buildInterfaceInfo(si);
+        buildInterfaceInfo(si, protocol);
 
-        service = new WrappedService(applicationEndpoint.getService(), serviceQName, si);
+        WrappedService service = new WrappedService(applicationEndpoint.getService(), serviceQName, si);
 
         DataBinding dataBinding = null;
         Class<?> create = protocol.getCodec().getCreateSequenceType();
@@ -298,6 +293,7 @@ public class RMEndpoint {
         }
         service.setDataBinding(dataBinding);
         service.setInvoker(servant);
+        services.put(protocol, service);
     }
 
     private static synchronized Schema getSchema() {
@@ -320,10 +316,18 @@ public class RMEndpoint {
         }
         return rmSchema;
     }
-
-    void createEndpoint(org.apache.cxf.transport.Destination d) {
+    
+    void createEndpoints(org.apache.cxf.transport.Destination d) {
+        for (ProtocolVariation protocol : ProtocolVariation.values()) {
+            createEndpoint(d, protocol);
+        }
+    }
+    
+    void createEndpoint(org.apache.cxf.transport.Destination d, ProtocolVariation protocol) {
+        final QName bindingQName = new QName(protocol.getWSRMNamespace(), BINDING_NAME);
+        WrappedService service = services.get(protocol);
         ServiceInfo si = service.getServiceInfo();
-        buildBindingInfo(si);
+        buildBindingInfo(si, protocol);
         EndpointInfo aei = applicationEndpoint.getEndpointInfo();
         String transportId = aei.getTransportId();
         EndpointInfo ei = new EndpointInfo(si, transportId);
@@ -346,8 +350,9 @@ public class RMEndpoint {
         }
         si.addEndpoint(ei);
 
-        endpoint = new WrappedEndpoint(applicationEndpoint, ei, service);
+        Endpoint endpoint = new WrappedEndpoint(applicationEndpoint, ei, service);
         service.setEndpoint(endpoint);
+        endpoints.put(protocol, endpoint);
     }
 
     void setPolicies() {
@@ -357,7 +362,7 @@ public class RMEndpoint {
             return;
         }
 
-        EndpointInfo ei = getEndpoint().getEndpointInfo();
+        EndpointInfo ei = getEndpoint(ProtocolVariation.RM10WSA200408).getEndpointInfo();
 
         PolicyInterceptorProviderRegistry reg = manager.getBus()
             .getExtension(PolicyInterceptorProviderRegistry.class);
@@ -388,23 +393,23 @@ public class RMEndpoint {
         // TODO: FaultPolicy (SequenceFault)
     }
 
-    void buildInterfaceInfo(ServiceInfo si) {
-        interfaceQName = new QName(protocol.getWSRMNamespace(), INTERFACE_NAME);
+    void buildInterfaceInfo(ServiceInfo si, ProtocolVariation protocol) {
+        QName interfaceQName = new QName(protocol.getWSRMNamespace(), INTERFACE_NAME);
         InterfaceInfo ii = new InterfaceInfo(si, interfaceQName);
-        buildOperationInfo(ii);
+        buildOperationInfo(ii, protocol);
     }
 
-    void buildOperationInfo(InterfaceInfo ii) {
-        buildCreateSequenceOperationInfo(ii);
-        buildTerminateSequenceOperationInfo(ii);
-        buildSequenceAckOperationInfo(ii);
-        buildCloseSequenceOperationInfo(ii);
-        buildAckRequestedOperationInfo(ii);
+    void buildOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
+        buildCreateSequenceOperationInfo(ii, protocol);
+        buildTerminateSequenceOperationInfo(ii, protocol);
+        buildSequenceAckOperationInfo(ii, protocol);
+        buildCloseSequenceOperationInfo(ii, protocol);
+        buildAckRequestedOperationInfo(ii, protocol);
 
         // TODO: FaultInfo (SequenceFault)
     }
 
-    void buildCreateSequenceOperationInfo(InterfaceInfo ii) {
+    void buildCreateSequenceOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
 
         OperationInfo operationInfo = null;
         MessagePartInfo partInfo = null;
@@ -447,7 +452,7 @@ public class RMEndpoint {
         partInfo.setTypeClass(protocol.getCodec().getCreateSequenceResponseType());
     }
 
-    void buildTerminateSequenceOperationInfo(InterfaceInfo ii) {
+    void buildTerminateSequenceOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
 
         OperationInfo operationInfo = null;
         MessagePartInfo partInfo = null;
@@ -464,7 +469,7 @@ public class RMEndpoint {
         partInfo.setTypeClass(protocol.getCodec().getTerminateSequenceType());
     }
 
-    void buildSequenceAckOperationInfo(InterfaceInfo ii) {
+    void buildSequenceAckOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
 
         OperationInfo operationInfo = null;
         MessageInfo messageInfo = null;
@@ -476,7 +481,7 @@ public class RMEndpoint {
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
     }
 
-    void buildCloseSequenceOperationInfo(InterfaceInfo ii) {
+    void buildCloseSequenceOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
 
         OperationInfo operationInfo = null;
         MessageInfo messageInfo = null;
@@ -488,7 +493,7 @@ public class RMEndpoint {
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
     }
 
-    void buildAckRequestedOperationInfo(InterfaceInfo ii) {
+    void buildAckRequestedOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
 
         OperationInfo operationInfo = null;
         MessageInfo messageInfo = null;
@@ -500,17 +505,17 @@ public class RMEndpoint {
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
     }
 
-    void buildBindingInfo(ServiceInfo si) {
+    void buildBindingInfo(ServiceInfo si, ProtocolVariation protocol) {
         // use same binding id as for application endpoint
         // also, to workaround the problem that it may not be possible to determine
         // the soap version depending on the bindingId, speciffy the soap version
         // explicitly
         if (null != applicationEndpoint) {
+            final QName bindingQName = new QName(protocol.getWSRMNamespace(), BINDING_NAME);
             SoapBindingInfo sbi = (SoapBindingInfo)applicationEndpoint.getEndpointInfo().getBinding();
             SoapVersion sv = sbi.getSoapVersion();
             String bindingId = sbi.getBindingId();
             SoapBindingInfo bi = new SoapBindingInfo(si, bindingId, sv);
-            bindingQName = new QName(protocol.getWSRMNamespace(), BINDING_NAME);
             bi.setName(bindingQName);
             BindingOperationInfo boi = null;
 

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Fri Mar 30 20:50:22 2012
@@ -84,6 +84,12 @@ public class RMInInterceptor extends Abs
             LOG.fine("Restoring original requestor role to: " + originalRequestor);
             message.put(Message.REQUESTOR_ROLE, originalRequestor);
         }
+
+        String rmUri = getManager().getRMNamespace(message);
+        String addrUri = getManager().getAddressingNamespace(message);
+
+        ProtocolVariation protocol = ProtocolVariation.findVariant(rmUri, addrUri);
+        RMContextUtils.setProtocolVariation(message, protocol);
         
         // Destination destination = getManager().getDestination(message);
         // RMEndpoint rme = getManager().getReliableEndpoint(message);
@@ -101,7 +107,7 @@ public class RMInInterceptor extends Abs
         
         if (isApplicationMessage) {                        
             if (null != rmps) {
-                processAcknowledgments(rme, rmps);
+                processAcknowledgments(rme, rmps, protocol);
                 processAcknowledgmentRequests(destination, message);
                 processSequence(destination, message);
                 processDeliveryAssurance(rmps);
@@ -116,7 +122,7 @@ public class RMInInterceptor extends Abs
             rme.receivedControlMessage();
             if (RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action)
                 || RM11Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action)) {
-                processAcknowledgments(rme, rmps);
+                processAcknowledgments(rme, rmps, protocol);
             } else if (RM10Constants.CLOSE_SEQUENCE_ACTION.equals(action)
                 || RM11Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action)) {
                 processSequence(destination, message);
@@ -126,7 +132,7 @@ public class RMInInterceptor extends Abs
                 Servant servant = rme.getServant();
                 Object csr = servant.createSequence(message);
                 Proxy proxy = rme.getProxy();
-                proxy.createSequenceResponse(csr);
+                proxy.createSequenceResponse(csr, protocol);
                 return;
             }
         }
@@ -134,7 +140,8 @@ public class RMInInterceptor extends Abs
         assertReliability(message);
     }
     
-    void processAcknowledgments(RMEndpoint rme, RMProperties rmps) throws SequenceFault, RMException {
+    void processAcknowledgments(RMEndpoint rme, RMProperties rmps, ProtocolVariation protocol) 
+        throws SequenceFault, RMException {
         
         Collection<SequenceAcknowledgement> acks = rmps.getAcks();
         Source source = rme.getSource();
@@ -145,7 +152,7 @@ public class RMInInterceptor extends Abs
                 if (null != ss) {
                     ss.setAcknowledged(ack);
                 } else {
-                    RMConstants consts = rme.getProtocol().getConstants();
+                    RMConstants consts = protocol.getConstants();
                     SequenceFaultFactory sff = new SequenceFaultFactory(consts);
                     throw sff.createUnknownSequenceFault(id);
                 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Fri Mar 30 20:50:22 2012
@@ -103,7 +103,7 @@ public class RMManager {
     private RMStore store;
     private SequenceIdentifierGenerator idGenerator;
     private RetransmissionQueue retransmissionQueue;
-    private Map<ProtocolVariation, Map<Endpoint, RMEndpoint>> endpointMaps;
+    private Map<Endpoint, RMEndpoint> reliableEndpoints = new HashMap<Endpoint, RMEndpoint>();
     private AtomicReference<Timer> timer = new AtomicReference<Timer>();
     private RMAssertion rmAssertion;
     private DeliveryAssuranceType deliveryAssurance;
@@ -114,16 +114,10 @@ public class RMManager {
     private String rmNamespace = RM10Constants.NAMESPACE_URI;
     private RM10AddressingNamespaceType rm10AddressingNamespace;
     
-    public RMManager() {
-        setEndpointMaps(new HashMap<ProtocolVariation, Map<Endpoint, RMEndpoint>>());
-    }
-    
     // ServerLifeCycleListener
     
     public void startServer(Server server) {
-        for (ProtocolVariation protocol : ProtocolVariation.values()) {
-            recoverReliableEndpoint(server.getEndpoint(), (Conduit)null, protocol);
-        }
+        recoverReliableEndpoint(server.getEndpoint(), (Conduit)null);
     }
 
     public void stopServer(Server server) {
@@ -137,13 +131,12 @@ public class RMManager {
             return;
         }        
         String id = RMUtils.getEndpointIdentifier(client.getEndpoint());
-        ProtocolVariation protocol = getConfiguredProtocol();
-        Collection<SourceSequence> sss = store.getSourceSequences(id, protocol);
+        Collection<SourceSequence> sss = store.getSourceSequences(id);
         if (null == sss || 0 == sss.size()) {                        
             return;
         }
         LOG.log(Level.FINE, "Number of source sequences: {0}", sss.size());
-        recoverReliableEndpoint(client.getEndpoint(), client.getConduit(), protocol);
+        recoverReliableEndpoint(client.getEndpoint(), client.getConduit());
     }
 
     private ProtocolVariation getConfiguredProtocol() {
@@ -318,16 +311,15 @@ public class RMManager {
         String rmUri = getRMNamespace(message);
         String addrUri = getAddressingNamespace(message);
         ProtocolVariation protocol = ProtocolVariation.findVariant(rmUri, addrUri);
-        Map<Endpoint, RMEndpoint> endpointMap = endpointMaps.get(protocol);
-        if (endpointMap == null) {
+        if (protocol == null) {
             org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
                 "UNSUPPORTED_NAMESPACE", LOG, addrUri, rmUri);
             LOG.log(Level.INFO, msg.toString());
             throw new RMException(msg);
         }
-        RMEndpoint rme = endpointMap.get(endpoint);
+        RMEndpoint rme = reliableEndpoints.get(endpoint);
         if (null == rme) {
-            rme = createReliableEndpoint(endpoint, protocol);
+            rme = createReliableEndpoint(endpoint);
             org.apache.cxf.transport.Destination destination = message.getExchange().getDestination();
             EndpointReferenceType replyTo = null;
             if (null != destination) {
@@ -340,7 +332,7 @@ public class RMManager {
                     .getProperty(MAPAggregator.DECOUPLED_DESTINATION, 
                              org.apache.cxf.transport.Destination.class);
             rme.initialise(message.getExchange().getConduit(message), replyTo, dest);
-            endpointMap.put(endpoint, rme);
+            reliableEndpoints.put(endpoint, rme);
             LOG.fine("Created new RMEndpoint.");
         }
         return rme;
@@ -409,6 +401,7 @@ public class RMManager {
 
         Source source = getSource(message);
         SourceSequence seq = source.getCurrent(inSeqId);
+        ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
         if (null == seq || seq.isExpired()) {
             // TODO: better error handling
             EndpointReferenceType to = null;
@@ -452,10 +445,11 @@ public class RMManager {
                 throw new RMException(msg);
             }
             Proxy proxy = source.getReliableEndpoint().getProxy();
-            CreateSequenceResponseType createResponse = proxy.createSequence(acksTo, relatesTo, isServer);
+            CreateSequenceResponseType createResponse = 
+                proxy.createSequence(acksTo, relatesTo, isServer, protocol);
             if (!isServer) {
                 Servant servant = source.getReliableEndpoint().getServant();
-                servant.createSequenceResponse(createResponse);
+                servant.createSequenceResponse(createResponse, protocol);
             }
 
             seq = source.awaitCurrent(inSeqId);
@@ -468,15 +462,12 @@ public class RMManager {
     @PreDestroy
     public void shutdown() {
         // shutdown remaining endpoints 
-        for (ProtocolVariation protocol : ProtocolVariation.values()) {
-            Map<Endpoint, RMEndpoint> map = endpointMaps.get(protocol);
-            if (map.size() > 0) {
-                LOG.log(Level.FINE,
-                    "Shutting down RMManager with {0} remaining endpoints for protocol variation {1}.",
-                    new Object[] {new Integer(map.size()), protocol });
-                for (RMEndpoint rme : map.values()) {            
-                    rme.shutdown();
-                }
+        if (reliableEndpoints.size() > 0) {
+            LOG.log(Level.FINE,
+                    "Shutting down RMManager with {0} remaining endpoints.",
+                    new Object[] {new Integer(reliableEndpoints.size())});
+            for (RMEndpoint rme : reliableEndpoints.values()) {            
+                rme.shutdown();
             }
         }
 
@@ -493,13 +484,8 @@ public class RMManager {
     
     synchronized void shutdownReliableEndpoint(Endpoint e) {
         RMEndpoint rme = null;
-        for (ProtocolVariation protocol : ProtocolVariation.values()) {
-            Map<Endpoint, RMEndpoint> map = endpointMaps.get(protocol);
-            rme = map.get(e);
-            if (rme != null) {
-                break;
-            }
-        }
+
+        rme = reliableEndpoints.get(e);
         if (rme == null) {
             // not found
             return;
@@ -513,20 +499,18 @@ public class RMManager {
             t.purge();
         }
         
-        for (ProtocolVariation protocol : ProtocolVariation.values()) {
-            endpointMaps.get(protocol).remove(e);
-        }
+        reliableEndpoints.remove(e);
     }
     
-    void recoverReliableEndpoint(Endpoint endpoint, Conduit conduit, ProtocolVariation protocol) {
+    void recoverReliableEndpoint(Endpoint endpoint, Conduit conduit) {
         if (null == store || null == retransmissionQueue) {
             return;
         }        
         
         String id = RMUtils.getEndpointIdentifier(endpoint);
         
-        Collection<SourceSequence> sss = store.getSourceSequences(id, protocol);
-        Collection<DestinationSequence> dss = store.getDestinationSequences(id, protocol);
+        Collection<SourceSequence> sss = store.getSourceSequences(id);
+        Collection<DestinationSequence> dss = store.getDestinationSequences(id);
         if ((null == sss || 0 == sss.size()) && (null == dss || 0 == dss.size())) {                        
             return;
         }
@@ -535,9 +519,9 @@ public class RMManager {
         
         LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
                 new Object[] {null == conduit ? "client" : "server", id});
-        RMEndpoint rme = createReliableEndpoint(endpoint, protocol);
+        RMEndpoint rme = createReliableEndpoint(endpoint);
         rme.initialise(conduit, null, null);
-        endpointMaps.get(protocol).put(endpoint, rme);
+        reliableEndpoints.put(endpoint, rme);
         SourceSequence css = null;
         for (SourceSequence ss : sss) {            
  
@@ -591,7 +575,8 @@ public class RMManager {
                 }
                                     
                 message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream());
-                          
+                RMContextUtils.setProtocolVariation(message, ss.getProtocol());
+                
                 retransmissionQueue.addUnacknowledged(message);
             }            
         }
@@ -603,8 +588,8 @@ public class RMManager {
         
     }
     
-    RMEndpoint createReliableEndpoint(Endpoint endpoint, ProtocolVariation protocol) {
-        return new RMEndpoint(this, endpoint, protocol);
+    RMEndpoint createReliableEndpoint(Endpoint endpoint) {
+        return new RMEndpoint(this, endpoint);
     }  
     
     public void init(Bus b) {
@@ -688,6 +673,14 @@ public class RMManager {
         }
     }
 
+    Map<Endpoint, RMEndpoint> getReliableEndpointsMap() {
+        return reliableEndpoints;
+    }
+    
+    void setReliableEndpointsMap(Map<Endpoint, RMEndpoint> map) {
+        reliableEndpoints = map;
+    }
+
     class DefaultSequenceIdentifierGenerator implements SequenceIdentifierGenerator {
 
         public Identifier generateSequenceIdentifier() {
@@ -697,15 +690,4 @@ public class RMManager {
             return sid;
         }
     }
-
-    Map<ProtocolVariation, Map<Endpoint, RMEndpoint>> getEndpointMaps() {
-        return endpointMaps;
-    }
-
-    final void setEndpointMaps(Map<ProtocolVariation, Map<Endpoint, RMEndpoint>> endpointMaps) {
-        endpointMaps.put(ProtocolVariation.RM10WSA200408, new HashMap<Endpoint, RMEndpoint>());
-        endpointMaps.put(ProtocolVariation.RM10WSA200508, new HashMap<Endpoint, RMEndpoint>());
-        endpointMaps.put(ProtocolVariation.RM11WSA200508, new HashMap<Endpoint, RMEndpoint>());
-        this.endpointMaps = endpointMaps;
-    }
 }
\ No newline at end of file

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java Fri Mar 30 20:50:22 2012
@@ -42,6 +42,9 @@ public final class RMMessageConstants {
     public static final String SAVED_CONTENT =
         "org.apache.cxf.ws.rm.content";
     
+    static final String RM_PROTOCOL_VARIATION = 
+        "org.apache.cxf.ws.rm.protocol";
+
     // keep this constant in the ws-rm package until it finds a general use outside of ws-rm
     static final String DELIVERING_ROBUST_ONEWAY = 
         "org.apache.cxf.oneway.robust.delivering";

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Fri Mar 30 20:50:22 2012
@@ -62,7 +62,11 @@ public class RMOutInterceptor extends Ab
         }
         
         Source source = getManager().getSource(msg);
-        ProtocolVariation protocol = source.getReliableEndpoint().getProtocol();
+        String rmUri = getManager().getRMNamespace(msg);
+        String addrUri = getManager().getAddressingNamespace(msg);
+
+        ProtocolVariation protocol = ProtocolVariation.findVariant(rmUri, addrUri);
+        RMContextUtils.setProtocolVariation(msg, protocol);
         maps.exposeAs(protocol.getWSANamespace());
         Destination destination = getManager().getDestination(msg);
 

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java Fri Mar 30 20:50:22 2012
@@ -60,6 +60,7 @@ public class Servant implements Invoker 
     
     public Object invoke(Exchange exchange, Object o) {
         LOG.fine("Invoking on RM Endpoint");
+        final ProtocolVariation protocol = RMContextUtils.getProtocolVariation(exchange.getInMessage());
         OperationInfo oi = exchange.get(OperationInfo.class);
         if (null == oi) {
             LOG.fine("No operation info."); 
@@ -77,10 +78,10 @@ public class Servant implements Invoker 
             }
         } else if (RM10Constants.INSTANCE.getCreateSequenceResponseOnewayOperationName().equals(oi.getName())
             || RM11Constants.INSTANCE.getCreateSequenceResponseOnewayOperationName().equals(oi.getName())) {
-            EncoderDecoder codec = reliableEndpoint.getProtocol().getCodec();
+            EncoderDecoder codec = protocol.getCodec();
             CreateSequenceResponseType createResponse = 
                 codec.convertReceivedCreateSequenceResponse(getParameter(exchange.getInMessage()));
-            createSequenceResponse(createResponse);
+            createSequenceResponse(createResponse, protocol);
         } else if (RM10Constants.INSTANCE.getTerminateSequenceOperationName().equals(oi.getName())
             || RM11Constants.INSTANCE.getTerminateSequenceOperationName().equals(oi.getName())) {
             terminateSequence(exchange.getInMessage());
@@ -92,6 +93,7 @@ public class Servant implements Invoker 
 
     Object createSequence(Message message) {
         LOG.fine("Creating sequence");
+        final ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
         
         AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);        
         Message outMessage = message.getExchange().getOutMessage();  
@@ -99,7 +101,7 @@ public class Servant implements Invoker 
             RMContextUtils.storeMAPs(maps, outMessage, false, false);
         }
         
-        EncoderDecoder codec = reliableEndpoint.getProtocol().getCodec();
+        EncoderDecoder codec = protocol.getCodec();
         CreateSequenceType create = codec.convertReceivedCreateSequence(getParameter(message));
         Destination destination = reliableEndpoint.getDestination();
         
@@ -135,7 +137,7 @@ public class Servant implements Invoker 
                 // AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
                 accept.setAcksTo(RMUtils.createReference(maps.getTo().getValue()));
                 SourceSequence seq = new SourceSequence(offer.getIdentifier(), null,
-                    createResponse.getIdentifier(), reliableEndpoint.getProtocol());
+                    createResponse.getIdentifier(), protocol);
                 seq.setExpires(offer.getExpires());
                 seq.setTarget(create.getAcksTo());
                 source.addSequence(seq);
@@ -154,18 +156,19 @@ public class Servant implements Invoker 
         }
         
         DestinationSequence seq = new DestinationSequence(createResponse.getIdentifier(),
-            create.getAcksTo(), destination, reliableEndpoint.getProtocol());
+            create.getAcksTo(), destination, protocol);
         seq.setCorrelationID(maps.getMessageID().getValue());
         destination.addSequence(seq);
         LOG.fine("returning " + createResponse);
         return codec.convertToSend(createResponse);
     }
 
-    public void createSequenceResponse(CreateSequenceResponseType createResponse) {
+    public void createSequenceResponse(CreateSequenceResponseType createResponse, 
+                                       ProtocolVariation protocol) {
         LOG.fine("Creating sequence response");
         
         SourceSequence seq = new SourceSequence(createResponse.getIdentifier(),
-            reliableEndpoint.getProtocol());
+            protocol);
         seq.setExpires(createResponse.getExpires());
         Source source  = reliableEndpoint.getSource();
         source.addSequence(seq);
@@ -187,7 +190,7 @@ public class Servant implements Invoker 
             String address = accept.getAcksTo().getAddress().getValue();
             if (!RMUtils.getAddressingConstants().getNoneURI().equals(address)) {
                 DestinationSequence ds =  new DestinationSequence(offeredId, accept.getAcksTo(), dest,
-                    reliableEndpoint.getProtocol());
+                    protocol);
                 dest.addSequence(ds);
             }
         }
@@ -195,8 +198,9 @@ public class Servant implements Invoker 
 
     public void terminateSequence(Message message) {
         LOG.fine("Terminating sequence");
+        final ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
         
-        EncoderDecoder codec = reliableEndpoint.getProtocol().getCodec();
+        EncoderDecoder codec = protocol.getCodec();
         TerminateSequenceType terminate = codec.convertReceivedTerminateSequence(getParameter(message));
         
         // check if the terminated sequence was created in response to a a createSequence

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java Fri Mar 30 20:50:22 2012
@@ -26,18 +26,13 @@ import java.util.Map;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
-import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.v200702.Identifier;
 
 public class Source extends AbstractEndpoint {
     
-    private static final Logger LOG = LogUtils.getL7dLogger(Source.class);
-
     private static final String REQUESTOR_SEQUENCE_ID = "";
     
     private Map<String, SourceSequence> map;
@@ -68,17 +63,13 @@ public class Source extends AbstractEndp
     }
     
     public void addSequence(SourceSequence seq, boolean persist) {
-        if (seq.getProtocol() == getReliableEndpoint().getProtocol()) {
-            seq.setSource(this);
-            map.put(seq.getIdentifier().getValue(), seq);
-            if (persist) {
-                RMStore store = getReliableEndpoint().getManager().getStore();
-                if (null != store) {
-                    store.createSourceSequence(seq);
-                }
+        seq.setSource(this);
+        map.put(seq.getIdentifier().getValue(), seq);
+        if (persist) {
+            RMStore store = getReliableEndpoint().getManager().getStore();
+            if (null != store) {
+                store.createSourceSequence(seq);
             }
-        } else {
-            LOG.log(Level.SEVERE, "Incompatible protocol version");
         }
     }
     

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java Fri Mar 30 20:50:22 2012
@@ -22,7 +22,6 @@ package org.apache.cxf.ws.rm.persistence
 import java.util.Collection;
 
 import org.apache.cxf.ws.rm.DestinationSequence;
-import org.apache.cxf.ws.rm.ProtocolVariation;
 import org.apache.cxf.ws.rm.SourceSequence;
 import org.apache.cxf.ws.rm.v200702.Identifier;
 
@@ -48,14 +47,14 @@ public interface RMStore {
      * @param seq the sequence
      * @return the sequence if present; otherwise null
      */
-    SourceSequence getSourceSequence(Identifier seq, ProtocolVariation protocol);
+    SourceSequence getSourceSequence(Identifier seq);
     
     /**
      * Retrieve the destination sequence with the specified identifier from persistent store. 
      * @param seq the sequence
      * @return the sequence if present; otherwise null
      */
-    DestinationSequence getDestinationSequence(Identifier seq, ProtocolVariation protocol);
+    DestinationSequence getDestinationSequence(Identifier seq);
 
     /**
      * Remove the source sequence with the specified identifier from persistent store. 
@@ -76,8 +75,7 @@ public interface RMStore {
      * @param endpointIdentifier the identifier for the source
      * @return the collection of sequences
      */    
-    Collection<SourceSequence> getSourceSequences(String endpointIdentifier,
-        ProtocolVariation protocol);
+    Collection<SourceSequence> getSourceSequences(String endpointIdentifier);
     
     /**
      * Retrieves all sequences managed by the identified RM destination endpoint 
@@ -86,8 +84,7 @@ public interface RMStore {
      * @param endpointIdentifier the identifier for the destination
      * @return the collection of sequences
      */    
-    Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier,
-        ProtocolVariation protocol);
+    Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier);
     
     /**
      * Retrieves the outbound/inbound messages stored for the source/destination sequence with 

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Fri Mar 30 20:50:22 2012
@@ -66,6 +66,7 @@ public class RMTxStore implements RMStor
         + "LAST_MSG_NO DECIMAL(19, 0), "
         + "ENDPOINT_ID VARCHAR(1024), "
         + "ACKNOWLEDGED BLOB, "
+        + "PROTOCOL_VERSION VARCHAR(256), "
         + "PRIMARY KEY (SEQ_ID))";
     private static final String CREATE_SRC_SEQUENCES_TABLE_STMT =
         "CREATE TABLE CXF_RM_SRC_SEQUENCES " 
@@ -74,7 +75,8 @@ public class RMTxStore implements RMStor
         + "LAST_MSG CHAR(1), "
         + "EXPIRY DECIMAL(19, 0), "
         + "OFFERING_SEQ_ID VARCHAR(256), "
-        + "ENDPOINT_ID VARCHAR(1024), "            
+        + "ENDPOINT_ID VARCHAR(1024), "
+        + "PROTOCOL_VERSION VARCHAR(256), "
         + "PRIMARY KEY (SEQ_ID))";
     private static final String CREATE_MESSAGES_TABLE_STMT =
         "CREATE TABLE {0} " 
@@ -87,9 +89,10 @@ public class RMTxStore implements RMStor
     private static final String OUTBOUND_MSGS_TABLE_NAME = "CXF_RM_OUTBOUND_MESSAGES";    
     
     private static final String CREATE_DEST_SEQUENCE_STMT_STR 
-        = "INSERT INTO CXF_RM_DEST_SEQUENCES (SEQ_ID, ACKS_TO, ENDPOINT_ID) VALUES(?, ?, ?)";
+        = "INSERT INTO CXF_RM_DEST_SEQUENCES (SEQ_ID, ACKS_TO, ENDPOINT_ID, PROTOCOL_VERSION) " 
+            + "VALUES(?, ?, ?, ?)";
     private static final String CREATE_SRC_SEQUENCE_STMT_STR
-        = "INSERT INTO CXF_RM_SRC_SEQUENCES VALUES(?, 1, '0', ?, ?, ?)";
+        = "INSERT INTO CXF_RM_SRC_SEQUENCES VALUES(?, 1, '0', ?, ?, ?, ?)";
     private static final String DELETE_DEST_SEQUENCE_STMT_STR =
         "DELETE FROM CXF_RM_DEST_SEQUENCES WHERE SEQ_ID = ?";
     private static final String DELETE_SRC_SEQUENCE_STMT_STR =
@@ -103,17 +106,17 @@ public class RMTxStore implements RMStor
     private static final String DELETE_MESSAGE_STMT_STR =
         "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
     private static final String SELECT_DEST_SEQUENCE_STMT_STR =
-        "SELECT ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+        "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
         + "WHERE SEQ_ID = ?";
     private static final String SELECT_SRC_SEQUENCE_STMT_STR =
-        "SELECT CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID FROM CXF_RM_SRC_SEQUENCES "
+        "SELECT CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION FROM CXF_RM_SRC_SEQUENCES "
         + "WHERE SEQ_ID = ?";
     private static final String SELECT_DEST_SEQUENCES_STMT_STR =
-        "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+        "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
         + "WHERE ENDPOINT_ID = ?";
     private static final String SELECT_SRC_SEQUENCES_STMT_STR =
-        "SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID FROM CXF_RM_SRC_SEQUENCES "
-        + "WHERE ENDPOINT_ID = ?";
+        "SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION "
+        + "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?";
     private static final String SELECT_MESSAGES_STMT_STR =
         "SELECT MSG_NO, SEND_TO, CONTENT FROM {0} WHERE SEQ_ID = ?";
     
@@ -209,6 +212,7 @@ public class RMTxStore implements RMStor
     public void createDestinationSequence(DestinationSequence seq) {
         String sequenceIdentifier = seq.getIdentifier().getValue();
         String endpointIdentifier = seq.getEndpointIdentifier();
+        String protocolVersion = encodeProtocolVersion(seq.getProtocol());
         if (LOG.isLoggable(Level.FINE)) {
             LOG.info("Creating destination sequence: " + sequenceIdentifier + ", (endpoint: "
                  + endpointIdentifier + ")");
@@ -223,7 +227,7 @@ public class RMTxStore implements RMStor
             String addr = seq.getAcksTo().getAddress().getValue();
             createDestSequenceStmt.setString(2, addr);
             createDestSequenceStmt.setString(3, endpointIdentifier);
-            
+            createDestSequenceStmt.setString(4, protocolVersion);
             createDestSequenceStmt.execute();
             
             commit();
@@ -237,6 +241,7 @@ public class RMTxStore implements RMStor
     public void createSourceSequence(SourceSequence seq) {
         String sequenceIdentifier = seq.getIdentifier().getValue();
         String endpointIdentifier = seq.getEndpointIdentifier();
+        String protocolVersion = encodeProtocolVersion(seq.getProtocol());
         if (LOG.isLoggable(Level.FINE)) {
             LOG.fine("Creating source sequence: " + sequenceIdentifier + ", (endpoint: "
                      + endpointIdentifier + ")"); 
@@ -255,6 +260,7 @@ public class RMTxStore implements RMStor
             Identifier osid = seq.getOfferingSequenceIdentifier();
             createSrcSequenceStmt.setString(3, osid == null ? null : osid.getValue());
             createSrcSequenceStmt.setString(4, endpointIdentifier);
+            createSrcSequenceStmt.setString(5, protocolVersion);
             createSrcSequenceStmt.execute();    
             
             commit();
@@ -264,8 +270,8 @@ public class RMTxStore implements RMStor
             throw new RMStoreException(ex);
         }
     }
-    
-    public DestinationSequence getDestinationSequence(Identifier sid, ProtocolVariation protocol) {
+
+    public DestinationSequence getDestinationSequence(Identifier sid) {
         if (LOG.isLoggable(Level.FINE)) {
             LOG.info("Getting destination sequence for id: " + sid);
         }
@@ -275,26 +281,26 @@ public class RMTxStore implements RMStor
                     connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);               
             }
             selectDestSequenceStmt.setString(1, sid.getValue());
-            
             ResultSet res = selectDestSequenceStmt.executeQuery(); 
             if (res.next()) {
                 EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));  
                 long lm = res.getLong(2);
-                InputStream is = res.getBinaryStream(3); 
+                ProtocolVariation pv = decodeProtocolVersion(res.getString(3));
+                InputStream is = res.getBinaryStream(4);
                 SequenceAcknowledgement ack = null;
                 if (null != is) {
                     ack = PersistenceUtils.getInstance()
                         .deserialiseAcknowledgment(is); 
                 }
-                return new DestinationSequence(sid, acksTo, lm, ack, protocol);
+                return new DestinationSequence(sid, acksTo, lm, ack, pv);
             }
         } catch (SQLException ex) {
             LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex);
         }
         return null;
     }
-    
-    public SourceSequence getSourceSequence(Identifier sid, ProtocolVariation protocol) {
+
+    public SourceSequence getSourceSequence(Identifier sid) {
         if (LOG.isLoggable(Level.FINE)) {
             LOG.info("Getting source sequences for id: " + sid);
         }
@@ -316,8 +322,9 @@ public class RMTxStore implements RMStor
                 if (null != oidValue) {
                     oi = RMUtils.getWSRMFactory().createIdentifier();
                     oi.setValue(oidValue);
-                }                            
-                return new SourceSequence(sid, expiry, oi, cmn, lm, protocol);
+                }
+                ProtocolVariation pv = decodeProtocolVersion(res.getString(5));
+                return new SourceSequence(sid, expiry, oi, cmn, lm, pv);
                           
             }
         } catch (SQLException ex) {
@@ -364,8 +371,7 @@ public class RMTxStore implements RMStor
         }        
     }
     
-    public Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier,
-        ProtocolVariation protocol) {
+    public Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier) {
         if (LOG.isLoggable(Level.FINE)) {
             LOG.info("Getting destination sequences for endpoint: " + endpointIdentifier);
         }
@@ -383,13 +389,14 @@ public class RMTxStore implements RMStor
                 sid.setValue(res.getString(1));
                 EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2));  
                 long lm = res.getLong(3);
-                InputStream is = res.getBinaryStream(4); 
+                ProtocolVariation pv = decodeProtocolVersion(res.getString(4));
+                InputStream is = res.getBinaryStream(5);
                 SequenceAcknowledgement ack = null;
                 if (null != is) {
                     ack = PersistenceUtils.getInstance()
                         .deserialiseAcknowledgment(is); 
                 }
-                DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack, protocol);
+                DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack, pv);
                 seqs.add(seq);                                                 
             }
         } catch (SQLException ex) {
@@ -398,8 +405,7 @@ public class RMTxStore implements RMStor
         return seqs;
     }
     
-    public Collection<SourceSequence> getSourceSequences(String endpointIdentifier,
-        ProtocolVariation protocol) {
+    public Collection<SourceSequence> getSourceSequences(String endpointIdentifier) {
         if (LOG.isLoggable(Level.FINE)) {
             LOG.info("Getting source sequences for endpoint: " + endpointIdentifier);
         }
@@ -424,8 +430,9 @@ public class RMTxStore implements RMStor
                 if (null != oidValue) {
                     oi = new Identifier();
                     oi.setValue(oidValue);
-                }                            
-                SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn, lm, protocol);
+                }
+                ProtocolVariation pv = decodeProtocolVersion(res.getString(6));
+                SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn, lm, pv);
                 seqs.add(seq);                          
             }
         } catch (SQLException ex) {
@@ -747,6 +754,21 @@ public class RMTxStore implements RMStor
     
     }
     
+    protected static String encodeProtocolVersion(ProtocolVariation pv) {
+        return pv.getCodec().getWSRMNamespace() + ' ' + pv.getCodec().getWSANamespace(); 
+    }
+
+    protected static ProtocolVariation decodeProtocolVersion(String pv) {
+        if (null != pv) {
+            int d = pv.indexOf(' ');
+            if (d > 0) {
+                return ProtocolVariation.findVariant(pv.substring(0, d), pv.substring(d + 1));
+            }
+        }
+        return ProtocolVariation.RM10WSA200408;
+    }
+    
+    
     private static void recursiveDelete(File dir, boolean now) {
         for (File f : dir.listFiles()) {
             if (f.isDirectory()) {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java Fri Mar 30 20:50:22 2012
@@ -357,6 +357,8 @@ public class RMSoapInterceptor extends A
         }
         RMProperties rmps = RMContextUtils.retrieveRMProperties(message, false);
         rmps.exposeAs(consts.getWSRMNamespace());
+        ProtocolVariation protocol = 
+            ProtocolVariation.findVariant(consts.getWSRMNamespace(), maps.getNamespaceURI());
         
         LOG.info("Updating service model info in exchange");
         
@@ -372,16 +374,16 @@ public class RMSoapInterceptor extends A
         }
   
         Exchange exchange = message.getExchange();
-        
-        exchange.put(Endpoint.class, rme.getEndpoint());
-        exchange.put(Service.class, rme.getService());
-        exchange.put(Binding.class, rme.getEndpoint().getBinding());
+        Endpoint ep = rme.getEndpoint(protocol);
+        exchange.put(Endpoint.class, ep);
+        exchange.put(Service.class, ep.getService());
+        exchange.put(Binding.class, ep.getBinding());
         
         // Also set BindingOperationInfo as some operations (SequenceAcknowledgment) have
         // neither in nor out messages, and thus the WrappedInInterceptor cannot
         // determine the operation name.
         
-        BindingInfo bi = rme.getEndpoint().getEndpointInfo().getBinding();
+        BindingInfo bi = ep.getEndpointInfo().getBinding();
         BindingOperationInfo boi = null;
         boolean isOneway = true;
         if (consts.getCreateSequenceAction().equals(action)) {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Fri Mar 30 20:50:22 2012
@@ -54,6 +54,7 @@ import org.apache.cxf.ws.addressing.Attr
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.policy.AssertionInfo;
 import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
+import org.apache.cxf.ws.rm.ProtocolVariation;
 import org.apache.cxf.ws.rm.RMContextUtils;
 import org.apache.cxf.ws.rm.RMException;
 import org.apache.cxf.ws.rm.RMManager;
@@ -372,8 +373,8 @@ public class RetransmissionQueueImpl imp
         
         final String address = to.getValue();
         LOG.fine("Resending to address: " + address);
-        
-        final Endpoint reliableEndpoint = manager.getReliableEndpoint(message).getEndpoint();
+        final ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
+        final Endpoint reliableEndpoint = manager.getReliableEndpoint(message).getEndpoint(protocol);
 
         ConduitSelector cs = new DeferredConduitSelector() {
             @Override

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java Fri Mar 30 20:50:22 2012
@@ -110,11 +110,12 @@ public class DestinationTest extends Ass
         Message message = setupMessage();
         RMProperties rmps = control.createMock(RMProperties.class);
         EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).andReturn(rmps);
+        EasyMock.expect(RMContextUtils.getProtocolVariation(message))
+            .andReturn(ProtocolVariation.RM10WSA200408);
         SequenceType st = control.createMock(SequenceType.class);
         EasyMock.expect(rmps.getSequence()).andReturn(st);
         Identifier id = control.createMock(Identifier.class);
         EasyMock.expect(st.getIdentifier()).andReturn(id).times(2);
-        EasyMock.expect(rme.getProtocol()).andReturn(ProtocolVariation.RM10WSA200408).anyTimes();
         String sid = "sid";
         EasyMock.expect(id.getValue()).andReturn(sid); 
         control.replay();

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Fri Mar 30 20:50:22 2012
@@ -290,7 +290,6 @@ public class ManagedRMManagerTest extend
         EasyMock.expect(rme.getManager()).andReturn(manager).anyTimes();
         EasyMock.expect(rme.getSource()).andReturn(source).anyTimes();
         EasyMock.expect(rme.getDestination()).andReturn(destination).anyTimes();
-        EasyMock.expect(rme.getProtocol()).andReturn(null).anyTimes();
 
         control.replay();
         setCurrentMessageNumber(sss.get(0), 5L);
@@ -314,8 +313,10 @@ public class ManagedRMManagerTest extend
     private List<SourceSequence> createTestSourceSequences(Source source, 
                                                            EndpointReferenceType to) {
         List<SourceSequence> sss = new ArrayList<SourceSequence>();
-        sss.add(createTestSourceSequence(source, "seq1", to, new long[]{1L, 1L, 3L, 3L}));
-        sss.add(createTestSourceSequence(source, "seq2", to, new long[]{1L, 1L, 3L, 3L}));
+        sss.add(createTestSourceSequence(source, "seq1", to, 
+                                         ProtocolVariation.RM10WSA200408, new long[]{1L, 1L, 3L, 3L}));
+        sss.add(createTestSourceSequence(source, "seq2", to, 
+                                         ProtocolVariation.RM11WSA200508, new long[]{1L, 1L, 3L, 3L}));
         
         return sss;
     }
@@ -323,17 +324,20 @@ public class ManagedRMManagerTest extend
     private List<DestinationSequence> createTestDestinationSequences(Destination destination, 
                                                                      EndpointReferenceType to) {
         List<DestinationSequence> dss = new ArrayList<DestinationSequence>();
-        dss.add(createTestDestinationSequence(destination, "seq3", to, new long[]{1L, 1L, 3L, 3L}));
-        dss.add(createTestDestinationSequence(destination, "seq4", to, new long[]{1L, 1L, 3L, 3L}));
+        dss.add(createTestDestinationSequence(destination, "seq3", to, 
+                                              ProtocolVariation.RM10WSA200408, new long[]{1L, 1L, 3L, 3L}));
+        dss.add(createTestDestinationSequence(destination, "seq4", to, 
+                                              ProtocolVariation.RM11WSA200508, new long[]{1L, 1L, 3L, 3L}));
         
         return dss;
     }
 
     private SourceSequence createTestSourceSequence(Source source, String sid, 
-                                                    EndpointReferenceType to, long[] acked) {
+                                                    EndpointReferenceType to, 
+                                                    ProtocolVariation protocol, long[] acked) {
         Identifier identifier = RMUtils.getWSRMFactory().createIdentifier();
         identifier.setValue(sid);
-        SourceSequence ss = new SourceSequence(identifier, null);
+        SourceSequence ss = new SourceSequence(identifier, protocol);
         ss.setSource(source);
         ss.setTarget(to);
         List<SequenceAcknowledgement.AcknowledgementRange> ranges = 
@@ -345,10 +349,11 @@ public class ManagedRMManagerTest extend
     }
 
     private DestinationSequence createTestDestinationSequence(Destination destination, String sid, 
-                                                              EndpointReferenceType to, long[] acked) {
+                                                              EndpointReferenceType to, 
+                                                              ProtocolVariation protocol, long[] acked) {
         Identifier identifier = RMUtils.getWSRMFactory().createIdentifier();
         identifier.setValue(sid);
-        DestinationSequence ds = new DestinationSequence(identifier, to, null, null);
+        DestinationSequence ds = new DestinationSequence(identifier, to, null, protocol);
         ds.setDestination(destination);
 
         List<SequenceAcknowledgement.AcknowledgementRange> ranges = 
@@ -387,7 +392,6 @@ public class ManagedRMManagerTest extend
         
         EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
         EasyMock.expect(exchange.get(Endpoint.class)).andReturn(endpoint);
-        EasyMock.expect(exchange.getOutMessage()).andReturn(message);
         
         control.replay();
         return manager.getReliableEndpoint(message);