You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ds...@apache.org on 2013/09/09 13:34:59 UTC
svn commit: r1521053 - in /cxf/trunk:
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/
systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/
Author: dsosnoski
Date: Mon Sep 9 11:34:58 2013
New Revision: 1521053
URL: http://svn.apache.org/r1521053
Log:
CXF-2118 Add JMX support for CloseSequence on source sequence and
TerminateSequence on destination sequence.
CXF-371 Correct CreateSequence/Offer to include required Endpoint
element (blocked interop with .Net)
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/VersionTransformer.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Mon Sep 9 11:34:58 2013
@@ -24,7 +24,10 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
+import javax.management.AttributeChangeNotification;
import javax.management.JMException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotificationBroadcasterSupport;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@@ -50,7 +53,7 @@ import org.apache.cxf.ws.rm.v200702.Sequ
*/
@ManagedResource(componentName = "RMEndpoint",
description = "Responsible for Sources and Destinations.")
-public class ManagedRMEndpoint implements ManagedComponent {
+public class ManagedRMEndpoint extends NotificationBroadcasterSupport implements ManagedComponent {
private static final String[] SOURCE_SEQUENCE_NAMES =
{"sequenceId", "currentMessageNumber", "expires", "lastMessage", "queuedMessageCount",
@@ -487,6 +490,44 @@ public class ManagedRMEndpoint implement
return destination.getSequence(identifier);
}
+ @ManagedOperation(description = "Close Source Sequence")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public void closeSourceSequence(String sid) throws JMException {
+ SourceSequence ss = getSourceSeq(sid);
+ if (null == ss) {
+ throw new JMException("no source sequence");
+ }
+ RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+ rq.stop(ss);
+ Proxy proxy = endpoint.getProxy();
+ try {
+ proxy.lastMessage(ss);
+ } catch (RMException e) {
+ e.printStackTrace();
+ throw new JMException("Error closing sequence: " + e.getMessage());
+ }
+ }
+
+ @ManagedOperation(description = "Terminate Destination Sequence")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The destination identifier")
+ })
+ public void terminateDestinationSequence(String sid) throws JMException {
+ DestinationSequence ds = getDestinationSeq(sid);
+ if (null == ds) {
+ throw new JMException("no destination sequence");
+ }
+ Proxy proxy = endpoint.getProxy();
+ try {
+ proxy.terminate(ds);
+ ds.getDestination().removeSequence(ds);
+ } catch (RMException e) {
+ throw new JMException("Error terminating sequence: " + e.getMessage());
+ }
+ }
+
@ManagedOperation(description = "Remove Source Sequence")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "sequenceId", description = "The destination identifier")
@@ -530,7 +571,7 @@ public class ManagedRMEndpoint implement
RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
rq.purgeAll(ss);
}
-
+
private static String getAddressValue(EndpointReferenceType epr) {
if (null != epr && null != epr.getAddress()) {
return epr.getAddress().getValue();
@@ -643,4 +684,15 @@ public class ManagedRMEndpoint implement
public int getCompletedDestinationSequenceCount() {
return endpoint.getCompletedDestinationSequenceCount();
}
+
+ @Override
+ public MBeanNotificationInfo[] getNotificationInfo() {
+ String[] types = new String[] {
+ AttributeChangeNotification.ATTRIBUTE_CHANGE
+ };
+ String name = AttributeChangeNotification.class.getName();
+ String description = "Message acknowledged";
+ MBeanNotificationInfo info = new MBeanNotificationInfo(types, name, description);
+ return new MBeanNotificationInfo[] {info};
+ }
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Mon Sep 9 11:34:58 2013
@@ -47,6 +47,7 @@ import org.apache.cxf.ws.addressing.Attr
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.RelatesToType;
import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
import org.apache.cxf.ws.rm.v200702.CreateSequenceResponseType;
import org.apache.cxf.ws.rm.v200702.CreateSequenceType;
import org.apache.cxf.ws.rm.v200702.Expires;
@@ -88,7 +89,7 @@ public class Proxy {
}
void terminate(SourceSequence ss) throws RMException {
- final ProtocolVariation protocol = ss.getProtocol();
+ ProtocolVariation protocol = ss.getProtocol();
RMConstants constants = protocol.getConstants();
OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo()
.getService().getInterface().getOperation(constants.getTerminateSequenceOperationName());
@@ -100,6 +101,19 @@ public class Proxy {
invoke(oi, protocol, new Object[] {codec.convertToSend(ts)}, null);
}
+ void terminate(DestinationSequence ds) throws RMException {
+ ProtocolVariation protocol = ds.getProtocol();
+ RMConstants constants = protocol.getConstants();
+ OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo()
+ .getService().getInterface().getOperation(constants.getTerminateSequenceOperationName());
+
+ TerminateSequenceType ts = new TerminateSequenceType();
+ ts.setIdentifier(ds.getIdentifier());
+ ts.setLastMsgNumber(ds.getLastMessageNumber());
+ EncoderDecoder codec = protocol.getCodec();
+ invoke(oi, protocol, new Object[] {codec.convertToSend(ts)}, null);
+ }
+
void createSequenceResponse(final Object createResponse, ProtocolVariation protocol) throws RMException {
LOG.fine("sending CreateSequenceResponse from client side");
RMConstants constants = protocol.getConstants();
@@ -145,6 +159,7 @@ public class Proxy {
offer.setExpires(expires);
}
offer.setIdentifier(reliableEndpoint.getSource().generateSequenceIdentifier());
+ offer.setEndpoint(acksTo);
create.setOffer(offer);
setOfferedIdentifier(offer);
}
@@ -214,7 +229,14 @@ public class Proxy {
Collections.singletonMap(SourceSequence.class.getName(),
(Object)s));
- invoke(oi, protocol, new Object[] {}, context);
+ if (constants instanceof RM11Constants) {
+ CloseSequenceType csr = new CloseSequenceType();
+ csr.setIdentifier(s.getIdentifier());
+ csr.setLastMsgNumber(s.getCurrentMessageNr());
+ invoke(oi, protocol, new Object[] {csr}, context);
+ } else {
+ invoke(oi, protocol, new Object[] {}, context);
+ }
}
void ackRequested(SourceSequence s) throws RMException {
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java Mon Sep 9 11:34:58 2013
@@ -75,6 +75,8 @@ public final class RM11Constants extends
public static final QName CLOSE_SEQUENCE_QNAME = new QName(NAMESPACE_URI, "CloseSequence");
+ public static final QName CLOSE_SEQUENCE_RESPONSE_QNAME = new QName(NAMESPACE_URI, "CloseSequenceResponse");
+
public static final QName ACK_REQ_QNAME = new QName(NAMESPACE_URI, "AckRequested");
public static final QName CREATE_SEQUENCE_ONEWAY_QNAME =
@@ -192,6 +194,11 @@ public final class RM11Constants extends
return CLOSE_SEQUENCE_ACTION;
}
+ // only defined for WS-RM 1.1/1.2
+ public String getCloseSequenceResponseAction() {
+ return CLOSE_SEQUENCE_RESPONSE_ACTION;
+ }
+
public String getAckRequestedAction() {
return ACK_REQUESTED_ACTION;
}
@@ -242,6 +249,11 @@ public final class RM11Constants extends
return CLOSE_SEQUENCE_QNAME;
}
+ // not part of the interface, only in WS-RM 1.1/1.2
+ public QName getCloseSequenceResponseOperationName() {
+ return CLOSE_SEQUENCE_RESPONSE_QNAME;
+ }
+
public QName getSequenceAckOperationName() {
return SEQUENCE_ACKNOWLEDGEMENT_QNAME;
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Mon Sep 9 11:34:58 2013
@@ -69,6 +69,8 @@ import org.apache.cxf.ws.policy.PolicyEn
import org.apache.cxf.ws.policy.PolicyInterceptorProviderRegistry;
import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceResponseType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
import org.apache.neethi.Assertion;
import org.apache.neethi.Policy;
@@ -84,6 +86,8 @@ public class RMEndpoint {
private static final String CREATE_RESPONSE_PART_NAME = "createResponse";
private static final String TERMINATE_PART_NAME = "terminate";
private static final String TERMINATE_RESPONSE_PART_NAME = "terminateResponse";
+ private static final String CLOSE_PART_NAME = "close";
+ private static final String CLOSE_RESPONSE_PART_NAME = "closeResponse";
private static Schema rmSchema;
@@ -547,6 +551,21 @@ public class RMEndpoint {
messageInfo = operationInfo.createMessage(consts.getCloseSequenceOperationName(),
MessageInfo.Type.INPUT);
operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
+ if (RM11Constants.NAMESPACE_URI.equals(protocol.getWSRMNamespace())) {
+ MessagePartInfo partInfo = messageInfo.addMessagePart(CLOSE_PART_NAME);
+ partInfo.setElementQName(consts.getCloseSequenceOperationName());
+ partInfo.setElement(true);
+ partInfo.setTypeClass(CloseSequenceType.class);
+ messageInfo = operationInfo.createMessage(
+ RM11Constants.INSTANCE.getCloseSequenceResponseOperationName(),
+ MessageInfo.Type.OUTPUT);
+ operationInfo.setOutput(messageInfo.getName().getLocalPart(), messageInfo);
+ partInfo = messageInfo.addMessagePart(CLOSE_RESPONSE_PART_NAME);
+ partInfo.setElementQName(RM11Constants.INSTANCE.getCloseSequenceResponseOperationName());
+ partInfo.setElement(true);
+ partInfo.setTypeClass(CloseSequenceResponseType.class);
+ partInfo.setIndex(0);
+ }
}
void buildAckRequestedOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
@@ -603,7 +622,12 @@ public class RMEndpoint {
bi.addOperation(boi);
boi = bi.buildOperation(consts.getCloseSequenceOperationName(), null, null);
- addAction(boi, consts.getCloseSequenceAction());
+ if (RM11Constants.NAMESPACE_URI.equals(protocol.getWSRMNamespace())) {
+ addAction(boi, consts.getCloseSequenceAction(),
+ RM11Constants.INSTANCE.getCloseSequenceResponseAction());
+ } else {
+ addAction(boi, consts.getCloseSequenceAction());
+ }
bi.addOperation(boi);
boi = bi.buildOperation(consts.getAckRequestedOperationName(), null, null);
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Mon Sep 9 11:34:58 2013
@@ -63,6 +63,7 @@ import org.apache.cxf.ws.rm.manager.Acks
import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
import org.apache.cxf.ws.rm.manager.RM10AddressingNamespaceType;
+import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
import org.apache.cxf.ws.rm.manager.SourcePolicyType;
import org.apache.cxf.ws.rm.persistence.RMMessage;
import org.apache.cxf.ws.rm.persistence.RMStore;
@@ -293,12 +294,11 @@ public class RMManager {
* @param sp The sourcePolicy to set.
*/
public void setSourcePolicy(SourcePolicyType sp) {
- org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
if (null == sp) {
- sp = factory.createSourcePolicyType();
+ sp = new SourcePolicyType();
}
if (sp.getSequenceTerminationPolicy() == null) {
- sp.setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
+ sp.setSequenceTerminationPolicy(new SequenceTerminationPolicyType());
}
sourcePolicy = sp;
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Mon Sep 9 11:34:58 2013
@@ -101,17 +101,7 @@ public class RMOutInterceptor extends Ab
boolean isLastMessage = constants.getCloseSequenceAction().equals(action);
if (isApplicationMessage && !isPartialResponse) {
- RetransmissionInterceptor ri = new RetransmissionInterceptor();
- ri.setManager(getManager());
- // TODO:
- // On the server side: If a fault occurs after this interceptor we will switch
- // interceptor chains (if this is not already a fault message) and therefore need to
- // make sure the retransmission interceptor is added to the fault chain
- //
- msg.getInterceptorChain().add(ri);
- LOG.fine("Added RetransmissionInterceptor to chain.");
-
- getManager().getRetransmissionQueue().start();
+ addRetransmissionInterceptor(msg);
}
RMProperties rmpsOut = RMContextUtils.retrieveRMProperties(msg, true);
@@ -139,8 +129,8 @@ public class RMOutInterceptor extends Ab
ContextUtils.storeDeferUncorrelatedMessageAbort(msg);
}
- if ((isApplicationMessage || isLastMessage)
- && !isPartialResponse) {
+ Map<?, ?> invocationContext = (Map<?, ?>)msg.get(Message.INVOCATION_CONTEXT);
+ if ((isApplicationMessage || (isLastMessage && invocationContext != null)) && !isPartialResponse) {
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue()));
}
@@ -150,7 +140,6 @@ public class RMOutInterceptor extends Ab
synchronized (source) {
SourceSequence seq = null;
if (isLastMessage) {
- Map<?, ?> invocationContext = (Map<?, ?>)msg.get(Message.INVOCATION_CONTEXT);
seq = (SourceSequence)invocationContext.get(SourceSequence.class.getName());
} else {
seq = getManager().getSequence(inSeqId, msg, maps);
@@ -212,6 +201,20 @@ public class RMOutInterceptor extends Ab
assertReliability(msg);
}
+ private void addRetransmissionInterceptor(Message msg) {
+ RetransmissionInterceptor ri = new RetransmissionInterceptor();
+ ri.setManager(getManager());
+ // TODO:
+ // On the server side: If a fault occurs after this interceptor we will switch
+ // interceptor chains (if this is not already a fault message) and therefore need to
+ // make sure the retransmission interceptor is added to the fault chain
+ //
+ msg.getInterceptorChain().add(ri);
+ LOG.fine("Added RetransmissionInterceptor to chain.");
+
+ getManager().getRetransmissionQueue().start();
+ }
+
private String getAddressingNamespace(AddressingProperties maps) {
String wsaNamespace = maps.getNamespaceURI();
if (wsaNamespace == null) {
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java Mon Sep 9 11:34:58 2013
@@ -39,6 +39,8 @@ import org.apache.cxf.ws.addressing.Addr
import org.apache.cxf.ws.addressing.ContextUtils;
import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
import org.apache.cxf.ws.rm.v200702.AcceptType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceResponseType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
import org.apache.cxf.ws.rm.v200702.CreateSequenceResponseType;
import org.apache.cxf.ws.rm.v200702.CreateSequenceType;
import org.apache.cxf.ws.rm.v200702.Expires;
@@ -106,6 +108,8 @@ public class Servant implements Invoker
if (tsr != null) {
return Collections.singletonList(tsr);
}
+ } else if (RM11Constants.INSTANCE.getCloseSequenceOperationName().equals(oi.getName())) {
+ return Collections.singletonList(closeSequence(exchange.getInMessage()));
}
return null;
@@ -290,6 +294,40 @@ public class Servant implements Invoker
return terminateResponse;
}
+ public Object closeSequence(Message message) {
+ LOG.fine("Closing sequence");
+
+ CloseSequenceType close = (CloseSequenceType)getParameter(message);
+
+ // check if the terminated sequence was created in response to a createSequence
+ // request
+
+ Destination destination = reliableEndpoint.getDestination();
+ Identifier sid = close.getIdentifier();
+ DestinationSequence closedSeq = destination.getSequence(sid);
+ if (null == closedSeq) {
+ // TODO
+ LOG.severe("No such sequence.");
+ return null;
+ }
+ closedSeq.scheduleImmediateAcknowledgement();
+ closedSeq.setLastMessageNumber(close.getLastMsgNumber());
+ CloseSequenceResponseType closeResponse = new CloseSequenceResponseType();
+ closeResponse.setIdentifier(close.getIdentifier());
+ AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
+ Message outMessage = message.getExchange().getOutMessage();
+
+ if (null == outMessage) {
+ // outMessage may be null e.g. if ReplyTo is not set for TS
+ outMessage = ContextUtils.createMessage(message.getExchange());
+ message.getExchange().setOutMessage(outMessage);
+ }
+ if (null != outMessage) {
+ RMContextUtils.storeMAPs(maps, outMessage, false, false);
+ }
+ return closeResponse;
+ }
+
Object getParameter(Message message) {
List<?> resList = null;
// assert message == message.getExchange().getInMessage();
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/VersionTransformer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/VersionTransformer.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/VersionTransformer.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/VersionTransformer.java Mon Sep 9 11:34:58 2013
@@ -513,7 +513,7 @@ public final class VersionTransformer {
}
/**
- * Convert 200502 Expires with 200508 WS-Addressing namespace to internal form.
+ * Convert 200502 OfferType with 200508 WS-Addressing namespace to internal form.
*
* @param exposed (may be <code>null</code>)
* @return converted (<code>null</code> if internal is <code>null</code>)
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java Mon Sep 9 11:34:58 2013
@@ -420,6 +420,9 @@ public class RMSoapInterceptor extends A
isOneway = false;
} else if (consts.getCloseSequenceAction().equals(action)) {
boi = bi.getOperation(consts.getCloseSequenceOperationName());
+ } else if (RM11Constants.INSTANCE.getCloseSequenceResponseAction().equals(action)) {
+ boi = bi.getOperation(RM11Constants.INSTANCE.getCloseSequenceOperationName());
+ isOneway = false;
}
assert boi != null;
exchange.put(BindingOperationInfo.class, boi);
@@ -434,7 +437,8 @@ public class RMSoapInterceptor extends A
//
if (!consts.getCreateSequenceResponseAction().equals(action)
- && !RM11Constants.INSTANCE.getTerminateSequenceResponseAction().equals(action)) {
+ && !RM11Constants.INSTANCE.getTerminateSequenceResponseAction().equals(action)
+ && !RM11Constants.INSTANCE.getCloseSequenceResponseAction().equals(action)) {
LOG.fine("Changing requestor role from " + message.get(Message.REQUESTOR_ROLE)
+ " to false");
Object originalRequestorRole = message.get(Message.REQUESTOR_ROLE);
Modified: cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java (original)
+++ cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java Mon Sep 9 11:34:58 2013
@@ -39,6 +39,7 @@ import org.apache.cxf.greeter_control.Gr
import org.apache.cxf.management.InstrumentationManager;
import org.apache.cxf.management.ManagementConstants;
import org.apache.cxf.testutil.common.AbstractClientServerTestBase;
+import org.apache.cxf.ws.rm.RM11Constants;
import org.apache.cxf.ws.rm.RMManager;
import org.apache.cxf.ws.rm.RMUtils;
@@ -51,8 +52,13 @@ import org.junit.Test;
*
*/
public class ManagedEndpointsTest extends AbstractClientServerTestBase {
+
public static final String PORT = allocatePort(ManagedEndpointsTest.class);
+ private static final String[] EMPTY_SIGNATURE = new String[0];
+ private static final String[] ONESTRING_SIGNATURE = new String[]{"java.lang.String"};
+ private static final String[] ONEBOOLEAN_SIGNATURE = new String[]{"boolean"};
+
private static final String SERVER_CFG = "/org/apache/cxf/systest/ws/rm/managed-server.xml";
private static final String CLIENT_CFG = "/org/apache/cxf/systest/ws/rm/managed-client.xml";
@@ -107,14 +113,7 @@ public class ManagedEndpointsTest extend
@Test
public void testManagedEndpointsOneway() throws Exception {
- checkServerReady(30000);
-
- SpringBusFactory bf = new SpringBusFactory();
- clientBus = bf.createBus(CLIENT_CFG);
- MessageLossSimulator mls = new MessageLossSimulator();
- clientBus.getOutInterceptors().add(mls);
-
- BusFactory.setDefaultBus(clientBus);
+ prepareClient();
RMManager clientManager = clientBus.getExtension(RMManager.class);
RMManager serverManager = serverBus.getExtension(RMManager.class);
@@ -147,7 +146,7 @@ public class ManagedEndpointsTest extend
ObjectName serverEndpointName = getEndpointName(mbs, serverManager);
o = mbs.invoke(clientEndpointName, "getSourceSequenceIds",
- new Object[]{true}, new String[]{"boolean"});
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
String sseqId = ((String[])o)[0];
@@ -162,11 +161,11 @@ public class ManagedEndpointsTest extend
String dseqId = ((String[])o)[0];
o = mbs.invoke(serverEndpointName, "getSourceSequenceIds",
- new Object[]{true}, new String[]{"boolean"});
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
verifyArray("Expected sequence identifier", o, new String[]{dseqId}, false);
o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
- new Object[]{true}, new String[]{"boolean"});
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
o = mbs.invoke(clientEndpointName, "getQueuedMessageCount",
@@ -177,31 +176,31 @@ public class ManagedEndpointsTest extend
verifySourceSequence(o, sseqId, 1, 0);
o = mbs.invoke(clientEndpointName, "getSourceSequences",
- new Object[]{true}, new String[]{"boolean"});
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
assertTrue("One sequence message", o instanceof CompositeData[] && 1 == ((CompositeData[])o).length);
verifySourceSequence(((CompositeData[])o)[0], sseqId, 1, 0);
o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
verifyArray("Expected range", o, new Long[]{1L, 1L}, true);
o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
greeter.greetMeOneWay("two"); // getting lost
greeter.greetMeOneWay("three"); // sent
o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
- new Object[]{true}, new String[]{"boolean"});
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L}, true);
o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
assertTrue("One unacknowledged message", o instanceof Long[] && 1 == ((Long[])o).length);
o = mbs.invoke(clientEndpointName, "getRetransmissionStatus",
@@ -209,7 +208,7 @@ public class ManagedEndpointsTest extend
verifyRetransmissionStatus(o, 2L, 0);
o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L}, true);
// 7 sec retry interval + 5 sec
@@ -217,33 +216,36 @@ public class ManagedEndpointsTest extend
Thread.sleep(12000);
o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
- new Object[]{true}, new String[]{"boolean"});
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
verifyArray("Expected range", o, new Long[]{1L, 3L}, true);
o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
verifyArray("Expected range", o, new Long[]{1L, 3L}, true);
o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
-
+
+ mbs.invoke(clientEndpointName, "closeSourceSequence", new Object[]{sseqId}, ONESTRING_SIGNATURE);
+ o = mbs.invoke(clientEndpointName, "getSourceSequenceIds",
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+ assertTrue("Source sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
+
+ mbs.invoke(clientEndpointName, "terminateDestinationSequence", new Object[]{dseqId}, ONESTRING_SIGNATURE);
+ o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds",
+ new Object[]{}, EMPTY_SIGNATURE);
+ assertTrue("Destination sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
+
}
@Test
public void testSuspendAndResumeSourceSequence() throws Exception {
- checkServerReady(30000);
-
- SpringBusFactory bf = new SpringBusFactory();
- clientBus = bf.createBus(CLIENT_CFG);
- MessageLossSimulator mls = new MessageLossSimulator();
- clientBus.getOutInterceptors().add(mls);
-
- BusFactory.setDefaultBus(clientBus);
+ prepareClient();
RMManager clientManager = clientBus.getExtension(RMManager.class);
@@ -268,18 +270,18 @@ public class ManagedEndpointsTest extend
String sseqId = (String)o;
o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
greeter.greetMeOneWay("two"); // sent but suspended
greeter.greetMeOneWay("three"); // sent but suspended
o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
- new Object[]{true}, new String[]{"boolean"});
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
mbs.invoke(clientEndpointName, "suspendSourceQueue",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
LOG.info("suspended the source queue: " + sseqId);
@@ -288,21 +290,166 @@ public class ManagedEndpointsTest extend
Thread.sleep(10000);
o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
- new Object[]{true}, new String[]{"boolean"});
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
mbs.invoke(clientEndpointName, "resumeSourceQueue",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
LOG.info("resumed the source queue: " + sseqId);
LOG.info("waiting for 15 secs for the retry (resumed)...");
Thread.sleep(15000);
o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
- new Object[]{true}, new String[]{"boolean"});
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
assertTrue("No queued messages", o instanceof Integer && 0 == ((Integer)o).intValue());
}
+
+ /**
+ * Runs the one-way managed-endpoint scenario with the WS-RM version request property set to
+ * {@link RM11Constants#NAMESPACE_URI}. Through JMX it checks the endpoint and sequence
+ * identifiers on the client and server MBeans, the queued/acknowledged state before and
+ * after the 12 second retry wait, and finally invokes closeSourceSequence and
+ * terminateDestinationSequence on the client endpoint MBean.
+ *
+ * @throws Exception if any JMX invocation or service call fails
+ */
+ @Test
+ public void testManagedEndpointsOneway12() throws Exception {
+ prepareClient();
+
+ RMManager clientManager = clientBus.getExtension(RMManager.class);
+ RMManager serverManager = serverBus.getExtension(RMManager.class);
+
+ InstrumentationManager serverIM = serverBus.getExtension(InstrumentationManager.class);
+ MBeanServer mbs = serverIM.getMBeanServer();
+ assertNotNull("MBeanServer must be available.", mbs);
+
+ ObjectName clientManagerName = RMUtils.getManagedObjectName(clientManager);
+ ObjectName serverManagerName = RMUtils.getManagedObjectName(serverManager);
+
+ Object o;
+ GreeterService gs = new GreeterService();
+ final Greeter greeter = gs.getGreeterPort();
+ updateAddressPort(greeter, ManagedEndpointsTest.PORT);
+ LOG.fine("Created greeter client.");
+
+ // select the WS-RM version for this client before the first message is sent
+ ClientProxy.getClient(greeter).getRequestContext().put(RMManager.WSRM_VERSION_PROPERTY,
+ RM11Constants.NAMESPACE_URI);
+
+ org.apache.cxf.endpoint.Endpoint ep = ClientProxy.getClient(greeter).getEndpoint();
+ String epId = RMUtils.getEndpointIdentifier(ep, clientBus);
+ greeter.greetMeOneWay("one"); // sent
+
+ o = mbs.invoke(clientManagerName, "getEndpointIdentifiers", null, null);
+ verifyArray("Expected endpoint identifier", o, new String[]{epId}, true);
+
+ o = mbs.invoke(serverManagerName, "getEndpointIdentifiers", null, null);
+ verifyArray("Expected endpoint identifier", o, new String[]{epId}, true);
+
+ ObjectName clientEndpointName = RMUtils.getManagedObjectName(clientManager, ep);
+ // we need to find out serverEndpointName by using the query name
+ ObjectName serverEndpointName = getEndpointName(mbs, serverManager);
+
+ o = mbs.invoke(clientEndpointName, "getSourceSequenceIds",
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+ assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
+ String sseqId = ((String[])o)[0];
+
+ o = mbs.invoke(clientEndpointName, "getCurrentSourceSequenceId", null, null);
+ assertTrue("Expected sequence identifier", o instanceof String && sseqId.equals(o));
+
+ o = mbs.invoke(serverEndpointName, "getDestinationSequenceIds", null, null);
+ verifyArray("Expected sequence identifier", o, new String[]{sseqId}, false);
+
+ o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", null, null);
+ assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
+ String dseqId = ((String[])o)[0];
+ o = mbs.invoke(serverEndpointName, "getSourceSequenceIds",
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+ verifyArray("Expected sequence identifier", o, new String[]{dseqId}, false);
+
+ o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+ assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
+
+ o = mbs.invoke(clientEndpointName, "getQueuedMessageCount",
+ new Object[]{sseqId, true}, new String[]{"java.lang.String", "boolean"});
+ assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
+
+ o = mbs.invoke(clientEndpointName, "getCurrentSourceSequence", null, null);
+ verifySourceSequence(o, sseqId, 1, 0);
+
+ o = mbs.invoke(clientEndpointName, "getSourceSequences",
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+ assertTrue("One sequence message", o instanceof CompositeData[] && 1 == ((CompositeData[])o).length);
+ verifySourceSequence(((CompositeData[])o)[0], sseqId, 1, 0);
+
+ o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange",
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
+ verifyArray("Expected range", o, new Long[]{1L, 1L}, true);
+
+ o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers",
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
+ assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
+
+ greeter.greetMeOneWay("two"); // getting lost
+ greeter.greetMeOneWay("three"); // sent
+
+ o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+ assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
+
+ o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange",
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
+ verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L}, true);
+
+ o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers",
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
+ assertTrue("One unacknowledged message", o instanceof Long[] && 1 == ((Long[])o).length);
+
+ // the second parameter is declared as long, so pass a long literal (boxed as Long)
+ // rather than an int literal that would be boxed as Integer
+ o = mbs.invoke(clientEndpointName, "getRetransmissionStatus",
+ new Object[]{sseqId, 2L}, new String[]{"java.lang.String", "long"});
+ verifyRetransmissionStatus(o, 2L, 0);
+
+ o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange",
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
+ verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L}, true);
+
+ // 7 sec retry interval + 5 sec
+ LOG.info("waiting for 12 secs for the retry to complete ...");
+ Thread.sleep(12000);
+
+ o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+ assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
+
+ o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange",
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
+ verifyArray("Expected range", o, new Long[]{1L, 3L}, true);
+
+ o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange",
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
+ verifyArray("Expected range", o, new Long[]{1L, 3L}, true);
+
+ o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers",
+ new Object[]{sseqId}, ONESTRING_SIGNATURE);
+ assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
+
+ mbs.invoke(clientEndpointName, "closeSourceSequence", new Object[]{sseqId}, ONESTRING_SIGNATURE);
+ o = mbs.invoke(clientEndpointName, "getSourceSequenceIds",
+ new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+ // Check the query result instead of discarding it; only the return type is pinned here.
+ // NOTE(review): the sibling one-way test asserts an empty array ("Source sequence terminated")
+ // after the same call; whether the closed sequence is still listed when the RM 1.1 namespace
+ // is used is not visible from this test - confirm and tighten this assertion.
+ assertTrue("Source sequence ids expected", o instanceof String[]);
+
+ mbs.invoke(clientEndpointName, "terminateDestinationSequence", new Object[]{dseqId}, ONESTRING_SIGNATURE);
+ o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds",
+ new Object[]{}, EMPTY_SIGNATURE);
+ assertTrue("Destination sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
+
+ }
+
+ /**
+ * Calls checkServerReady with a limit of 30000, then creates the client bus from
+ * CLIENT_CFG, adds a MessageLossSimulator to its out interceptors and installs the
+ * bus as the default bus.
+ */
+ private void prepareClient() {
+ checkServerReady(30000);
+ clientBus = new SpringBusFactory().createBus(CLIENT_CFG);
+
+ MessageLossSimulator lossSimulator = new MessageLossSimulator();
+ clientBus.getOutInterceptors().add(lossSimulator);
+ BusFactory.setDefaultBus(clientBus);
+ }
+
private void checkServerReady(long max) {
long waited = 0;
while (waited < max) {