You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2007/05/09 16:27:12 UTC
svn commit: r536543 [1/2] - in /incubator/cxf/trunk:
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/ systests/...
Author: andreasmyth
Date: Wed May 9 07:27:10 2007
New Revision: 536543
URL: http://svn.apache.org/viewvc?view=rev&rev=536543
Log:
[JIRA CXF-282] Try to terminate active sequences when endpoint/bus is shutdown by sending a (soap) message with an empty body and a Sequence header with LastMessage element, which must be responded to with a SequenceAcknowledgment. This in turn triggers sequence termination if all messages are acked.
Added:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/terminate-on-shutdown.xml (with props)
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/SourceSequenceTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Wed May 9 07:27:10 2007
@@ -87,6 +87,7 @@
if (null == sequenceType) {
return;
}
+
DestinationSequence seq = getSequence(sequenceType.getIdentifier());
if (null != seq) {
@@ -94,29 +95,49 @@
seq.acknowledge(message);
if (null != sequenceType.getLastMessage()) {
-
seq.setLastMessageNumber(sequenceType.getMessageNumber());
-
- seq.scheduleImmediateAcknowledgement();
-
- // if we cannot expect an outgoing message to which the
- // acknowledgement
- // can be added we need to send an out-of-band
- // SequenceAcknowledgement message
-
- AddressingPropertiesImpl maps = RMContextUtils.retrieveMAPs(message, false, false);
- String replyToAddress = null;
- if (null != maps.getReplyTo()) {
- replyToAddress = maps.getReplyTo().getAddress().getValue();
- }
- if (!(seq.getAcksTo().getAddress().getValue().equals(replyToAddress) || seq
- .canPiggybackAckOnPartialResponse())) {
- getReliableEndpoint().getProxy().acknowledge(seq);
- }
+ ackImmediately(seq, message);
}
} else {
SequenceFaultFactory sff = new SequenceFaultFactory();
throw sff.createUnknownSequenceFault(sequenceType.getIdentifier());
+ }
+ }
+
+ void ackRequested(Message message) throws SequenceFault, RMException {
+ // TODO
+ Collection<AckRequestedType> ars = RMContextUtils.retrieveRMProperties(message, false)
+ .getAcksRequested();
+ if (null == ars) {
+ return;
+ }
+ for (AckRequestedType ar : ars) {
+ Identifier id = ar.getIdentifier();
+ DestinationSequence seq = getSequence(id);
+ if (null == seq) {
+ continue;
+ }
+ ackImmediately(seq, message);
+ }
+ }
+
+ void ackImmediately(DestinationSequence seq, Message message) throws RMException {
+
+ seq.scheduleImmediateAcknowledgement();
+
+ // if we cannot expect an outgoing message to which the
+ // acknowledgement
+ // can be added we need to send an out-of-band
+ // SequenceAcknowledgement message
+
+ AddressingPropertiesImpl maps = RMContextUtils.retrieveMAPs(message, false, false);
+ String replyToAddress = null;
+ if (null != maps.getReplyTo()) {
+ replyToAddress = maps.getReplyTo().getAddress().getValue();
+ }
+ if (!(seq.getAcksTo().getAddress().getValue().equals(replyToAddress) || seq
+ .canPiggybackAckOnPartialResponse())) {
+ getReliableEndpoint().getProxy().acknowledge(seq);
}
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Wed May 9 07:27:10 2007
@@ -181,9 +181,6 @@
void mergeRanges() {
List<AcknowledgementRange> ranges = acknowledgement.getAcknowledgementRange();
- if (null == ranges) {
- return;
- }
for (int i = ranges.size() - 1; i > 0; i--) {
AcknowledgementRange current = ranges.get(i);
AcknowledgementRange previous = ranges.get(i - 1);
@@ -351,6 +348,12 @@
for (int i = deferredAcknowledgments.size() - 1; i >= 0; i--) {
DeferredAcknowledgment da = deferredAcknowledgments.get(i);
da.cancel();
+ }
+ }
+
+ synchronized void cancelTermination() {
+ if (null != scheduledTermination) {
+ scheduledTermination.cancel();
}
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties Wed May 9 07:27:10 2007
@@ -24,6 +24,8 @@
SEQ_TERMINATION_FAILURE = Failed to terminate sequence {0}.
STANDALONE_ANON_ACKS_NOT_SUPPORTED = It is not possible to send out-of-band acknowledgments to the anonymous address.\nAn acknowledgement will be piggybacked on the next response.
+STANDALONE_LAST_MESSAGE_NO_TARGET_MSG = No target address to send out-of-band last message to.
+STANDALONE_LAST_MESSAGE_ANON_TARGET_MSG = It is not possible to send an out-of-band last message to the anonymous address.
POLICY_PROVIDER_CREATION_EXC = Failed to create provider for RM assertion.
POLICY_REFERENCE_RESOLUTION_EXC = Policy reference {0} cannot be resolved.
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Wed May 9 07:27:10 2007
@@ -40,6 +40,7 @@
import org.apache.cxf.service.model.InterfaceInfo;
import org.apache.cxf.service.model.OperationInfo;
import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.addressing.RelatesToType;
import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
import org.apache.cxf.ws.rm.manager.SourcePolicyType;
@@ -161,9 +162,57 @@
}
void lastMessage(SourceSequence s) throws RMException {
- // TODO
+ org.apache.cxf.ws.addressing.EndpointReferenceType target = s.getTarget();
+ AttributedURIType uri = null;
+ if (null != target) {
+ uri = target.getAddress();
+ }
+ String addr = null;
+ if (null != uri) {
+ addr = uri.getValue();
+ }
+
+ if (addr == null) {
+ LOG.log(Level.WARNING, "STANDALONE_LAST_MESSAGE_NO_TARGET_MSG");
+ return;
+ }
+
+ if (RMUtils.getAddressingConstants().getAnonymousURI().equals(addr)) {
+ LOG.log(Level.WARNING, "STANDALONE_LAST_MESSAGE_ANON_TARGET_MSG");
+ return;
+ }
+
+ OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
+ .getOperation(RMConstants.getLastMessageOperationName());
+ invoke(oi, new Object[] {}, null);
}
+ void ackRequested(SourceSequence s) throws RMException {
+ org.apache.cxf.ws.addressing.EndpointReferenceType target = s.getTarget();
+ AttributedURIType uri = null;
+ if (null != target) {
+ uri = target.getAddress();
+ }
+ String addr = null;
+ if (null != uri) {
+ addr = uri.getValue();
+ }
+
+ if (addr == null) {
+ LOG.log(Level.WARNING, "STANDALONE_ACK_REQUESTED_NO_TARGET_MSG");
+ return;
+ }
+
+ if (RMUtils.getAddressingConstants().getAnonymousURI().equals(addr)) {
+ LOG.log(Level.WARNING, "STANDALONE_ACK_REQUESTED_ANON_TARGET_MSG");
+ return;
+ }
+
+ OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
+ .getOperation(RMConstants.getAckRequestedOperationName());
+ invoke(oi, new Object[] {}, null);
+ }
+
Identifier getOfferedIdentifier() {
return offeredIdentifier;
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java Wed May 9 07:27:10 2007
@@ -31,6 +31,7 @@
*/
public final class RMConstants {
+ // namespaces
private static final String WSRM_NAMESPACE_NAME =
"http://schemas.xmlsoap.org/ws/2005/02/rm";
@@ -46,6 +47,8 @@
WSRM_NAMESPACE_NAME + "/wsdl";
+ // element and header names
+
private static final String WSRM_SEQUENCE_NAME =
"Sequence";
@@ -81,11 +84,13 @@
private static final QName RMASSERTION_QNAME =
new QName(WSRMP_NAMESPACE_NAME, RMASSERTION_NAME);
- /**
- * The set of headers understood by the protocol binding.
- */
private static final Set<QName> HEADERS;
+ // protocol operation names
+
+ private static final QName WSRM_PORT_NAME =
+ new QName(WSRM_WSDL_NAMESPACE_NAME, "SequenceAbstractSoapPort");
+
private static final QName WSRM_CREATE_SEQUENCE_QNAME =
new QName(WSRM_WSDL_NAMESPACE_NAME, "CreateSequence");
@@ -103,7 +108,16 @@
private static final QName WSRM_SEQUENCE_ACKNOWLEDGEMENT_QNAME =
new QName(WSRM_WSDL_NAMESPACE_NAME, "SequenceAcknowledgement");
+
+ private static final QName WSRM_LAST_MESSAGE_QNAME =
+ new QName(WSRM_WSDL_NAMESPACE_NAME, "LastMessage");
+
+ private static final QName WSRM_ACK_REQ_QNAME =
+ new QName(WSRM_WSDL_NAMESPACE_NAME, "AckRequested");
+
+ // actions
+
private static final String WSRM_CREATE_SEQUENCE_ACTION =
WSRM_NAMESPACE_NAME + "/CreateSequence";
@@ -119,12 +133,17 @@
private static final String WSRM_LAST_MESSAGE_ACTION =
WSRM_NAMESPACE_NAME + "/LastMessage";
+ private static final String WSRM_ACK_REQUESTED_ACTION =
+ WSRM_NAMESPACE_NAME + "/AckRequested";
+
private static final String WSRM_SEQUENCE_ACKNOWLEDGMENT_ACTION =
WSRM_NAMESPACE_NAME + "/SequenceAcknowledgement";
private static final String WSRM_SEQUENCE_INFO_ACTION =
WSRM_NAMESPACE_NAME + "/SequenceInfo";
+ // fault codes
+
private static final String WSRM_UNKNOWN_SEQUENCE_FAULT_CODE =
"UnknownSequence";
@@ -224,6 +243,10 @@
// service model constants
+ public static QName getPortName() {
+ return WSRM_PORT_NAME;
+ }
+
public static QName getCreateSequenceOperationName() {
return WSRM_CREATE_SEQUENCE_QNAME;
}
@@ -248,6 +271,14 @@
return WSRM_SEQUENCE_ACKNOWLEDGEMENT_QNAME;
}
+ public static QName getLastMessageOperationName() {
+ return WSRM_LAST_MESSAGE_QNAME;
+ }
+
+ public static QName getAckRequestedOperationName() {
+ return WSRM_ACK_REQ_QNAME;
+ }
+
// actions
public static String getCreateSequenceAction() {
@@ -268,6 +299,10 @@
public static String getLastMessageAction() {
return WSRM_LAST_MESSAGE_ACTION;
+ }
+
+ public static String getAckRequestedAction() {
+ return WSRM_ACK_REQUESTED_ACTION;
}
public static String getSequenceAcknowledgmentAction() {
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Wed May 9 07:27:10 2007
@@ -21,6 +21,8 @@
import java.util.Collection;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.wsdl.extensions.ExtensibilityElement;
import javax.xml.bind.JAXBException;
@@ -28,6 +30,7 @@
import org.apache.cxf.binding.soap.model.SoapBindingInfo;
import org.apache.cxf.binding.soap.model.SoapOperationInfo;
+import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.databinding.DataBinding;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.interceptor.Interceptor;
@@ -48,19 +51,22 @@
import org.apache.cxf.ws.policy.EndpointPolicy;
import org.apache.cxf.ws.policy.PolicyEngine;
import org.apache.cxf.ws.policy.PolicyInterceptorProviderRegistry;
+import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
+import org.apache.cxf.ws.rm.manager.SourcePolicyType;
import org.apache.neethi.Assertion;
import org.apache.neethi.Policy;
public class RMEndpoint {
+ private static final Logger LOG = LogUtils.getL7dLogger(RMEndpoint.class);
+
private static final QName SERVICE_NAME =
new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractService");
private static final QName INTERFACE_NAME =
new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractPortType");
private static final QName BINDING_NAME =
new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractSoapBinding");
- private static final QName PORT_NAME =
- new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractSoapPort");
+
private static final QName CREATE_PART_NAME =
new QName(RMConstants.getWsdlNamespace(), "create");
private static final QName CREATE_RESPONSE_PART_NAME =
@@ -172,6 +178,34 @@
}
/**
+ * @return The time when last application message was received.
+ */
+ public long getLastApplicationMessage() {
+ return lastApplicationMessage;
+ }
+
+ /**
+ * Indicates that an application message has been received.
+ */
+ public void receivedApplicationMessage() {
+ lastApplicationMessage = System.currentTimeMillis();
+ }
+
+ /**
+ * @return The time when last RM protocol message was received.
+ */
+ public long getLastControlMessage() {
+ return lastControlMessage;
+ }
+
+ /**
+ * Indicates that an RM protocol message has been received.
+ */
+ public void receivedControlMessage() {
+ lastControlMessage = System.currentTimeMillis();
+ }
+
+ /**
* @return Returns the conduit.
*/
public Conduit getConduit() {
@@ -226,7 +260,7 @@
ei.setAddress(aei.getAddress());
- ei.setName(PORT_NAME);
+ ei.setName(RMConstants.getPortName());
ei.setBinding(si.getBinding(BINDING_NAME));
// if addressing was enabled on the application endpoint by means
@@ -286,6 +320,8 @@
buildCreateSequenceOperationInfo(ii);
buildTerminateSequenceOperationInfo(ii);
buildSequenceAckOperationInfo(ii);
+ buildLastMessageOperationInfo(ii);
+ buildAckRequestedOperationInfo(ii);
// TODO: FaultInfo (SequenceFault)
}
@@ -353,6 +389,26 @@
messageInfo = operationInfo.createMessage(RMConstants.getSequenceAckOperationName());
operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
}
+
+ void buildLastMessageOperationInfo(InterfaceInfo ii) {
+
+ OperationInfo operationInfo = null;
+ MessageInfo messageInfo = null;
+
+ operationInfo = ii.addOperation(RMConstants.getLastMessageOperationName());
+ messageInfo = operationInfo.createMessage(RMConstants.getLastMessageOperationName());
+ operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
+ }
+
+ void buildAckRequestedOperationInfo(InterfaceInfo ii) {
+
+ OperationInfo operationInfo = null;
+ MessageInfo messageInfo = null;
+
+ operationInfo = ii.addOperation(RMConstants.getAckRequestedOperationName());
+ messageInfo = operationInfo.createMessage(RMConstants.getAckRequestedOperationName());
+ operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
+ }
void buildBindingInfo(ServiceInfo si) {
// use same binding id as for application endpoint
@@ -385,6 +441,20 @@
boi.addExtensor(soi);
bi.addOperation(boi);
+ boi = bi.buildOperation(RMConstants.getLastMessageOperationName(), null, null);
+ assert null != boi;
+ soi = new SoapOperationInfo();
+ soi.setAction(RMConstants.getLastMessageAction());
+ boi.addExtensor(soi);
+ bi.addOperation(boi);
+
+ boi = bi.buildOperation(RMConstants.getAckRequestedOperationName(), null, null);
+ assert null != boi;
+ soi = new SoapOperationInfo();
+ soi.setAction(RMConstants.getAckRequestedAction());
+ boi.addExtensor(soi);
+ bi.addOperation(boi);
+
boi = bi.buildOperation(RMConstants.getCreateSequenceOnewayOperationName(),
RMConstants.getCreateSequenceOperationName().getLocalPart(), null);
soi = new SoapOperationInfo();
@@ -450,20 +520,49 @@
manager = m;
}
- public long getLastApplicationMessage() {
- return lastApplicationMessage;
- }
-
- public void receivedApplicationMessage() {
- lastApplicationMessage = System.currentTimeMillis();
- }
-
- public long getLastControlMessage() {
- return lastControlMessage;
- }
-
- public void receivedControlMessage() {
- lastControlMessage = System.currentTimeMillis();
+ void shutdown() {
+ // cancel outstanding timer tasks (deferred acknowledgements)
+ // and scheduled termination for all
+ // destination sequences of this endpoint
+
+ for (DestinationSequence ds : getDestination().getAllSequences()) {
+ ds.cancelDeferredAcknowledgments();
+ ds.cancelTermination();
+ }
+
+ // try terminating sequences
+ SourcePolicyType sp = manager.getSourcePolicy();
+ SequenceTerminationPolicyType stp = null;
+ if (null != sp) {
+ stp = sp.getSequenceTerminationPolicy();
+ }
+ if (null != stp && stp.isTerminateOnShutdown()) {
+
+ Collection<SourceSequence> seqs = source.getAllUnacknowledgedSequences();
+ LOG.log(Level.FINE, "Trying to terminate {0} sequences", seqs.size());
+ for (SourceSequence seq : seqs) {
+ try {
+ // destination MUST respond with a
+ // sequence acknowledgement
+ if (seq.isLastMessage()) {
+ // REVISIT: this may be non-standard
+ // getProxy().ackRequested(seq);
+ } else {
+
+ getProxy().lastMessage(seq);
+ }
+ } catch (RMException ex) {
+ // already logged
+ }
+ }
+ }
+
+ // cancel outstanding resends for all source sequences
+ // of this endpoint
+
+ for (SourceSequence ss : getSource().getAllSequences()) {
+ manager.getRetransmissionQueue().stop(ss);
+ }
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Wed May 9 07:27:10 2007
@@ -79,12 +79,12 @@
// for application AND out of band messages
RMEndpoint rme = getManager().getReliableEndpoint(message);
+ Destination destination = getManager().getDestination(message);
- if (isApplicationMessage) {
- Destination destination = getManager().getDestination(message);
+ if (isApplicationMessage) {
if (null != rmps) {
- processAcknowledgments(rmps);
- processAcknowledgmentRequests(rmps);
+ processAcknowledgments(rme.getSource(), rmps);
+ processAcknowledgmentRequests(destination, message);
processSequence(destination, message);
processDeliveryAssurance(rmps);
}
@@ -92,7 +92,9 @@
} else {
rme.receivedControlMessage();
if (RMConstants.getSequenceAckAction().equals(action)) {
- processAcknowledgments(rmps);
+ processAcknowledgments(rme.getSource(), rmps);
+ } else if (RMConstants.getLastMessageAction().equals(action)) {
+ processSequence(destination, message);
} else if (RMConstants.getCreateSequenceAction().equals(action) && !isServer) {
LOG.fine("Processing inbound CreateSequence on client side.");
Servant servant = rme.getServant();
@@ -106,13 +108,13 @@
assertReliability(message);
}
- void processAcknowledgments(RMProperties rmps) throws SequenceFault, RMException {
+ void processAcknowledgments(Source source, RMProperties rmps) throws SequenceFault, RMException {
Collection<SequenceAcknowledgement> acks = rmps.getAcks();
if (null != acks) {
for (SequenceAcknowledgement ack : acks) {
Identifier id = ack.getIdentifier();
- SourceSequence ss = getManager().getSourceSequence(id);
+ SourceSequence ss = source.getSequence(id);
if (null != ss) {
ss.setAcknowledged(ack);
} else {
@@ -122,13 +124,13 @@
}
}
- void processAcknowledgmentRequests(RMProperties rmps) {
- // TODO
+ void processAcknowledgmentRequests(Destination destination, Message message)
+ throws SequenceFault, RMException {
+ destination.ackRequested(message);
}
void processSequence(Destination destination, Message message)
throws SequenceFault, RMException {
-
destination.acknowledge(message);
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Wed May 9 07:27:10 2007
@@ -35,6 +35,8 @@
import org.apache.cxf.binding.Binding;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.endpoint.ServerLifeCycleListener;
import org.apache.cxf.message.Message;
import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
@@ -63,8 +65,25 @@
private SequenceIdentifierGenerator idGenerator;
private RetransmissionQueue retransmissionQueue;
private Map<Endpoint, RMEndpoint> reliableEndpoints = new HashMap<Endpoint, RMEndpoint>();
- private Map<String, SourceSequence> sourceSequences;
private Timer timer = new Timer(true);
+ private final ServerLifeCycleListener serverLifeCycleListener;
+
+ public RMManager() {
+ serverLifeCycleListener = new ServerLifeCycleListener() {
+
+ public void startServer(Server server) {
+ }
+
+ public void stopServer(Server server) {
+ RMManager.this.shutdownReliableEndpoint(server.getEndpoint());
+ }
+
+ };
+ }
+
+ public ServerLifeCycleListener getServerLifeCycleListener() {
+ return serverLifeCycleListener;
+ }
public Bus getBus() {
return bus;
@@ -116,18 +135,17 @@
public synchronized RMEndpoint getReliableEndpoint(Message message) {
Endpoint endpoint = message.getExchange().get(Endpoint.class);
+ QName name = endpoint.getEndpointInfo().getName();
if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Getting RMEndpoint for endpoint with info: " + endpoint.getEndpointInfo().getName());
+ LOG.fine("Getting RMEndpoint for endpoint with info: " + name);
}
- if (endpoint.getEndpointInfo().getName().equals(
- new QName(RMConstants.getWsdlNamespace(),
- "SequenceAbstractSoapPort"))) {
+ if (name.equals(RMConstants.getPortName())) {
WrappedEndpoint wrappedEndpoint = (WrappedEndpoint)endpoint;
endpoint = wrappedEndpoint.getWrappedEndpoint();
}
RMEndpoint rme = reliableEndpoints.get(endpoint);
if (null == rme) {
- rme = new RMEndpoint(this, endpoint);
+ rme = createReliableEndpoint(this, endpoint);
org.apache.cxf.transport.Destination destination = message.getExchange().getDestination();
org.apache.cxf.ws.addressing.EndpointReferenceType replyTo = null;
if (null != destination) {
@@ -149,10 +167,6 @@
return null;
}
- public SourceSequence getSourceSequence(Identifier id) {
- return sourceSequences.get(id.getValue());
- }
-
public Source getSource(Message message) {
RMEndpoint rme = getReliableEndpoint(message);
if (null != rme) {
@@ -210,21 +224,15 @@
}
return seq;
- }
+ }
@PreDestroy
public void shutdown() {
- // shutdown retransmission queue
- if (null != retransmissionQueue) {
- retransmissionQueue.stop();
- }
-
- // cancel outstanding timer tasks (deferred acknowledgements) for all
- // destination sequences
+
+ // shutdown remaining endpoints
+
for (RMEndpoint rme : reliableEndpoints.values()) {
- for (DestinationSequence ds : rme.getDestination().getAllSequences()) {
- ds.cancelDeferredAcknowledgments();
- }
+ rme.shutdown();
}
// remove references to timer tasks cancelled above to make them
@@ -232,6 +240,26 @@
timer.purge();
timer.cancel();
}
+
+ synchronized void shutdownReliableEndpoint(Endpoint e) {
+ RMEndpoint rme = reliableEndpoints.get(e);
+ if (null == rme) {
+ // not interested
+ return;
+ }
+
+ rme.shutdown();
+
+ // remove references to timer tasks cancelled above to make them
+ // eligible for garbage collection
+ timer.purge();
+
+ reliableEndpoints.remove(e);
+ }
+
+ RMEndpoint createReliableEndpoint(RMManager manager, Endpoint endpoint) {
+ return new RMEndpoint(manager, endpoint);
+ }
@PostConstruct
void initialise() {
@@ -245,13 +273,9 @@
setDeliveryAssurance(da);
}
if (!isSetSourcePolicy()) {
- SourcePolicyType sp = factory.createSourcePolicyType();
- setSourcePolicy(sp);
+ setSourcePolicy(null);
- }
- if (!getSourcePolicy().isSetSequenceTerminationPolicy()) {
- getSourcePolicy().setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
- }
+ }
if (!isSetDestinationPolicy()) {
DestinationPolicyType dp = factory.createDestinationPolicyType();
dp.setAcksPolicy(factory.createAcksPolicyType());
@@ -285,17 +309,25 @@
super.setRMAssertion(rma);
}
- void addSourceSequence(SourceSequence ss) {
- if (null == sourceSequences) {
- sourceSequences = new HashMap<String, SourceSequence>();
+
+ @Override
+ public void setSourcePolicy(SourcePolicyType sp) {
+ org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
+ if (null == sp) {
+ sp = factory.createSourcePolicyType();
}
- sourceSequences.put(ss.getIdentifier().getValue(), ss);
+ if (!sp.isSetSequenceTerminationPolicy()) {
+ sp.setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
+ }
+ super.setSourcePolicy(sp);
}
- void removeSourceSequence(Identifier id) {
- if (null != sourceSequences) {
- sourceSequences.remove(id.getValue());
- }
+ Map<Endpoint, RMEndpoint> getReliableEndpointsMap() {
+ return reliableEndpoints;
+ }
+
+ void setReliableEndpointsMap(Map<Endpoint, RMEndpoint> map) {
+ reliableEndpoints = map;
}
class DefaultSequenceIdentifierGenerator implements SequenceIdentifierGenerator {
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Wed May 9 07:27:10 2007
@@ -44,14 +44,12 @@
addAfter(MAPAggregator.class.getName());
}
- protected void handle(Message message) throws SequenceFault, RMException {
- LOG.entering(getClass().getName(), "handleMessage");
-
+ protected void handle(Message message) throws SequenceFault, RMException {
if (isRuntimeFault(message)) {
LogUtils.log(LOG, Level.WARNING, "RUNTIME_FAULT_MSG");
// TODO: in case of a SequenceFault need to set action
// to http://schemas.xmlsoap.org/ws/2004/a08/addressing/fault
- // but: need to defer propagation of received MAPS to oubound chain
+ // but: need to defer propagation of received MAPS to outbound chain first
return;
}
@@ -78,8 +76,7 @@
boolean isApplicationMessage = !RMContextUtils.isRMProtocolMessage(action);
boolean isPartialResponse = MessageUtils.isPartialResponse(message);
- LOG.fine("isApplicationMessage: " + isApplicationMessage);
- LOG.fine("isPartialResponse: " + isPartialResponse);
+ boolean isLastMessage = RMConstants.getLastMessageAction().equals(action);
if (isApplicationMessage && !isPartialResponse) {
RetransmissionInterceptor ri = new RetransmissionInterceptor();
@@ -113,7 +110,8 @@
}
}
- if (isApplicationMessage && !isPartialResponse) {
+ if ((isApplicationMessage || isLastMessage)
+ && !isPartialResponse) {
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue()));
@@ -128,7 +126,8 @@
// increase message number and store a sequence type object in
// context
- seq.nextMessageNumber(inSeqId, inMessageNumber);
+ seq.nextMessageNumber(inSeqId, inMessageNumber, isLastMessage);
+
rmpsOut.setSequence(seq);
// if this was the last message in the sequence, reset the
@@ -197,7 +196,7 @@
if (LOG.isLoggable(Level.FINE)) {
Collection<SequenceAcknowledgement> acks = rmpsOut.getAcks();
if (null == acks) {
- LOG.fine("No acknowledgements added");
+ LOG.fine("No acknowledgements added.");
} else {
LOG.fine("Added " + acks.size() + " acknowledgements.");
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Wed May 9 07:27:10 2007
@@ -63,7 +63,7 @@
/**
* Stops retransmission queue.
*/
- void stop();
+ void stop(SourceSequence seq);
/**
* Populates the retransmission queue with messages recovered from
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java Wed May 9 07:27:10 2007
@@ -58,9 +58,7 @@
}
public void addSequence(SourceSequence seq) {
- addSequence(seq, true);
- getReliableEndpoint().getManager().addSourceSequence(seq);
-
+ addSequence(seq, true);
}
public void addSequence(SourceSequence seq, boolean persist) {
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java Wed May 9 07:27:10 2007
@@ -143,7 +143,7 @@
RMEndpoint rme = source.getReliableEndpoint();
Proxy proxy = rme.getProxy();
proxy.terminate(this);
- source.getManager().removeSourceSequence(id);
+ source.removeSequence(this);
}
}
@@ -194,7 +194,7 @@
* @return the next message number.
*/
BigInteger nextMessageNumber() {
- return nextMessageNumber(null, null);
+ return nextMessageNumber(null, null, false);
}
/**
@@ -206,27 +206,22 @@
*
* @return the next message number.
*/
- public BigInteger nextMessageNumber(Identifier inSeqId, BigInteger inMsgNumber) {
+ public BigInteger nextMessageNumber(Identifier inSeqId, BigInteger inMsgNumber, boolean last) {
assert !lastMessage;
BigInteger result = null;
synchronized (this) {
currentMessageNumber = currentMessageNumber.add(BigInteger.ONE);
- checkLastMessage(inSeqId, inMsgNumber);
+ if (last) {
+ lastMessage = true;
+ } else {
+ checkLastMessage(inSeqId, inMsgNumber);
+ }
result = currentMessageNumber;
}
return result;
}
-
- void nextAndLastMessageNumber() {
- assert !lastMessage;
-
- synchronized (this) {
- currentMessageNumber = currentMessageNumber.add(BigInteger.ONE);
- lastMessage = true;
- }
- }
-
+
SequenceAcknowledgement getAcknowledgement() {
return acknowledgement;
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java Wed May 9 07:27:10 2007
@@ -473,6 +473,8 @@
boi = bi.getOperation(RMConstants.getSequenceAckOperationName());
} else if (RMConstants.getTerminateSequenceAction().equals(action)) {
boi = bi.getOperation(RMConstants.getTerminateSequenceOperationName());
+ } else if (RMConstants.getLastMessageAction().equals(action)) {
+ boi = bi.getOperation(RMConstants.getLastMessageOperationName());
}
assert boi != null;
exchange.put(BindingOperationInfo.class, boi);
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Wed May 9 07:27:10 2007
@@ -160,10 +160,23 @@
}
/**
- * Stops retransmission queue.
+ * Stops resending messages for the specified source sequence.
*/
- public void stop() {
- // no-op
+ public void stop(SourceSequence seq) {
+ synchronized (this) {
+ List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
+ if (null != sequenceCandidates) {
+ for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
+ ResendCandidate candidate = sequenceCandidates.get(i);
+ candidate.cancel();
+ }
+ LOG.log(Level.FINE, "Cancelled resends for sequence {0}.", seq.getIdentifier().getValue());
+ }
+ }
+ }
+
+ void stop() {
+
}
/**
@@ -421,6 +434,15 @@
protected synchronized void resolved() {
pending = false;
next = null;
+ if (null != nextTask) {
+ nextTask.cancel();
+ }
+ }
+
+ /**
+ * Cancel further resend (although no ACK has been received).
+ */
+ protected void cancel() {
if (null != nextTask) {
nextTask.cancel();
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java Wed May 9 07:27:10 2007
@@ -63,4 +63,18 @@
assertSame(ae, tested.getEndpoint());
assertSame(mgr, tested.getManager());
}
-}
+
+ @Test
+ public void testGenerateSequenceIdentifier() {
+ RMManager mgr = control.createMock(RMManager.class);
+ EasyMock.expect(rme.getManager()).andReturn(mgr);
+ SequenceIdentifierGenerator generator = control.createMock(SequenceIdentifierGenerator.class);
+ EasyMock.expect(mgr.getIdGenerator()).andReturn(generator);
+ Identifier id = control.createMock(Identifier.class);
+ EasyMock.expect(generator.generateSequenceIdentifier()).andReturn(id);
+ control.replay();
+ AbstractEndpoint tested = new AbstractEndpoint(rme);
+ assertSame(id, tested.generateSequenceIdentifier());
+ control.verify();
+ }
+}
\ No newline at end of file
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractRMInterceptorTest.java Wed May 9 07:27:10 2007
@@ -76,7 +76,27 @@
}
@Test
- public void testHandleMessage() {
+ public void testHandleMessageSequenceFaultNoBinding() {
+ RMInterceptor interceptor = new RMInterceptor();
+ Message message = control.createMock(Message.class);
+ SequenceFault sf = control.createMock(SequenceFault.class);
+ interceptor.setSequenceFault(sf);
+ Exchange ex = control.createMock(Exchange.class);
+ EasyMock.expect(message.getExchange()).andReturn(ex);
+ Endpoint e = control.createMock(Endpoint.class);
+ EasyMock.expect(ex.get(Endpoint.class)).andReturn(e);
+ EasyMock.expect(e.getBinding()).andReturn(null);
+ control.replay();
+ try {
+ interceptor.handleMessage(message);
+ fail("Expected Fault not thrown.");
+ } catch (Fault f) {
+ assertSame(sf, f.getCause());
+ }
+ }
+
+ @Test
+ public void testHandleMessageSequenceFault() {
RMInterceptor interceptor = new RMInterceptor();
Message message = control.createMock(Message.class);
SequenceFault sf = control.createMock(SequenceFault.class);
@@ -101,7 +121,21 @@
} catch (Fault f) {
assertSame(f, fault);
}
- control.verify();
+ }
+
+ @Test
+ public void testHandleMessageRMException() {
+ RMInterceptor interceptor = new RMInterceptor();
+ Message message = control.createMock(Message.class);
+ RMException rme = control.createMock(RMException.class);
+ interceptor.setRMException(rme);
+ control.replay();
+ try {
+ interceptor.handleMessage(message);
+ fail("Expected Fault not thrown.");
+ } catch (Fault f) {
+ assertSame(rme, f.getCause());
+ }
}
@Test
@@ -122,21 +156,29 @@
interceptor.assertReliability(message);
assertTrue(!ai.isAsserted());
ais.add(ai);
- interceptor.assertReliability(message);
+ interceptor.assertReliability(message);
+
}
class RMInterceptor extends AbstractRMInterceptor {
private SequenceFault sequenceFault;
+ private RMException rmException;
void setSequenceFault(SequenceFault sf) {
sequenceFault = sf;
}
+ void setRMException(RMException rme) {
+ rmException = rme;
+ }
+
@Override
- protected void handle(Message msg) throws SequenceFault {
+ protected void handle(Message msg) throws SequenceFault, RMException {
if (null != sequenceFault) {
throw sequenceFault;
+ } else if (null != rmException) {
+ throw rmException;
}
}
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java Wed May 9 07:27:10 2007
@@ -21,18 +21,22 @@
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Timer;
import javax.xml.namespace.QName;
+import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
+import org.apache.cxf.ws.addressing.v200408.AttributedURI;
import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
import org.apache.cxf.ws.rm.manager.AcksPolicyType;
import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
+import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.policy.RMAssertion;
import org.apache.cxf.ws.rm.policy.RMAssertion.AcknowledgementInterval;
import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
@@ -647,6 +651,110 @@
// ignore
}
+ control.verify();
+ }
+
+ @Test
+ public void testSequenceTermination() {
+ destination = control.createMock(Destination.class);
+ DestinationSequence seq = new DestinationSequence(id, ref, destination);
+ RMEndpoint rme = control.createMock(RMEndpoint.class);
+ EasyMock.expect(destination.getReliableEndpoint()).andReturn(rme);
+ DestinationSequence.SequenceTermination st = seq.new SequenceTermination();
+ st.updateInactivityTimeout(30000L);
+ long lastAppMessage = System.currentTimeMillis() - 30000L;
+ EasyMock.expect(rme.getLastControlMessage()).andReturn(0L);
+ EasyMock.expect(rme.getLastApplicationMessage()).andReturn(lastAppMessage);
+ destination.removeSequence(seq);
+ EasyMock.expectLastCall();
+ control.replay();
+ st.run();
+ control.verify();
+ }
+
+ @Test
+ public void testSequenceTerminationNotNecessary() {
+ destination = control.createMock(Destination.class);
+ manager = control.createMock(RMManager.class);
+ EasyMock.expect(destination.getManager()).andReturn(manager);
+ Timer t = new Timer();
+ EasyMock.expect(manager.getTimer()).andReturn(t);
+ DestinationSequence seq = new DestinationSequence(id, ref, destination);
+ RMEndpoint rme = control.createMock(RMEndpoint.class);
+ EasyMock.expect(destination.getReliableEndpoint()).andReturn(rme);
+ DestinationSequence.SequenceTermination st = seq.new SequenceTermination();
+ st.updateInactivityTimeout(30000L);
+ long lastAppMessage = System.currentTimeMillis() - 1000L;
+ EasyMock.expect(rme.getLastControlMessage()).andReturn(0L);
+ EasyMock.expect(rme.getLastApplicationMessage()).andReturn(lastAppMessage);
+ EasyMock.expectLastCall();
+ control.replay();
+ st.run();
+ control.verify();
+ }
+
+ @Test
+ public void testCanPiggybackAckOnPartialResponse() {
+ DestinationSequence seq = new DestinationSequence(id, ref, destination);
+ AttributedURI uri = control.createMock(AttributedURI.class);
+ EasyMock.expect(ref.getAddress()).andReturn(uri);
+ String addr = "http://localhost:9999/reponses";
+ EasyMock.expect(uri.getValue()).andReturn(addr);
+ control.replay();
+ assertTrue(!seq.canPiggybackAckOnPartialResponse());
+ control.verify();
+ control.reset();
+ EasyMock.expect(ref.getAddress()).andReturn(uri);
+ EasyMock.expect(uri.getValue()).andReturn(RMConstants.getAnonymousAddress());
+ control.replay();
+ assertTrue(seq.canPiggybackAckOnPartialResponse());
+ control.verify();
+ }
+
+ @Test
+ public void testPurgeAcknowledged() {
+ destination = control.createMock(Destination.class);
+ DestinationSequence seq = new DestinationSequence(id, ref, destination);
+ manager = control.createMock(RMManager.class);
+ EasyMock.expect(destination.getManager()).andReturn(manager);
+ RMStore store = control.createMock(RMStore.class);
+ EasyMock.expect(manager.getStore()).andReturn(store);
+ store.removeMessages(EasyMock.eq(id),
+ CastUtils.cast(EasyMock.isA(Collection.class), BigInteger.class), EasyMock.eq(false));
+ EasyMock.expectLastCall();
+ control.replay();
+ seq.purgeAcknowledged(BigInteger.ONE);
+ control.verify();
+ }
+
+ @Test
+ public void testCancelDeferredAcknowledgements() {
+ destination = control.createMock(Destination.class);
+ manager = control.createMock(RMManager.class);
+ EasyMock.expect(destination.getManager()).andReturn(manager);
+ Timer t = new Timer();
+ EasyMock.expect(manager.getTimer()).andReturn(t);
+ DestinationSequence seq = new DestinationSequence(id, ref, destination);
+ control.replay();
+ seq.scheduleDeferredAcknowledgement(30000L);
+ seq.cancelDeferredAcknowledgments();
+ seq.cancelDeferredAcknowledgments();
+ t.cancel();
+ control.verify();
+ }
+
+ @Test
+ public void testCancelTermination() {
+ destination = control.createMock(Destination.class);
+ manager = control.createMock(RMManager.class);
+ EasyMock.expect(destination.getManager()).andReturn(manager);
+ Timer t = new Timer();
+ EasyMock.expect(manager.getTimer()).andReturn(t);
+ DestinationSequence seq = new DestinationSequence(id, ref, destination);
+ control.replay();
+ seq.scheduleSequenceTermination(30000L);
+ seq.cancelTermination();
+ t.cancel();
control.verify();
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java Wed May 9 07:27:10 2007
@@ -91,7 +91,7 @@
store.removeDestinationSequence(id);
EasyMock.expectLastCall();
control.replay();
- destination.addSequence(ds, true);
+ destination.addSequence(ds);
assertEquals(1, destination.getAllSequences().size());
assertSame(ds, destination.getSequence(id));
destination.removeSequence(ds);
@@ -175,7 +175,8 @@
BigInteger nr = BigInteger.TEN;
EasyMock.expect(st.getMessageNumber()).andReturn(nr).times(2);
DestinationSequence ds = control.createMock(DestinationSequence.class);
- EasyMock.expect(destination.getSequence(id)).andReturn(ds);
+ EasyMock.expect(destination.getSequence(id)).andReturn(ds);
+
ds.applyDeliveryAssurance(nr);
EasyMock.expectLastCall();
ds.acknowledge(message);
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java Wed May 9 07:27:10 2007
@@ -131,6 +131,46 @@
proxy.acknowledge(ds);
}
+ @Test
+ public void testLastMessage() throws NoSuchMethodException, RMException {
+ Method m = Proxy.class.getDeclaredMethod("invoke",
+ new Class[] {OperationInfo.class, Object[].class, Map.class});
+ Proxy proxy = control.createMock(Proxy.class, new Method[] {m});
+ proxy.setReliableEndpoint(rme);
+ SourceSequence ss = control.createMock(SourceSequence.class);
+ EasyMock.expect(ss.getTarget()).andReturn(null);
+ control.replay();
+ proxy.lastMessage(ss);
+ control.verify();
+
+ control.reset();
+ org.apache.cxf.ws.addressing.EndpointReferenceType target
+ = RMUtils.createAnonymousReference();
+ EasyMock.expect(ss.getTarget()).andReturn(target);
+ control.replay();
+ proxy.lastMessage(ss);
+ control.verify();
+
+ control.reset();
+ target = RMUtils.createReference("http://localhost:9000/greeterPort");
+ EasyMock.expect(ss.getTarget()).andReturn(target);
+ Endpoint endpoint = control.createMock(Endpoint.class);
+ EasyMock.expect(rme.getEndpoint()).andReturn(endpoint);
+ EndpointInfo epi = control.createMock(EndpointInfo.class);
+ EasyMock.expect(endpoint.getEndpointInfo()).andReturn(epi);
+ ServiceInfo si = control.createMock(ServiceInfo.class);
+ EasyMock.expect(epi.getService()).andReturn(si);
+ InterfaceInfo ii = control.createMock(InterfaceInfo.class);
+ EasyMock.expect(si.getInterface()).andReturn(ii);
+ OperationInfo oi = control.createMock(OperationInfo.class);
+ EasyMock.expect(ii.getOperation(RMConstants.getLastMessageOperationName())).andReturn(oi);
+ expectInvoke(proxy, oi, null);
+ control.replay();
+
+ proxy.lastMessage(ss);
+
+ }
+
@Test
public void testTerminate() throws NoSuchMethodException, RMException {
Method m = Proxy.class.getDeclaredMethod("invoke",
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java Wed May 9 07:27:10 2007
@@ -21,12 +21,16 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.List;
+import javax.wsdl.extensions.ExtensibilityElement;
import javax.xml.namespace.QName;
import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.service.Service;
import org.apache.cxf.service.model.BindingInfo;
import org.apache.cxf.service.model.BindingOperationInfo;
@@ -35,11 +39,13 @@
import org.apache.cxf.service.model.OperationInfo;
import org.apache.cxf.service.model.ServiceInfo;
import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.addressing.Names;
import org.apache.cxf.ws.policy.EffectivePolicy;
import org.apache.cxf.ws.policy.EndpointPolicy;
import org.apache.cxf.ws.policy.PolicyEngine;
import org.apache.cxf.ws.policy.PolicyInterceptorProviderRegistry;
import org.apache.neethi.Assertion;
+import org.apache.neethi.Policy;
import org.easymock.classextension.EasyMock;
import org.easymock.classextension.IMocksControl;
import org.junit.After;
@@ -48,12 +54,12 @@
import org.junit.Test;
public class RMEndpointTest extends Assert {
-
+
private IMocksControl control;
private RMManager manager;
private Endpoint ae;
private RMEndpoint rme;
-
+
@Before
public void setUp() {
control = EasyMock.createNiceControl();
@@ -61,12 +67,12 @@
ae = control.createMock(Endpoint.class);
rme = new RMEndpoint(manager, ae);
}
-
+
@After
public void tearDown() {
control.verify();
}
-
+
@Test
public void testConstructor() {
control.replay();
@@ -76,65 +82,65 @@
assertNull(rme.getConduit());
assertNull(rme.getReplyTo());
}
-
+
@Test
public void testGetName() {
- EndpointInfo aei = control.createMock(EndpointInfo.class);
+ EndpointInfo aei = control.createMock(EndpointInfo.class);
EasyMock.expect(ae.getEndpointInfo()).andReturn(aei);
QName qn = new QName("cxf");
EasyMock.expect(aei.getName()).andReturn(qn);
control.replay();
assertSame(qn, rme.getName());
}
-
+
@Test
public void testGetManager() {
control.replay();
assertSame(manager, rme.getManager());
}
-
+
@Test
public void testGetApplicationEndpoint() {
control.replay();
assertSame(ae, rme.getApplicationEndpoint());
}
-
+
@Test
public void testGetProxy() {
control.replay();
assertSame(rme, rme.getProxy().getReliableEndpoint());
}
-
+
@Test
public void testGetServant() {
control.replay();
assertNotNull(rme.getServant());
}
-
+
@Test
public void testGetSetDestination() {
Destination d = control.createMock(Destination.class);
control.replay();
assertSame(rme, rme.getDestination().getReliableEndpoint());
rme.setDestination(d);
- assertSame(d, rme.getDestination());
+ assertSame(d, rme.getDestination());
}
-
+
@Test
public void testGetSetSource() {
Source s = control.createMock(Source.class);
control.replay();
assertSame(rme, rme.getSource().getReliableEndpoint());
rme.setSource(s);
- assertSame(s, rme.getSource());
+ assertSame(s, rme.getSource());
}
-
+
@Test
public void testInitialise() throws NoSuchMethodException {
Method m1 = RMEndpoint.class.getDeclaredMethod("createService", new Class[] {});
Method m2 = RMEndpoint.class.getDeclaredMethod("createEndpoint", new Class[] {});
Method m3 = RMEndpoint.class.getDeclaredMethod("setPolicies", new Class[] {});
-
+
rme = control.createMock(RMEndpoint.class, new Method[] {m1, m2, m3});
rme.createService();
EasyMock.expectLastCall();
@@ -143,14 +149,14 @@
rme.setPolicies();
EasyMock.expectLastCall();
Conduit c = control.createMock(Conduit.class);
- org.apache.cxf.ws.addressing.EndpointReferenceType epr =
- control.createMock(org.apache.cxf.ws.addressing.EndpointReferenceType.class);
+ org.apache.cxf.ws.addressing.EndpointReferenceType epr = control
+ .createMock(org.apache.cxf.ws.addressing.EndpointReferenceType.class);
control.replay();
rme.initialise(c, epr);
assertSame(c, rme.getConduit());
- assertSame(epr, rme.getReplyTo());
+ assertSame(epr, rme.getReplyTo());
}
-
+
@Test
public void testCreateService() {
Service as = control.createMock(Service.class);
@@ -164,13 +170,13 @@
assertSame(rme.getServant(), s.getInvoker());
verifyService();
}
-
+
@Test
public void testCreateEndpoint() throws NoSuchMethodException {
Method m = RMEndpoint.class.getDeclaredMethod("getUsingAddressing", new Class[] {EndpointInfo.class});
rme = control.createMock(RMEndpoint.class, new Method[] {m});
rme.setAplicationEndpoint(ae);
- rme.setManager(manager);
+ rme.setManager(manager);
Service as = control.createMock(Service.class);
EasyMock.expect(ae.getService()).andReturn(as);
EndpointInfo aei = control.createMock(EndpointInfo.class);
@@ -189,22 +195,69 @@
rme.createEndpoint();
Endpoint e = rme.getEndpoint();
WrappedEndpoint we = (WrappedEndpoint)e;
- assertSame(ae, we.getWrappedEndpoint());
+ assertSame(ae, we.getWrappedEndpoint());
Service s = rme.getService();
assertEquals(1, s.getEndpoints().size());
- assertSame(e, s.getEndpoints().get(new QName(RMConstants.getWsdlNamespace(),
- "SequenceAbstractSoapPort")));
+ assertSame(e, s.getEndpoints().get(RMConstants.getPortName()));
}
-
+
+ @Test
+ public void testGetUsingAddressing() {
+ EndpointInfo ei = null;
+ control.replay();
+ assertNull(rme.getUsingAddressing(ei));
+ control.verify();
+
+ control.reset();
+ ExtensibilityElement ua = control.createMock(ExtensibilityElement.class);
+ ei = control.createMock(EndpointInfo.class);
+ List<ExtensibilityElement> noExts = new ArrayList<ExtensibilityElement>();
+ List<ExtensibilityElement> exts = new ArrayList<ExtensibilityElement>();
+ exts.add(ua);
+ EasyMock.expect(ei.getExtensors(ExtensibilityElement.class)).andReturn(noExts);
+ BindingInfo bi = control.createMock(BindingInfo.class);
+ EasyMock.expect(ei.getBinding()).andReturn(bi).times(2);
+ EasyMock.expect(bi.getExtensors(ExtensibilityElement.class)).andReturn(noExts);
+ ServiceInfo si = control.createMock(ServiceInfo.class);
+ EasyMock.expect(ei.getService()).andReturn(si).times(2);
+ EasyMock.expect(si.getExtensors(ExtensibilityElement.class)).andReturn(exts);
+ EasyMock.expect(ua.getElementType()).andReturn(Names.WSAW_USING_ADDRESSING_QNAME);
+ control.replay();
+ assertSame(ua, rme.getUsingAddressing(ei));
+ }
+
+ @Test
+ public void testGetUsingAddressingFromExtensions() {
+ List<ExtensibilityElement> exts = new ArrayList<ExtensibilityElement>();
+ ExtensibilityElement ua = control.createMock(ExtensibilityElement.class);
+ exts.add(ua);
+ EasyMock.expect(ua.getElementType()).andReturn(Names.WSAW_USING_ADDRESSING_QNAME);
+ control.replay();
+ assertSame(ua, rme.getUsingAddressing(exts));
+ }
+
+ @Test
+ public void testMessageArrivals() {
+ assertEquals(0L, rme.getLastApplicationMessage());
+ assertEquals(0L, rme.getLastControlMessage());
+ rme.receivedControlMessage();
+ assertEquals(0L, rme.getLastApplicationMessage());
+ assertTrue(rme.getLastControlMessage() > 0);
+ rme.receivedApplicationMessage();
+ assertTrue(rme.getLastApplicationMessage() > 0);
+ assertTrue(rme.getLastControlMessage() > 0);
+ control.replay();
+ }
+
@Test
public void testSetPoliciesNoEngine() {
Bus bus = control.createMock(Bus.class);
EasyMock.expect(manager.getBus()).andReturn(bus);
EasyMock.expect(bus.getExtension(PolicyEngine.class)).andReturn(null);
control.replay();
- rme.setPolicies();
+ rme.setPolicies();
}
-
+
@Test
public void testSetPoliciesEngineDisabled() {
Bus bus = control.createMock(Bus.class);
@@ -213,9 +266,9 @@
EasyMock.expect(bus.getExtension(PolicyEngine.class)).andReturn(pe);
EasyMock.expect(pe.isEnabled()).andReturn(false);
control.replay();
- rme.setPolicies();
+ rme.setPolicies();
}
-
+
@Test
public void testSetPolicies() throws NoSuchMethodException {
Method m = RMEndpoint.class.getDeclaredMethod("getEndpoint", new Class[] {});
@@ -238,38 +291,81 @@
EndpointPolicy epi = control.createMock(EndpointPolicy.class);
EasyMock.expect(pe.getServerEndpointPolicy(aei, null)).andReturn(epi);
EasyMock.expect(epi.getChosenAlternative()).andReturn(new ArrayList<Assertion>());
-
+
pe.setEndpointPolicy(ei, epi);
EasyMock.expectLastCall();
BindingInfo bi = control.createMock(BindingInfo.class);
EasyMock.expect(ei.getBinding()).andReturn(bi);
BindingOperationInfo boi = control.createMock(BindingOperationInfo.class);
EasyMock.expect(bi.getOperations()).andReturn(Collections.singletonList(boi));
- pe.setEffectiveServerRequestPolicy(EasyMock.eq(ei), EasyMock.eq(boi),
- EasyMock.isA(EffectivePolicy.class));
+ pe.setEffectiveServerRequestPolicy(EasyMock.eq(ei), EasyMock.eq(boi), EasyMock
+ .isA(EffectivePolicy.class));
EasyMock.expectLastCall();
- pe.setEffectiveServerResponsePolicy(EasyMock.eq(ei), EasyMock.eq(boi),
- EasyMock.isA(EffectivePolicy.class));
+ pe.setEffectiveServerResponsePolicy(EasyMock.eq(ei), EasyMock.eq(boi), EasyMock
+ .isA(EffectivePolicy.class));
EasyMock.expectLastCall();
- pe.setEffectiveClientRequestPolicy(EasyMock.eq(ei), EasyMock.eq(boi),
- EasyMock.isA(EffectivePolicy.class));
+ pe.setEffectiveClientRequestPolicy(EasyMock.eq(ei), EasyMock.eq(boi), EasyMock
+ .isA(EffectivePolicy.class));
EasyMock.expectLastCall();
- pe.setEffectiveClientResponsePolicy(EasyMock.eq(ei), EasyMock.eq(boi),
- EasyMock.isA(EffectivePolicy.class));
+ pe.setEffectiveClientResponsePolicy(EasyMock.eq(ei), EasyMock.eq(boi), EasyMock
+ .isA(EffectivePolicy.class));
EasyMock.expectLastCall();
control.replay();
rme.setPolicies();
}
-
- private void verifyService() {
+
+ @Test
+ public void testShutdown() {
+ DestinationSequence ds = control.createMock(DestinationSequence.class);
+ Identifier did = control.createMock(Identifier.class);
+ EasyMock.expect(ds.getIdentifier()).andReturn(did);
+ String d = "d";
+ EasyMock.expect(did.getValue()).andReturn(d);
+ SourceSequence ss = control.createMock(SourceSequence.class);
+ Identifier sid = control.createMock(Identifier.class);
+ EasyMock.expect(ss.getIdentifier()).andReturn(sid);
+ String s = "s";
+ EasyMock.expect(sid.getValue()).andReturn(s);
+ ds.cancelDeferredAcknowledgments();
+ EasyMock.expectLastCall();
+ ds.cancelTermination();
+ EasyMock.expectLastCall();
+ RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
+ EasyMock.expect(manager.getRetransmissionQueue()).andReturn(queue);
+ queue.stop(ss);
+ EasyMock.expectLastCall();
+ control.replay();
+ rme.getDestination().addSequence(ds, false);
+ rme.getSource().addSequence(ss, false);
+ rme.shutdown();
+ }
+
+ @Test
+ public void testEffectivePolicyImpl() {
+ EndpointPolicy ep = control.createMock(EndpointPolicy.class);
+ Collection<Assertion> alt = new ArrayList<Assertion>();
+ EasyMock.expect(ep.getChosenAlternative()).andReturn(alt).times(2);
+ PolicyInterceptorProviderRegistry reg = control.createMock(PolicyInterceptorProviderRegistry.class);
+ List<Interceptor> li = new ArrayList<Interceptor>();
+ EasyMock.expect(reg.getInterceptors(alt, true, false)).andReturn(li);
+ Policy p = control.createMock(Policy.class);
+ EasyMock.expect(ep.getPolicy()).andReturn(p);
+ control.replay();
+ EffectivePolicy effective = rme.new EffectivePolicyImpl(ep, reg, true, false);
+ assertSame(alt, effective.getChosenAlternative());
+ assertSame(li, effective.getInterceptors());
+ assertSame(p, effective.getPolicy());
+ }
+
+ private void verifyService() {
Service service = rme.getService();
ServiceInfo si = service.getServiceInfos().get(0);
assertNotNull("service info is null", si);
InterfaceInfo intf = si.getInterface();
-
- assertEquals(5, intf.getOperations().size());
-
+
+ assertEquals(7, intf.getOperations().size());
+
String ns = si.getName().getNamespaceURI();
OperationInfo oi = intf.getOperation(new QName(ns, "CreateSequence"));
assertNotNull("No operation info.", oi);
@@ -277,19 +373,27 @@
assertTrue("Operation is unwrapped.", !oi.isUnwrapped());
assertTrue("Operation is unwrappedCapable.", !oi.isUnwrappedCapable());
assertNull("Unexpected unwrapped operation.", oi.getUnwrappedOperation());
-
+
oi = intf.getOperation(new QName(ns, "TerminateSequence"));
assertNotNull("No operation info.", oi);
assertTrue("Operation is toway.", oi.isOneWay());
-
+
oi = intf.getOperation(new QName(ns, "SequenceAcknowledgement"));
assertNotNull("No operation info.", oi);
assertTrue("Operation is toway.", oi.isOneWay());
- oi = intf.getOperation(new QName(ns, "CreateSequenceOneway"));
+ oi = intf.getOperation(new QName(ns, "LastMessage"));
assertNotNull("No operation info.", oi);
assertTrue("Operation is toway.", oi.isOneWay());
+ oi = intf.getOperation(new QName(ns, "AckRequested"));
+ assertNotNull("No operation info.", oi);
+ assertTrue("Operation is toway.", oi.isOneWay());
+
+ oi = intf.getOperation(new QName(ns, "CreateSequenceOneway"));
+ assertNotNull("No operation info.", oi);
+ assertTrue("Operation is toway.", oi.isOneWay());
+
oi = intf.getOperation(new QName(ns, "CreateSequenceResponseOneway"));
assertNotNull("No operation info.", oi);
assertTrue("Operation is toway.", oi.isOneWay());
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java?view=diff&rev=536543&r1=536542&r2=536543
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java Wed May 9 07:27:10 2007
@@ -122,12 +122,14 @@
private void testHandleSequenceAck(boolean onServer) throws SequenceFault, RMException,
NoSuchMethodException {
Method m = RMInInterceptor.class.getDeclaredMethod("processAcknowledgments",
- new Class[] {RMProperties.class});
+ new Class[] {Source.class, RMProperties.class});
interceptor = control.createMock(RMInInterceptor.class, new Method[] {m});
Message message = setupInboundMessage(RMConstants.getSequenceAckAction(), onServer);
rme.receivedControlMessage();
EasyMock.expectLastCall();
- interceptor.processAcknowledgments(rmps);
+ Source s = control.createMock(Source.class);
+ EasyMock.expect(rme.getSource()).andReturn(s);
+ interceptor.processAcknowledgments(s, rmps);
EasyMock.expectLastCall();
EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
@@ -168,9 +170,9 @@
private void testAppMessage(boolean onServer) throws SequenceFault, RMException, NoSuchMethodException {
Method m1 = RMInInterceptor.class.getDeclaredMethod("processAcknowledgments",
- new Class[] {RMProperties.class});
+ new Class[] {Source.class, RMProperties.class});
Method m2 = RMInInterceptor.class.getDeclaredMethod("processAcknowledgmentRequests",
- new Class[] {RMProperties.class});
+ new Class[] {Destination.class, Message.class});
Method m3 = RMInInterceptor.class.getDeclaredMethod("processSequence",
new Class[] {Destination.class, Message.class});
Method m4 = RMInInterceptor.class.getDeclaredMethod("processDeliveryAssurance",
@@ -180,9 +182,11 @@
Message message = setupInboundMessage("greetMe", true);
Destination d = control.createMock(Destination.class);
EasyMock.expect(manager.getDestination(message)).andReturn(d);
- interceptor.processAcknowledgments(rmps);
+ Source s = control.createMock(Source.class);
+ EasyMock.expect(rme.getSource()).andReturn(s);
+ interceptor.processAcknowledgments(s, rmps);
EasyMock.expectLastCall();
- interceptor.processAcknowledgmentRequests(rmps);
+ interceptor.processAcknowledgmentRequests(d, message);
EasyMock.expectLastCall();
interceptor.processSequence(d, message);
EasyMock.expectLastCall();
@@ -200,6 +204,7 @@
public void testProcessAcknowledgments() throws RMException {
interceptor = new RMInInterceptor();
manager = control.createMock(RMManager.class);
+ Source source = control.createMock(Source.class);
interceptor.setManager(manager);
SequenceAcknowledgement ack1 = control.createMock(SequenceAcknowledgement.class);
SequenceAcknowledgement ack2 = control.createMock(SequenceAcknowledgement.class);
@@ -210,16 +215,16 @@
Identifier id1 = control.createMock(Identifier.class);
EasyMock.expect(ack1.getIdentifier()).andReturn(id1);
SourceSequence ss1 = control.createMock(SourceSequence.class);
- EasyMock.expect(manager.getSourceSequence(id1)).andReturn(ss1);
+ EasyMock.expect(source.getSequence(id1)).andReturn(ss1);
ss1.setAcknowledged(ack1);
EasyMock.expectLastCall();
Identifier id2 = control.createMock(Identifier.class);
EasyMock.expect(ack2.getIdentifier()).andReturn(id2);
- EasyMock.expect(manager.getSourceSequence(id2)).andReturn(null);
+ EasyMock.expect(source.getSequence(id2)).andReturn(null);
control.replay();
try {
- interceptor.processAcknowledgments(rmps);
+ interceptor.processAcknowledgments(source, rmps);
fail("Expected SequenceFault not thrown");
} catch (SequenceFault sf) {
assertEquals(RMConstants.getUnknownSequenceFaultCode(), sf.getSequenceFault().getFaultCode());