You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/03/30 22:50:23 UTC
svn commit: r1307604 [1/2] - in /cxf/trunk/rt/ws/rm/src:
main/java/org/apache/cxf/ws/rm/ main/java/org/apache/cxf/ws/rm/persistence/
main/java/org/apache/cxf/ws/rm/persistence/jdbc/
main/java/org/apache/cxf/ws/rm/soap/ test/java/org/apache/cxf/ws/rm/ t...
Author: ay
Date: Fri Mar 30 20:50:22 2012
New Revision: 1307604
URL: http://svn.apache.org/viewvc?rev=1307604&view=rev
Log:
[CXF-4218] Change RMEndpoint and RMStore so that the sequences are recovered and monitored correctly
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ServantTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Fri Mar 30 20:50:22 2012
@@ -66,17 +66,13 @@ public class Destination extends Abstrac
}
public void addSequence(DestinationSequence seq, boolean persist) {
- if (seq.getProtocol() == getReliableEndpoint().getProtocol()) {
- seq.setDestination(this);
- map.put(seq.getIdentifier().getValue(), seq);
- if (persist) {
- RMStore store = getReliableEndpoint().getManager().getStore();
- if (null != store) {
- store.createDestinationSequence(seq);
- }
+ seq.setDestination(this);
+ map.put(seq.getIdentifier().getValue(), seq);
+ if (persist) {
+ RMStore store = getReliableEndpoint().getManager().getStore();
+ if (null != store) {
+ store.createDestinationSequence(seq);
}
- } else {
- LOG.log(Level.SEVERE, "Incompatible protocol version");
}
}
@@ -142,7 +138,8 @@ public class Destination extends Abstrac
}
}
} else {
- RMConstants consts = getReliableEndpoint().getProtocol().getConstants();
+ ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
+ RMConstants consts = protocol.getConstants();
SequenceFaultFactory sff = new SequenceFaultFactory(consts);
throw sff.createUnknownSequenceFault(sequenceType.getIdentifier());
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Fri Mar 30 20:50:22 2012
@@ -54,20 +54,20 @@ public class ManagedRMEndpoint implement
private static final String[] SOURCE_SEQUENCE_NAMES =
{"sequenceId", "currentMessageNumber", "expires", "lastMessage", "queuedMessageCount",
- "target"};
+ "target", "wsrm", "wsa"};
private static final String[] SOURCE_SEQUENCE_DESCRIPTIONS = SOURCE_SEQUENCE_NAMES;
@SuppressWarnings("rawtypes") // needed as OpenType isn't generic on Java5
private static final OpenType[] SOURCE_SEQUENCE_TYPES =
{SimpleType.STRING, SimpleType.LONG, SimpleType.DATE, SimpleType.BOOLEAN, SimpleType.INTEGER,
- SimpleType.STRING};
+ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING};
private static final String[] DESTINATION_SEQUENCE_NAMES =
- {"sequenceId", "lastMessageNumber", "correlationId", "ackTo"};
+ {"sequenceId", "lastMessageNumber", "correlationId", "ackTo", "wsrm", "wsa"};
private static final String[] DESTINATION_SEQUENCE_DESCRIPTIONS = DESTINATION_SEQUENCE_NAMES;
@SuppressWarnings("rawtypes") // needed as OpenType isn't generic on Java5
private static final OpenType[] DESTINATION_SEQUENCE_TYPES =
{SimpleType.STRING, SimpleType.LONG, SimpleType.STRING,
- SimpleType.STRING};
+ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING};
private static final String[] RETRY_STATUS_NAMES =
{"messageNumber", "retries", "previous", "next", "nextInterval", "backOff", "pending", "suspended"};
@@ -541,7 +541,9 @@ public class ManagedRMEndpoint implement
Object[] ssv = new Object[]{ss.getIdentifier().getValue(), ss.getCurrentMessageNr(),
ss.getExpires(), ss.isLastMessage(),
manager.getRetransmissionQueue().countUnacknowledged(ss),
- getAddressValue(ss.getTarget())};
+ getAddressValue(ss.getTarget()),
+ ss.getProtocol().getWSRMNamespace(),
+ ss.getProtocol().getWSANamespace()};
CompositeData ssps = new CompositeDataSupport(sourceSequenceType,
SOURCE_SEQUENCE_NAMES, ssv);
@@ -554,7 +556,9 @@ public class ManagedRMEndpoint implement
}
Object[] dsv = new Object[]{ds.getIdentifier().getValue(), ds.getLastMessageNumber(),
ds.getCorrelationID(),
- getAddressValue(ds.getAcksTo())};
+ getAddressValue(ds.getAcksTo()),
+ ds.getProtocol().getWSRMNamespace(),
+ ds.getProtocol().getWSANamespace()};
CompositeData dsps = new CompositeDataSupport(destinationSequenceType,
DESTINATION_SEQUENCE_NAMES, dsv);
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java Fri Mar 30 20:50:22 2012
@@ -19,9 +19,7 @@
package org.apache.cxf.ws.rm;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import javax.management.JMException;
@@ -63,22 +61,12 @@ public class ManagedRMManager implements
@ManagedOperation
public String[] getEndpointIdentifiers() {
Set<String> identifiers = new HashSet<String>();
- //FIXME find this method for 2.5
-// for (Endpoint ep : manager.getReliableEndpointsMap().keySet()) {
- for (Endpoint ep : getReliableEndpointsMap().keySet()) {
+ for (Endpoint ep : manager.getReliableEndpointsMap().keySet()) {
identifiers.add(RMUtils.getEndpointIdentifier(ep));
}
return identifiers.toArray(new String[identifiers.size()]);
}
- //TODO see the comment above
- private Map<Endpoint, RMEndpoint> getReliableEndpointsMap() {
- Map<Endpoint, RMEndpoint> epmap = new HashMap<Endpoint, RMEndpoint>();
- for (ProtocolVariation pv : manager.getEndpointMaps().keySet()) {
- epmap.putAll(manager.getEndpointMaps().get(pv));
- }
- return epmap;
- }
@ManagedAttribute(description = "Using Store")
public boolean isUsingStore() {
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Fri Mar 30 20:50:22 2012
@@ -73,45 +73,46 @@ public class Proxy {
}
void acknowledge(DestinationSequence ds) throws RMException {
+ final ProtocolVariation protocol = ds.getProtocol();
String address = ds.getAcksTo().getAddress().getValue();
if (RMUtils.getAddressingConstants().getAnonymousURI().equals(address)) {
LOG.log(Level.WARNING, "STANDALONE_ANON_ACKS_NOT_SUPPORTED");
return;
}
-
- RMConstants constants = reliableEndpoint.getProtocol().getConstants();
- OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
- .getOperation(constants.getSequenceAckOperationName());
- invoke(oi, new Object[] {ds}, null);
+ RMConstants constants = protocol.getConstants();
+ OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo()
+ .getService().getInterface().getOperation(constants.getSequenceAckOperationName());
+ invoke(oi, protocol, new Object[] {ds}, null);
}
void terminate(SourceSequence ss) throws RMException {
- RMConstants constants = reliableEndpoint.getProtocol().getConstants();
- OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
- .getOperation(constants.getTerminateSequenceOperationName());
+ final ProtocolVariation protocol = ss.getProtocol();
+ RMConstants constants = protocol.getConstants();
+ OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo()
+ .getService().getInterface().getOperation(constants.getTerminateSequenceOperationName());
TerminateSequenceType ts = new TerminateSequenceType();
ts.setIdentifier(ss.getIdentifier());
- EncoderDecoder codec = reliableEndpoint.getProtocol().getCodec();
- invoke(oi, new Object[] {codec.convertToSend(ts)}, null);
+ EncoderDecoder codec = protocol.getCodec();
+ invoke(oi, protocol, new Object[] {codec.convertToSend(ts)}, null);
}
- void createSequenceResponse(final Object createResponse) throws RMException {
+ void createSequenceResponse(final Object createResponse, ProtocolVariation protocol) throws RMException {
LOG.fine("sending CreateSequenceResponse from client side");
- RMConstants constants = reliableEndpoint.getProtocol().getConstants();
- final OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
- .getOperation(constants.getCreateSequenceResponseOnewayOperationName());
+ RMConstants constants = protocol.getConstants();
+ final OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo().getService()
+ .getInterface().getOperation(constants.getCreateSequenceResponseOnewayOperationName());
// TODO: need to set relatesTo
- invoke(oi, new Object[] {createResponse}, null);
+ invoke(oi, protocol, new Object[] {createResponse}, null);
}
public CreateSequenceResponseType createSequence(
EndpointReferenceType defaultAcksTo,
RelatesToType relatesTo,
- boolean isServer) throws RMException {
+ boolean isServer, final ProtocolVariation protocol) throws RMException {
SourcePolicyType sp = reliableEndpoint.getManager().getSourcePolicy();
CreateSequenceType create = new CreateSequenceType();
@@ -145,9 +146,10 @@ public class Proxy {
setOfferedIdentifier(offer);
}
- InterfaceInfo ii = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface();
+ InterfaceInfo ii = reliableEndpoint.getEndpoint(protocol).getEndpointInfo()
+ .getService().getInterface();
- EncoderDecoder codec = reliableEndpoint.getProtocol().getCodec();
+ EncoderDecoder codec = protocol.getCodec();
RMConstants constants = codec.getConstants();
final OperationInfo oi = isServer
? ii.getOperation(constants.getCreateSequenceOnewayOperationName())
@@ -161,7 +163,7 @@ public class Proxy {
Runnable r = new Runnable() {
public void run() {
try {
- invoke(oi, new Object[] {send}, null);
+ invoke(oi, protocol, new Object[] {send}, null);
} catch (RMException ex) {
// already logged
}
@@ -172,11 +174,12 @@ public class Proxy {
}
- Object resp = invoke(oi, new Object[] {send}, null);
+ Object resp = invoke(oi, protocol, new Object[] {send}, null);
return codec.convertReceivedCreateSequenceResponse(resp);
}
void lastMessage(SourceSequence s) throws RMException {
+ final ProtocolVariation protocol = s.getProtocol();
EndpointReferenceType target = s.getTarget();
AttributedURIType uri = null;
if (null != target) {
@@ -196,19 +199,19 @@ public class Proxy {
LOG.log(Level.WARNING, "STANDALONE_CLOSE_SEQUENCE_ANON_TARGET_MSG");
return;
}
-
- RMConstants constants = reliableEndpoint.getProtocol().getConstants();
- OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
- .getOperation(constants.getCloseSequenceOperationName());
+ RMConstants constants = protocol.getConstants();
+ OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo().getService()
+ .getInterface().getOperation(constants.getCloseSequenceOperationName());
// pass reference to source sequence in invocation context
Map<String, Object> context = new HashMap<String, Object>(
Collections.singletonMap(SourceSequence.class.getName(),
(Object)s));
- invoke(oi, new Object[] {}, context);
+ invoke(oi, protocol, new Object[] {}, context);
}
void ackRequested(SourceSequence s) throws RMException {
+ final ProtocolVariation protocol = s.getProtocol();
EndpointReferenceType target = s.getTarget();
AttributedURIType uri = null;
if (null != target) {
@@ -229,10 +232,10 @@ public class Proxy {
return;
}
- RMConstants constants = reliableEndpoint.getProtocol().getConstants();
- OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
- .getOperation(constants.getAckRequestedOperationName());
- invoke(oi, new Object[] {}, null);
+ RMConstants constants = protocol.getConstants();
+ OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo().getService()
+ .getInterface().getOperation(constants.getAckRequestedOperationName());
+ invoke(oi, protocol, new Object[] {}, null);
}
Identifier getOfferedIdentifier() {
@@ -245,7 +248,8 @@ public class Proxy {
}
}
- Object invoke(OperationInfo oi, Object[] params, Map<String, Object> context) throws RMException {
+ Object invoke(OperationInfo oi, ProtocolVariation protocol,
+ Object[] params, Map<String, Object> context) throws RMException {
if (LOG.isLoggable(Level.INFO)) {
LOG.log(Level.INFO, "Sending out-of-band RM protocol message {0}.",
@@ -254,8 +258,8 @@ public class Proxy {
RMManager manager = reliableEndpoint.getManager();
Bus bus = manager.getBus();
- Endpoint endpoint = reliableEndpoint.getEndpoint();
- BindingInfo bi = reliableEndpoint.getBindingInfo();
+ Endpoint endpoint = reliableEndpoint.getEndpoint(protocol);
+ BindingInfo bi = reliableEndpoint.getBindingInfo(protocol);
Conduit c = reliableEndpoint.getConduit();
Client client = null;
if (params.length > 0 && params[0] instanceof DestinationSequence) {
@@ -265,11 +269,11 @@ public class Proxy {
attrURIType.setValue(acksAddress);
EndpointReferenceType acks = new EndpointReferenceType();
acks.setAddress(attrURIType);
- client = createClient(bus, endpoint, c, acks);
+ client = createClient(bus, endpoint, protocol, c, acks);
params = new Object[] {};
} else {
EndpointReferenceType replyTo = reliableEndpoint.getReplyTo();
- client = createClient(bus, endpoint, c, replyTo);
+ client = createClient(bus, endpoint, protocol, c, replyTo);
}
BindingOperationInfo boi = bi.getOperation(oi);
@@ -289,8 +293,8 @@ public class Proxy {
return null;
}
- protected Client createClient(Bus bus, Endpoint endpoint, Conduit conduit,
- final EndpointReferenceType address) {
+ protected Client createClient(Bus bus, Endpoint endpoint, final ProtocolVariation protocol,
+ Conduit conduit, final EndpointReferenceType address) {
ConduitSelector cs = new DeferredConduitSelector(conduit) {
@Override
public synchronized Conduit selectConduit(Message message) {
@@ -311,7 +315,6 @@ public class Proxy {
};
RMClient client = new RMClient(bus, endpoint, cs);
Map<String, Object> context = client.getRequestContext();
- ProtocolVariation protocol = reliableEndpoint.getProtocol();
context.put(RMManager.WSRM_VERSION_PROPERTY, protocol.getWSRMNamespace());
context.put(RMManager.WSRM_WSA_VERSION_PROPERTY, protocol.getWSANamespace());
return client;
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java Fri Mar 30 20:50:22 2012
@@ -138,4 +138,13 @@ public final class RMContextUtils {
return outbound
? RMMessageConstants.RM_PROPERTIES_OUTBOUND : RMMessageConstants.RM_PROPERTIES_INBOUND;
}
+
+ //TODO put this key to the constant
+ public static ProtocolVariation getProtocolVariation(Message message) {
+ return (ProtocolVariation)message.get(RMMessageConstants.RM_PROTOCOL_VARIATION);
+ }
+
+ public static void setProtocolVariation(Message message, ProtocolVariation protocol) {
+ message.put(RMMessageConstants.RM_PROTOCOL_VARIATION, protocol);
+ }
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Fri Mar 30 20:50:22 2012
@@ -20,7 +20,9 @@
package org.apache.cxf.ws.rm;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -86,17 +88,13 @@ public class RMEndpoint {
private RMManager manager;
private Endpoint applicationEndpoint;
private Conduit conduit;
- private ProtocolVariation protocol;
private EndpointReferenceType replyTo;
private Source source;
private Destination destination;
- private WrappedService service;
- private Endpoint endpoint;
+ private Map<ProtocolVariation, WrappedService> services;
+ private Map<ProtocolVariation, Endpoint> endpoints;
private Proxy proxy;
private Servant servant;
- private QName serviceQName;
- private QName bindingQName;
- private QName interfaceQName;
private long lastApplicationMessage;
private long lastControlMessage;
private InstrumentationManager instrumentationManager;
@@ -109,14 +107,15 @@ public class RMEndpoint {
* @param ae
* @param pv
*/
- public RMEndpoint(RMManager m, Endpoint ae, ProtocolVariation pv) {
+ public RMEndpoint(RMManager m, Endpoint ae) {
manager = m;
applicationEndpoint = ae;
source = new Source(this);
destination = new Destination(this);
proxy = new Proxy(this);
servant = new Servant(this);
- protocol = pv;
+ services = new HashMap<ProtocolVariation, WrappedService>();
+ endpoints = new HashMap<ProtocolVariation, Endpoint>();
}
/**
@@ -136,36 +135,23 @@ public class RMEndpoint {
/**
* @return Returns the RM protocol endpoint.
*/
- public Endpoint getEndpoint() {
- return endpoint;
- }
-
- public ProtocolVariation getProtocol() {
- return protocol;
- }
-
- /**
- * Set the protocol used by this endpoint. This method is only intended for use in testing; all normal use
- * uses the constructor to set the value.
- *
- * @param protocol
- */
- public void setProtocol(ProtocolVariation protocol) {
- this.protocol = protocol;
+ public Endpoint getEndpoint(ProtocolVariation protocol) {
+ return endpoints.get(protocol);
}
/**
* @return Returns the RM protocol service.
*/
- public Service getService() {
- return service;
+ public Service getService(ProtocolVariation protocol) {
+ return services.get(protocol);
}
/**
* @return Returns the RM protocol binding info.
*/
- public BindingInfo getBindingInfo() {
- return service.getServiceInfo().getBinding(bindingQName);
+ public BindingInfo getBindingInfo(ProtocolVariation protocol) {
+ final QName bindingQName = new QName(protocol.getWSRMNamespace(), BINDING_NAME);
+ return services.get(protocol).getServiceInfo().getBinding(bindingQName);
}
/**
@@ -261,8 +247,8 @@ public class RMEndpoint {
org.apache.cxf.transport.Destination d) {
conduit = c;
replyTo = r;
- createService();
- createEndpoint(d);
+ createServices();
+ createEndpoints(d);
setPolicies();
if (manager != null && manager.getBus() != null) {
managedEndpoint = new ManagedRMEndpoint(this);
@@ -277,15 +263,24 @@ public class RMEndpoint {
}
}
- void createService() {
+ // internally we keep three services and three endpoints to support three protocol variations
+ // using the specifically generated jaxb classes and operation names etc but this could probably
+ // be simplified/unified.
+ void createServices() {
+ for (ProtocolVariation protocol : ProtocolVariation.values()) {
+ createService(protocol);
+ }
+ }
+
+ void createService(ProtocolVariation protocol) {
ServiceInfo si = new ServiceInfo();
si.setProperty(Schema.class.getName(), getSchema());
- serviceQName = new QName(protocol.getWSRMNamespace(), SERVICE_NAME);
+ QName serviceQName = new QName(protocol.getWSRMNamespace(), SERVICE_NAME);
si.setName(serviceQName);
- buildInterfaceInfo(si);
+ buildInterfaceInfo(si, protocol);
- service = new WrappedService(applicationEndpoint.getService(), serviceQName, si);
+ WrappedService service = new WrappedService(applicationEndpoint.getService(), serviceQName, si);
DataBinding dataBinding = null;
Class<?> create = protocol.getCodec().getCreateSequenceType();
@@ -298,6 +293,7 @@ public class RMEndpoint {
}
service.setDataBinding(dataBinding);
service.setInvoker(servant);
+ services.put(protocol, service);
}
private static synchronized Schema getSchema() {
@@ -320,10 +316,18 @@ public class RMEndpoint {
}
return rmSchema;
}
-
- void createEndpoint(org.apache.cxf.transport.Destination d) {
+
+ void createEndpoints(org.apache.cxf.transport.Destination d) {
+ for (ProtocolVariation protocol : ProtocolVariation.values()) {
+ createEndpoint(d, protocol);
+ }
+ }
+
+ void createEndpoint(org.apache.cxf.transport.Destination d, ProtocolVariation protocol) {
+ final QName bindingQName = new QName(protocol.getWSRMNamespace(), BINDING_NAME);
+ WrappedService service = services.get(protocol);
ServiceInfo si = service.getServiceInfo();
- buildBindingInfo(si);
+ buildBindingInfo(si, protocol);
EndpointInfo aei = applicationEndpoint.getEndpointInfo();
String transportId = aei.getTransportId();
EndpointInfo ei = new EndpointInfo(si, transportId);
@@ -346,8 +350,9 @@ public class RMEndpoint {
}
si.addEndpoint(ei);
- endpoint = new WrappedEndpoint(applicationEndpoint, ei, service);
+ Endpoint endpoint = new WrappedEndpoint(applicationEndpoint, ei, service);
service.setEndpoint(endpoint);
+ endpoints.put(protocol, endpoint);
}
void setPolicies() {
@@ -357,7 +362,7 @@ public class RMEndpoint {
return;
}
- EndpointInfo ei = getEndpoint().getEndpointInfo();
+ EndpointInfo ei = getEndpoint(ProtocolVariation.RM10WSA200408).getEndpointInfo();
PolicyInterceptorProviderRegistry reg = manager.getBus()
.getExtension(PolicyInterceptorProviderRegistry.class);
@@ -388,23 +393,23 @@ public class RMEndpoint {
// TODO: FaultPolicy (SequenceFault)
}
- void buildInterfaceInfo(ServiceInfo si) {
- interfaceQName = new QName(protocol.getWSRMNamespace(), INTERFACE_NAME);
+ void buildInterfaceInfo(ServiceInfo si, ProtocolVariation protocol) {
+ QName interfaceQName = new QName(protocol.getWSRMNamespace(), INTERFACE_NAME);
InterfaceInfo ii = new InterfaceInfo(si, interfaceQName);
- buildOperationInfo(ii);
+ buildOperationInfo(ii, protocol);
}
- void buildOperationInfo(InterfaceInfo ii) {
- buildCreateSequenceOperationInfo(ii);
- buildTerminateSequenceOperationInfo(ii);
- buildSequenceAckOperationInfo(ii);
- buildCloseSequenceOperationInfo(ii);
- buildAckRequestedOperationInfo(ii);
+ void buildOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
+ buildCreateSequenceOperationInfo(ii, protocol);
+ buildTerminateSequenceOperationInfo(ii, protocol);
+ buildSequenceAckOperationInfo(ii, protocol);
+ buildCloseSequenceOperationInfo(ii, protocol);
+ buildAckRequestedOperationInfo(ii, protocol);
// TODO: FaultInfo (SequenceFault)
}
- void buildCreateSequenceOperationInfo(InterfaceInfo ii) {
+ void buildCreateSequenceOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
OperationInfo operationInfo = null;
MessagePartInfo partInfo = null;
@@ -447,7 +452,7 @@ public class RMEndpoint {
partInfo.setTypeClass(protocol.getCodec().getCreateSequenceResponseType());
}
- void buildTerminateSequenceOperationInfo(InterfaceInfo ii) {
+ void buildTerminateSequenceOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
OperationInfo operationInfo = null;
MessagePartInfo partInfo = null;
@@ -464,7 +469,7 @@ public class RMEndpoint {
partInfo.setTypeClass(protocol.getCodec().getTerminateSequenceType());
}
- void buildSequenceAckOperationInfo(InterfaceInfo ii) {
+ void buildSequenceAckOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
OperationInfo operationInfo = null;
MessageInfo messageInfo = null;
@@ -476,7 +481,7 @@ public class RMEndpoint {
operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
}
- void buildCloseSequenceOperationInfo(InterfaceInfo ii) {
+ void buildCloseSequenceOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
OperationInfo operationInfo = null;
MessageInfo messageInfo = null;
@@ -488,7 +493,7 @@ public class RMEndpoint {
operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
}
- void buildAckRequestedOperationInfo(InterfaceInfo ii) {
+ void buildAckRequestedOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
OperationInfo operationInfo = null;
MessageInfo messageInfo = null;
@@ -500,17 +505,17 @@ public class RMEndpoint {
operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
}
- void buildBindingInfo(ServiceInfo si) {
+ void buildBindingInfo(ServiceInfo si, ProtocolVariation protocol) {
// use same binding id as for application endpoint
// also, to workaround the problem that it may not be possible to determine
// the soap version depending on the bindingId, speciffy the soap version
// explicitly
if (null != applicationEndpoint) {
+ final QName bindingQName = new QName(protocol.getWSRMNamespace(), BINDING_NAME);
SoapBindingInfo sbi = (SoapBindingInfo)applicationEndpoint.getEndpointInfo().getBinding();
SoapVersion sv = sbi.getSoapVersion();
String bindingId = sbi.getBindingId();
SoapBindingInfo bi = new SoapBindingInfo(si, bindingId, sv);
- bindingQName = new QName(protocol.getWSRMNamespace(), BINDING_NAME);
bi.setName(bindingQName);
BindingOperationInfo boi = null;
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Fri Mar 30 20:50:22 2012
@@ -84,6 +84,12 @@ public class RMInInterceptor extends Abs
LOG.fine("Restoring original requestor role to: " + originalRequestor);
message.put(Message.REQUESTOR_ROLE, originalRequestor);
}
+
+ String rmUri = getManager().getRMNamespace(message);
+ String addrUri = getManager().getAddressingNamespace(message);
+
+ ProtocolVariation protocol = ProtocolVariation.findVariant(rmUri, addrUri);
+ RMContextUtils.setProtocolVariation(message, protocol);
// Destination destination = getManager().getDestination(message);
// RMEndpoint rme = getManager().getReliableEndpoint(message);
@@ -101,7 +107,7 @@ public class RMInInterceptor extends Abs
if (isApplicationMessage) {
if (null != rmps) {
- processAcknowledgments(rme, rmps);
+ processAcknowledgments(rme, rmps, protocol);
processAcknowledgmentRequests(destination, message);
processSequence(destination, message);
processDeliveryAssurance(rmps);
@@ -116,7 +122,7 @@ public class RMInInterceptor extends Abs
rme.receivedControlMessage();
if (RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action)
|| RM11Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action)) {
- processAcknowledgments(rme, rmps);
+ processAcknowledgments(rme, rmps, protocol);
} else if (RM10Constants.CLOSE_SEQUENCE_ACTION.equals(action)
|| RM11Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action)) {
processSequence(destination, message);
@@ -126,7 +132,7 @@ public class RMInInterceptor extends Abs
Servant servant = rme.getServant();
Object csr = servant.createSequence(message);
Proxy proxy = rme.getProxy();
- proxy.createSequenceResponse(csr);
+ proxy.createSequenceResponse(csr, protocol);
return;
}
}
@@ -134,7 +140,8 @@ public class RMInInterceptor extends Abs
assertReliability(message);
}
- void processAcknowledgments(RMEndpoint rme, RMProperties rmps) throws SequenceFault, RMException {
+ void processAcknowledgments(RMEndpoint rme, RMProperties rmps, ProtocolVariation protocol)
+ throws SequenceFault, RMException {
Collection<SequenceAcknowledgement> acks = rmps.getAcks();
Source source = rme.getSource();
@@ -145,7 +152,7 @@ public class RMInInterceptor extends Abs
if (null != ss) {
ss.setAcknowledged(ack);
} else {
- RMConstants consts = rme.getProtocol().getConstants();
+ RMConstants consts = protocol.getConstants();
SequenceFaultFactory sff = new SequenceFaultFactory(consts);
throw sff.createUnknownSequenceFault(id);
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Fri Mar 30 20:50:22 2012
@@ -103,7 +103,7 @@ public class RMManager {
private RMStore store;
private SequenceIdentifierGenerator idGenerator;
private RetransmissionQueue retransmissionQueue;
- private Map<ProtocolVariation, Map<Endpoint, RMEndpoint>> endpointMaps;
+ private Map<Endpoint, RMEndpoint> reliableEndpoints = new HashMap<Endpoint, RMEndpoint>();
private AtomicReference<Timer> timer = new AtomicReference<Timer>();
private RMAssertion rmAssertion;
private DeliveryAssuranceType deliveryAssurance;
@@ -114,16 +114,10 @@ public class RMManager {
private String rmNamespace = RM10Constants.NAMESPACE_URI;
private RM10AddressingNamespaceType rm10AddressingNamespace;
- public RMManager() {
- setEndpointMaps(new HashMap<ProtocolVariation, Map<Endpoint, RMEndpoint>>());
- }
-
// ServerLifeCycleListener
public void startServer(Server server) {
- for (ProtocolVariation protocol : ProtocolVariation.values()) {
- recoverReliableEndpoint(server.getEndpoint(), (Conduit)null, protocol);
- }
+ recoverReliableEndpoint(server.getEndpoint(), (Conduit)null);
}
public void stopServer(Server server) {
@@ -137,13 +131,12 @@ public class RMManager {
return;
}
String id = RMUtils.getEndpointIdentifier(client.getEndpoint());
- ProtocolVariation protocol = getConfiguredProtocol();
- Collection<SourceSequence> sss = store.getSourceSequences(id, protocol);
+ Collection<SourceSequence> sss = store.getSourceSequences(id);
if (null == sss || 0 == sss.size()) {
return;
}
LOG.log(Level.FINE, "Number of source sequences: {0}", sss.size());
- recoverReliableEndpoint(client.getEndpoint(), client.getConduit(), protocol);
+ recoverReliableEndpoint(client.getEndpoint(), client.getConduit());
}
private ProtocolVariation getConfiguredProtocol() {
@@ -318,16 +311,15 @@ public class RMManager {
String rmUri = getRMNamespace(message);
String addrUri = getAddressingNamespace(message);
ProtocolVariation protocol = ProtocolVariation.findVariant(rmUri, addrUri);
- Map<Endpoint, RMEndpoint> endpointMap = endpointMaps.get(protocol);
- if (endpointMap == null) {
+ if (protocol == null) {
org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
"UNSUPPORTED_NAMESPACE", LOG, addrUri, rmUri);
LOG.log(Level.INFO, msg.toString());
throw new RMException(msg);
}
- RMEndpoint rme = endpointMap.get(endpoint);
+ RMEndpoint rme = reliableEndpoints.get(endpoint);
if (null == rme) {
- rme = createReliableEndpoint(endpoint, protocol);
+ rme = createReliableEndpoint(endpoint);
org.apache.cxf.transport.Destination destination = message.getExchange().getDestination();
EndpointReferenceType replyTo = null;
if (null != destination) {
@@ -340,7 +332,7 @@ public class RMManager {
.getProperty(MAPAggregator.DECOUPLED_DESTINATION,
org.apache.cxf.transport.Destination.class);
rme.initialise(message.getExchange().getConduit(message), replyTo, dest);
- endpointMap.put(endpoint, rme);
+ reliableEndpoints.put(endpoint, rme);
LOG.fine("Created new RMEndpoint.");
}
return rme;
@@ -409,6 +401,7 @@ public class RMManager {
Source source = getSource(message);
SourceSequence seq = source.getCurrent(inSeqId);
+ ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
if (null == seq || seq.isExpired()) {
// TODO: better error handling
EndpointReferenceType to = null;
@@ -452,10 +445,11 @@ public class RMManager {
throw new RMException(msg);
}
Proxy proxy = source.getReliableEndpoint().getProxy();
- CreateSequenceResponseType createResponse = proxy.createSequence(acksTo, relatesTo, isServer);
+ CreateSequenceResponseType createResponse =
+ proxy.createSequence(acksTo, relatesTo, isServer, protocol);
if (!isServer) {
Servant servant = source.getReliableEndpoint().getServant();
- servant.createSequenceResponse(createResponse);
+ servant.createSequenceResponse(createResponse, protocol);
}
seq = source.awaitCurrent(inSeqId);
@@ -468,15 +462,12 @@ public class RMManager {
@PreDestroy
public void shutdown() {
// shutdown remaining endpoints
- for (ProtocolVariation protocol : ProtocolVariation.values()) {
- Map<Endpoint, RMEndpoint> map = endpointMaps.get(protocol);
- if (map.size() > 0) {
- LOG.log(Level.FINE,
- "Shutting down RMManager with {0} remaining endpoints for protocol variation {1}.",
- new Object[] {new Integer(map.size()), protocol });
- for (RMEndpoint rme : map.values()) {
- rme.shutdown();
- }
+ if (reliableEndpoints.size() > 0) {
+ LOG.log(Level.FINE,
+ "Shutting down RMManager with {0} remaining endpoints.",
+ new Object[] {new Integer(reliableEndpoints.size())});
+ for (RMEndpoint rme : reliableEndpoints.values()) {
+ rme.shutdown();
}
}
@@ -493,13 +484,8 @@ public class RMManager {
synchronized void shutdownReliableEndpoint(Endpoint e) {
RMEndpoint rme = null;
- for (ProtocolVariation protocol : ProtocolVariation.values()) {
- Map<Endpoint, RMEndpoint> map = endpointMaps.get(protocol);
- rme = map.get(e);
- if (rme != null) {
- break;
- }
- }
+
+ rme = reliableEndpoints.get(e);
if (rme == null) {
// not found
return;
@@ -513,20 +499,18 @@ public class RMManager {
t.purge();
}
- for (ProtocolVariation protocol : ProtocolVariation.values()) {
- endpointMaps.get(protocol).remove(e);
- }
+ reliableEndpoints.remove(e);
}
- void recoverReliableEndpoint(Endpoint endpoint, Conduit conduit, ProtocolVariation protocol) {
+ void recoverReliableEndpoint(Endpoint endpoint, Conduit conduit) {
if (null == store || null == retransmissionQueue) {
return;
}
String id = RMUtils.getEndpointIdentifier(endpoint);
- Collection<SourceSequence> sss = store.getSourceSequences(id, protocol);
- Collection<DestinationSequence> dss = store.getDestinationSequences(id, protocol);
+ Collection<SourceSequence> sss = store.getSourceSequences(id);
+ Collection<DestinationSequence> dss = store.getDestinationSequences(id);
if ((null == sss || 0 == sss.size()) && (null == dss || 0 == dss.size())) {
return;
}
@@ -535,9 +519,9 @@ public class RMManager {
LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
new Object[] {null == conduit ? "client" : "server", id});
- RMEndpoint rme = createReliableEndpoint(endpoint, protocol);
+ RMEndpoint rme = createReliableEndpoint(endpoint);
rme.initialise(conduit, null, null);
- endpointMaps.get(protocol).put(endpoint, rme);
+ reliableEndpoints.put(endpoint, rme);
SourceSequence css = null;
for (SourceSequence ss : sss) {
@@ -591,7 +575,8 @@ public class RMManager {
}
message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream());
-
+ RMContextUtils.setProtocolVariation(message, ss.getProtocol());
+
retransmissionQueue.addUnacknowledged(message);
}
}
@@ -603,8 +588,8 @@ public class RMManager {
}
- RMEndpoint createReliableEndpoint(Endpoint endpoint, ProtocolVariation protocol) {
- return new RMEndpoint(this, endpoint, protocol);
+ RMEndpoint createReliableEndpoint(Endpoint endpoint) {
+ return new RMEndpoint(this, endpoint);
}
public void init(Bus b) {
@@ -688,6 +673,14 @@ public class RMManager {
}
}
+ Map<Endpoint, RMEndpoint> getReliableEndpointsMap() {
+ return reliableEndpoints;
+ }
+
+ void setReliableEndpointsMap(Map<Endpoint, RMEndpoint> map) {
+ reliableEndpoints = map;
+ }
+
class DefaultSequenceIdentifierGenerator implements SequenceIdentifierGenerator {
public Identifier generateSequenceIdentifier() {
@@ -697,15 +690,4 @@ public class RMManager {
return sid;
}
}
-
- Map<ProtocolVariation, Map<Endpoint, RMEndpoint>> getEndpointMaps() {
- return endpointMaps;
- }
-
- final void setEndpointMaps(Map<ProtocolVariation, Map<Endpoint, RMEndpoint>> endpointMaps) {
- endpointMaps.put(ProtocolVariation.RM10WSA200408, new HashMap<Endpoint, RMEndpoint>());
- endpointMaps.put(ProtocolVariation.RM10WSA200508, new HashMap<Endpoint, RMEndpoint>());
- endpointMaps.put(ProtocolVariation.RM11WSA200508, new HashMap<Endpoint, RMEndpoint>());
- this.endpointMaps = endpointMaps;
- }
}
\ No newline at end of file
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java Fri Mar 30 20:50:22 2012
@@ -42,6 +42,9 @@ public final class RMMessageConstants {
public static final String SAVED_CONTENT =
"org.apache.cxf.ws.rm.content";
+ static final String RM_PROTOCOL_VARIATION =
+ "org.apache.cxf.ws.rm.protocol";
+
// keep this constant in the ws-rm package until it finds a general use outside of ws-rm
static final String DELIVERING_ROBUST_ONEWAY =
"org.apache.cxf.oneway.robust.delivering";
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Fri Mar 30 20:50:22 2012
@@ -62,7 +62,11 @@ public class RMOutInterceptor extends Ab
}
Source source = getManager().getSource(msg);
- ProtocolVariation protocol = source.getReliableEndpoint().getProtocol();
+ String rmUri = getManager().getRMNamespace(msg);
+ String addrUri = getManager().getAddressingNamespace(msg);
+
+ ProtocolVariation protocol = ProtocolVariation.findVariant(rmUri, addrUri);
+ RMContextUtils.setProtocolVariation(msg, protocol);
maps.exposeAs(protocol.getWSANamespace());
Destination destination = getManager().getDestination(msg);
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java Fri Mar 30 20:50:22 2012
@@ -60,6 +60,7 @@ public class Servant implements Invoker
public Object invoke(Exchange exchange, Object o) {
LOG.fine("Invoking on RM Endpoint");
+ final ProtocolVariation protocol = RMContextUtils.getProtocolVariation(exchange.getInMessage());
OperationInfo oi = exchange.get(OperationInfo.class);
if (null == oi) {
LOG.fine("No operation info.");
@@ -77,10 +78,10 @@ public class Servant implements Invoker
}
} else if (RM10Constants.INSTANCE.getCreateSequenceResponseOnewayOperationName().equals(oi.getName())
|| RM11Constants.INSTANCE.getCreateSequenceResponseOnewayOperationName().equals(oi.getName())) {
- EncoderDecoder codec = reliableEndpoint.getProtocol().getCodec();
+ EncoderDecoder codec = protocol.getCodec();
CreateSequenceResponseType createResponse =
codec.convertReceivedCreateSequenceResponse(getParameter(exchange.getInMessage()));
- createSequenceResponse(createResponse);
+ createSequenceResponse(createResponse, protocol);
} else if (RM10Constants.INSTANCE.getTerminateSequenceOperationName().equals(oi.getName())
|| RM11Constants.INSTANCE.getTerminateSequenceOperationName().equals(oi.getName())) {
terminateSequence(exchange.getInMessage());
@@ -92,6 +93,7 @@ public class Servant implements Invoker
Object createSequence(Message message) {
LOG.fine("Creating sequence");
+ final ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
Message outMessage = message.getExchange().getOutMessage();
@@ -99,7 +101,7 @@ public class Servant implements Invoker
RMContextUtils.storeMAPs(maps, outMessage, false, false);
}
- EncoderDecoder codec = reliableEndpoint.getProtocol().getCodec();
+ EncoderDecoder codec = protocol.getCodec();
CreateSequenceType create = codec.convertReceivedCreateSequence(getParameter(message));
Destination destination = reliableEndpoint.getDestination();
@@ -135,7 +137,7 @@ public class Servant implements Invoker
// AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
accept.setAcksTo(RMUtils.createReference(maps.getTo().getValue()));
SourceSequence seq = new SourceSequence(offer.getIdentifier(), null,
- createResponse.getIdentifier(), reliableEndpoint.getProtocol());
+ createResponse.getIdentifier(), protocol);
seq.setExpires(offer.getExpires());
seq.setTarget(create.getAcksTo());
source.addSequence(seq);
@@ -154,18 +156,19 @@ public class Servant implements Invoker
}
DestinationSequence seq = new DestinationSequence(createResponse.getIdentifier(),
- create.getAcksTo(), destination, reliableEndpoint.getProtocol());
+ create.getAcksTo(), destination, protocol);
seq.setCorrelationID(maps.getMessageID().getValue());
destination.addSequence(seq);
LOG.fine("returning " + createResponse);
return codec.convertToSend(createResponse);
}
- public void createSequenceResponse(CreateSequenceResponseType createResponse) {
+ public void createSequenceResponse(CreateSequenceResponseType createResponse,
+ ProtocolVariation protocol) {
LOG.fine("Creating sequence response");
SourceSequence seq = new SourceSequence(createResponse.getIdentifier(),
- reliableEndpoint.getProtocol());
+ protocol);
seq.setExpires(createResponse.getExpires());
Source source = reliableEndpoint.getSource();
source.addSequence(seq);
@@ -187,7 +190,7 @@ public class Servant implements Invoker
String address = accept.getAcksTo().getAddress().getValue();
if (!RMUtils.getAddressingConstants().getNoneURI().equals(address)) {
DestinationSequence ds = new DestinationSequence(offeredId, accept.getAcksTo(), dest,
- reliableEndpoint.getProtocol());
+ protocol);
dest.addSequence(ds);
}
}
@@ -195,8 +198,9 @@ public class Servant implements Invoker
public void terminateSequence(Message message) {
LOG.fine("Terminating sequence");
+ final ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
- EncoderDecoder codec = reliableEndpoint.getProtocol().getCodec();
+ EncoderDecoder codec = protocol.getCodec();
TerminateSequenceType terminate = codec.convertReceivedTerminateSequence(getParameter(message));
// check if the terminated sequence was created in response to a a createSequence
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java Fri Mar 30 20:50:22 2012
@@ -26,18 +26,13 @@ import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.v200702.Identifier;
public class Source extends AbstractEndpoint {
- private static final Logger LOG = LogUtils.getL7dLogger(Source.class);
-
private static final String REQUESTOR_SEQUENCE_ID = "";
private Map<String, SourceSequence> map;
@@ -68,17 +63,13 @@ public class Source extends AbstractEndp
}
public void addSequence(SourceSequence seq, boolean persist) {
- if (seq.getProtocol() == getReliableEndpoint().getProtocol()) {
- seq.setSource(this);
- map.put(seq.getIdentifier().getValue(), seq);
- if (persist) {
- RMStore store = getReliableEndpoint().getManager().getStore();
- if (null != store) {
- store.createSourceSequence(seq);
- }
+ seq.setSource(this);
+ map.put(seq.getIdentifier().getValue(), seq);
+ if (persist) {
+ RMStore store = getReliableEndpoint().getManager().getStore();
+ if (null != store) {
+ store.createSourceSequence(seq);
}
- } else {
- LOG.log(Level.SEVERE, "Incompatible protocol version");
}
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java Fri Mar 30 20:50:22 2012
@@ -22,7 +22,6 @@ package org.apache.cxf.ws.rm.persistence
import java.util.Collection;
import org.apache.cxf.ws.rm.DestinationSequence;
-import org.apache.cxf.ws.rm.ProtocolVariation;
import org.apache.cxf.ws.rm.SourceSequence;
import org.apache.cxf.ws.rm.v200702.Identifier;
@@ -48,14 +47,14 @@ public interface RMStore {
* @param seq the sequence
* @return the sequence if present; otherwise null
*/
- SourceSequence getSourceSequence(Identifier seq, ProtocolVariation protocol);
+ SourceSequence getSourceSequence(Identifier seq);
/**
* Retrieve the destination sequence with the specified identifier from persistent store.
* @param seq the sequence
* @return the sequence if present; otherwise null
*/
- DestinationSequence getDestinationSequence(Identifier seq, ProtocolVariation protocol);
+ DestinationSequence getDestinationSequence(Identifier seq);
/**
* Remove the source sequence with the specified identifier from persistent store.
@@ -76,8 +75,7 @@ public interface RMStore {
* @param endpointIdentifier the identifier for the source
* @return the collection of sequences
*/
- Collection<SourceSequence> getSourceSequences(String endpointIdentifier,
- ProtocolVariation protocol);
+ Collection<SourceSequence> getSourceSequences(String endpointIdentifier);
/**
* Retrieves all sequences managed by the identified RM destination endpoint
@@ -86,8 +84,7 @@ public interface RMStore {
* @param endpointIdentifier the identifier for the destination
* @return the collection of sequences
*/
- Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier,
- ProtocolVariation protocol);
+ Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier);
/**
* Retrieves the outbound/inbound messages stored for the source/destination sequence with
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Fri Mar 30 20:50:22 2012
@@ -66,6 +66,7 @@ public class RMTxStore implements RMStor
+ "LAST_MSG_NO DECIMAL(19, 0), "
+ "ENDPOINT_ID VARCHAR(1024), "
+ "ACKNOWLEDGED BLOB, "
+ + "PROTOCOL_VERSION VARCHAR(256), "
+ "PRIMARY KEY (SEQ_ID))";
private static final String CREATE_SRC_SEQUENCES_TABLE_STMT =
"CREATE TABLE CXF_RM_SRC_SEQUENCES "
@@ -74,7 +75,8 @@ public class RMTxStore implements RMStor
+ "LAST_MSG CHAR(1), "
+ "EXPIRY DECIMAL(19, 0), "
+ "OFFERING_SEQ_ID VARCHAR(256), "
- + "ENDPOINT_ID VARCHAR(1024), "
+ + "ENDPOINT_ID VARCHAR(1024), "
+ + "PROTOCOL_VERSION VARCHAR(256), "
+ "PRIMARY KEY (SEQ_ID))";
private static final String CREATE_MESSAGES_TABLE_STMT =
"CREATE TABLE {0} "
@@ -87,9 +89,10 @@ public class RMTxStore implements RMStor
private static final String OUTBOUND_MSGS_TABLE_NAME = "CXF_RM_OUTBOUND_MESSAGES";
private static final String CREATE_DEST_SEQUENCE_STMT_STR
- = "INSERT INTO CXF_RM_DEST_SEQUENCES (SEQ_ID, ACKS_TO, ENDPOINT_ID) VALUES(?, ?, ?)";
+ = "INSERT INTO CXF_RM_DEST_SEQUENCES (SEQ_ID, ACKS_TO, ENDPOINT_ID, PROTOCOL_VERSION) "
+ + "VALUES(?, ?, ?, ?)";
private static final String CREATE_SRC_SEQUENCE_STMT_STR
- = "INSERT INTO CXF_RM_SRC_SEQUENCES VALUES(?, 1, '0', ?, ?, ?)";
+ = "INSERT INTO CXF_RM_SRC_SEQUENCES VALUES(?, 1, '0', ?, ?, ?, ?)";
private static final String DELETE_DEST_SEQUENCE_STMT_STR =
"DELETE FROM CXF_RM_DEST_SEQUENCES WHERE SEQ_ID = ?";
private static final String DELETE_SRC_SEQUENCE_STMT_STR =
@@ -103,17 +106,17 @@ public class RMTxStore implements RMStor
private static final String DELETE_MESSAGE_STMT_STR =
"DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
private static final String SELECT_DEST_SEQUENCE_STMT_STR =
- "SELECT ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ "WHERE SEQ_ID = ?";
private static final String SELECT_SRC_SEQUENCE_STMT_STR =
- "SELECT CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID FROM CXF_RM_SRC_SEQUENCES "
+ "SELECT CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION FROM CXF_RM_SRC_SEQUENCES "
+ "WHERE SEQ_ID = ?";
private static final String SELECT_DEST_SEQUENCES_STMT_STR =
- "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ "WHERE ENDPOINT_ID = ?";
private static final String SELECT_SRC_SEQUENCES_STMT_STR =
- "SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID FROM CXF_RM_SRC_SEQUENCES "
- + "WHERE ENDPOINT_ID = ?";
+ "SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION "
+ + "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?";
private static final String SELECT_MESSAGES_STMT_STR =
"SELECT MSG_NO, SEND_TO, CONTENT FROM {0} WHERE SEQ_ID = ?";
@@ -209,6 +212,7 @@ public class RMTxStore implements RMStor
public void createDestinationSequence(DestinationSequence seq) {
String sequenceIdentifier = seq.getIdentifier().getValue();
String endpointIdentifier = seq.getEndpointIdentifier();
+ String protocolVersion = encodeProtocolVersion(seq.getProtocol());
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Creating destination sequence: " + sequenceIdentifier + ", (endpoint: "
+ endpointIdentifier + ")");
@@ -223,7 +227,7 @@ public class RMTxStore implements RMStor
String addr = seq.getAcksTo().getAddress().getValue();
createDestSequenceStmt.setString(2, addr);
createDestSequenceStmt.setString(3, endpointIdentifier);
-
+ createDestSequenceStmt.setString(4, protocolVersion);
createDestSequenceStmt.execute();
commit();
@@ -237,6 +241,7 @@ public class RMTxStore implements RMStor
public void createSourceSequence(SourceSequence seq) {
String sequenceIdentifier = seq.getIdentifier().getValue();
String endpointIdentifier = seq.getEndpointIdentifier();
+ String protocolVersion = encodeProtocolVersion(seq.getProtocol());
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Creating source sequence: " + sequenceIdentifier + ", (endpoint: "
+ endpointIdentifier + ")");
@@ -255,6 +260,7 @@ public class RMTxStore implements RMStor
Identifier osid = seq.getOfferingSequenceIdentifier();
createSrcSequenceStmt.setString(3, osid == null ? null : osid.getValue());
createSrcSequenceStmt.setString(4, endpointIdentifier);
+ createSrcSequenceStmt.setString(5, protocolVersion);
createSrcSequenceStmt.execute();
commit();
@@ -264,8 +270,8 @@ public class RMTxStore implements RMStor
throw new RMStoreException(ex);
}
}
-
- public DestinationSequence getDestinationSequence(Identifier sid, ProtocolVariation protocol) {
+
+ public DestinationSequence getDestinationSequence(Identifier sid) {
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting destination sequence for id: " + sid);
}
@@ -275,26 +281,26 @@ public class RMTxStore implements RMStor
connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);
}
selectDestSequenceStmt.setString(1, sid.getValue());
-
ResultSet res = selectDestSequenceStmt.executeQuery();
if (res.next()) {
EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));
long lm = res.getLong(2);
- InputStream is = res.getBinaryStream(3);
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(3));
+ InputStream is = res.getBinaryStream(4);
SequenceAcknowledgement ack = null;
if (null != is) {
ack = PersistenceUtils.getInstance()
.deserialiseAcknowledgment(is);
}
- return new DestinationSequence(sid, acksTo, lm, ack, protocol);
+ return new DestinationSequence(sid, acksTo, lm, ack, pv);
}
} catch (SQLException ex) {
LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex);
}
return null;
}
-
- public SourceSequence getSourceSequence(Identifier sid, ProtocolVariation protocol) {
+
+ public SourceSequence getSourceSequence(Identifier sid) {
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting source sequences for id: " + sid);
}
@@ -316,8 +322,9 @@ public class RMTxStore implements RMStor
if (null != oidValue) {
oi = RMUtils.getWSRMFactory().createIdentifier();
oi.setValue(oidValue);
- }
- return new SourceSequence(sid, expiry, oi, cmn, lm, protocol);
+ }
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(5));
+ return new SourceSequence(sid, expiry, oi, cmn, lm, pv);
}
} catch (SQLException ex) {
@@ -364,8 +371,7 @@ public class RMTxStore implements RMStor
}
}
- public Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier,
- ProtocolVariation protocol) {
+ public Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier) {
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting destination sequences for endpoint: " + endpointIdentifier);
}
@@ -383,13 +389,14 @@ public class RMTxStore implements RMStor
sid.setValue(res.getString(1));
EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2));
long lm = res.getLong(3);
- InputStream is = res.getBinaryStream(4);
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(4));
+ InputStream is = res.getBinaryStream(5);
SequenceAcknowledgement ack = null;
if (null != is) {
ack = PersistenceUtils.getInstance()
.deserialiseAcknowledgment(is);
}
- DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack, protocol);
+ DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack, pv);
seqs.add(seq);
}
} catch (SQLException ex) {
@@ -398,8 +405,7 @@ public class RMTxStore implements RMStor
return seqs;
}
- public Collection<SourceSequence> getSourceSequences(String endpointIdentifier,
- ProtocolVariation protocol) {
+ public Collection<SourceSequence> getSourceSequences(String endpointIdentifier) {
if (LOG.isLoggable(Level.FINE)) {
LOG.info("Getting source sequences for endpoint: " + endpointIdentifier);
}
@@ -424,8 +430,9 @@ public class RMTxStore implements RMStor
if (null != oidValue) {
oi = new Identifier();
oi.setValue(oidValue);
- }
- SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn, lm, protocol);
+ }
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(6));
+ SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn, lm, pv);
seqs.add(seq);
}
} catch (SQLException ex) {
@@ -747,6 +754,21 @@ public class RMTxStore implements RMStor
}
+ protected static String encodeProtocolVersion(ProtocolVariation pv) {
+ return pv.getCodec().getWSRMNamespace() + ' ' + pv.getCodec().getWSANamespace();
+ }
+
+ protected static ProtocolVariation decodeProtocolVersion(String pv) {
+ if (null != pv) {
+ int d = pv.indexOf(' ');
+ if (d > 0) {
+ return ProtocolVariation.findVariant(pv.substring(0, d), pv.substring(d + 1));
+ }
+ }
+ return ProtocolVariation.RM10WSA200408;
+ }
+
+
private static void recursiveDelete(File dir, boolean now) {
for (File f : dir.listFiles()) {
if (f.isDirectory()) {
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java Fri Mar 30 20:50:22 2012
@@ -357,6 +357,8 @@ public class RMSoapInterceptor extends A
}
RMProperties rmps = RMContextUtils.retrieveRMProperties(message, false);
rmps.exposeAs(consts.getWSRMNamespace());
+ ProtocolVariation protocol =
+ ProtocolVariation.findVariant(consts.getWSRMNamespace(), maps.getNamespaceURI());
LOG.info("Updating service model info in exchange");
@@ -372,16 +374,16 @@ public class RMSoapInterceptor extends A
}
Exchange exchange = message.getExchange();
-
- exchange.put(Endpoint.class, rme.getEndpoint());
- exchange.put(Service.class, rme.getService());
- exchange.put(Binding.class, rme.getEndpoint().getBinding());
+ Endpoint ep = rme.getEndpoint(protocol);
+ exchange.put(Endpoint.class, ep);
+ exchange.put(Service.class, ep.getService());
+ exchange.put(Binding.class, ep.getBinding());
// Also set BindingOperationInfo as some operations (SequenceAcknowledgment) have
// neither in nor out messages, and thus the WrappedInInterceptor cannot
// determine the operation name.
- BindingInfo bi = rme.getEndpoint().getEndpointInfo().getBinding();
+ BindingInfo bi = ep.getEndpointInfo().getBinding();
BindingOperationInfo boi = null;
boolean isOneway = true;
if (consts.getCreateSequenceAction().equals(action)) {
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Fri Mar 30 20:50:22 2012
@@ -54,6 +54,7 @@ import org.apache.cxf.ws.addressing.Attr
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.policy.AssertionInfo;
import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
+import org.apache.cxf.ws.rm.ProtocolVariation;
import org.apache.cxf.ws.rm.RMContextUtils;
import org.apache.cxf.ws.rm.RMException;
import org.apache.cxf.ws.rm.RMManager;
@@ -372,8 +373,8 @@ public class RetransmissionQueueImpl imp
final String address = to.getValue();
LOG.fine("Resending to address: " + address);
-
- final Endpoint reliableEndpoint = manager.getReliableEndpoint(message).getEndpoint();
+ final ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
+ final Endpoint reliableEndpoint = manager.getReliableEndpoint(message).getEndpoint(protocol);
ConduitSelector cs = new DeferredConduitSelector() {
@Override
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java Fri Mar 30 20:50:22 2012
@@ -110,11 +110,12 @@ public class DestinationTest extends Ass
Message message = setupMessage();
RMProperties rmps = control.createMock(RMProperties.class);
EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).andReturn(rmps);
+ EasyMock.expect(RMContextUtils.getProtocolVariation(message))
+ .andReturn(ProtocolVariation.RM10WSA200408);
SequenceType st = control.createMock(SequenceType.class);
EasyMock.expect(rmps.getSequence()).andReturn(st);
Identifier id = control.createMock(Identifier.class);
EasyMock.expect(st.getIdentifier()).andReturn(id).times(2);
- EasyMock.expect(rme.getProtocol()).andReturn(ProtocolVariation.RM10WSA200408).anyTimes();
String sid = "sid";
EasyMock.expect(id.getValue()).andReturn(sid);
control.replay();
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java?rev=1307604&r1=1307603&r2=1307604&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Fri Mar 30 20:50:22 2012
@@ -290,7 +290,6 @@ public class ManagedRMManagerTest extend
EasyMock.expect(rme.getManager()).andReturn(manager).anyTimes();
EasyMock.expect(rme.getSource()).andReturn(source).anyTimes();
EasyMock.expect(rme.getDestination()).andReturn(destination).anyTimes();
- EasyMock.expect(rme.getProtocol()).andReturn(null).anyTimes();
control.replay();
setCurrentMessageNumber(sss.get(0), 5L);
@@ -314,8 +313,10 @@ public class ManagedRMManagerTest extend
private List<SourceSequence> createTestSourceSequences(Source source,
EndpointReferenceType to) {
List<SourceSequence> sss = new ArrayList<SourceSequence>();
- sss.add(createTestSourceSequence(source, "seq1", to, new long[]{1L, 1L, 3L, 3L}));
- sss.add(createTestSourceSequence(source, "seq2", to, new long[]{1L, 1L, 3L, 3L}));
+ sss.add(createTestSourceSequence(source, "seq1", to,
+ ProtocolVariation.RM10WSA200408, new long[]{1L, 1L, 3L, 3L}));
+ sss.add(createTestSourceSequence(source, "seq2", to,
+ ProtocolVariation.RM11WSA200508, new long[]{1L, 1L, 3L, 3L}));
return sss;
}
@@ -323,17 +324,20 @@ public class ManagedRMManagerTest extend
private List<DestinationSequence> createTestDestinationSequences(Destination destination,
EndpointReferenceType to) {
List<DestinationSequence> dss = new ArrayList<DestinationSequence>();
- dss.add(createTestDestinationSequence(destination, "seq3", to, new long[]{1L, 1L, 3L, 3L}));
- dss.add(createTestDestinationSequence(destination, "seq4", to, new long[]{1L, 1L, 3L, 3L}));
+ dss.add(createTestDestinationSequence(destination, "seq3", to,
+ ProtocolVariation.RM10WSA200408, new long[]{1L, 1L, 3L, 3L}));
+ dss.add(createTestDestinationSequence(destination, "seq4", to,
+ ProtocolVariation.RM11WSA200508, new long[]{1L, 1L, 3L, 3L}));
return dss;
}
private SourceSequence createTestSourceSequence(Source source, String sid,
- EndpointReferenceType to, long[] acked) {
+ EndpointReferenceType to,
+ ProtocolVariation protocol, long[] acked) {
Identifier identifier = RMUtils.getWSRMFactory().createIdentifier();
identifier.setValue(sid);
- SourceSequence ss = new SourceSequence(identifier, null);
+ SourceSequence ss = new SourceSequence(identifier, protocol);
ss.setSource(source);
ss.setTarget(to);
List<SequenceAcknowledgement.AcknowledgementRange> ranges =
@@ -345,10 +349,11 @@ public class ManagedRMManagerTest extend
}
private DestinationSequence createTestDestinationSequence(Destination destination, String sid,
- EndpointReferenceType to, long[] acked) {
+ EndpointReferenceType to,
+ ProtocolVariation protocol, long[] acked) {
Identifier identifier = RMUtils.getWSRMFactory().createIdentifier();
identifier.setValue(sid);
- DestinationSequence ds = new DestinationSequence(identifier, to, null, null);
+ DestinationSequence ds = new DestinationSequence(identifier, to, null, protocol);
ds.setDestination(destination);
List<SequenceAcknowledgement.AcknowledgementRange> ranges =
@@ -387,7 +392,6 @@ public class ManagedRMManagerTest extend
EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
EasyMock.expect(exchange.get(Endpoint.class)).andReturn(endpoint);
- EasyMock.expect(exchange.getOutMessage()).andReturn(message);
control.replay();
return manager.getReliableEndpoint(message);