You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ds...@apache.org on 2013/09/09 13:34:59 UTC

svn commit: r1521053 - in /cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/ systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/

Author: dsosnoski
Date: Mon Sep  9 11:34:58 2013
New Revision: 1521053

URL: http://svn.apache.org/r1521053
Log:
CXF-2118 Add JMX support for CloseSequence on source sequence and
TerminateSequence on destination sequence.
CXF-371 Correct CreateSequence/Offer to include required Endpoint
element (blocked interop with .Net)

Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/VersionTransformer.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
    cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Mon Sep  9 11:34:58 2013
@@ -24,7 +24,10 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
+import javax.management.AttributeChangeNotification;
 import javax.management.JMException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotificationBroadcasterSupport;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
@@ -50,7 +53,7 @@ import org.apache.cxf.ws.rm.v200702.Sequ
  */
 @ManagedResource(componentName = "RMEndpoint", 
                  description = "Responsible for Sources and Destinations.")
-public class ManagedRMEndpoint implements ManagedComponent {
+public class ManagedRMEndpoint extends NotificationBroadcasterSupport implements ManagedComponent {
 
     private static final String[] SOURCE_SEQUENCE_NAMES = 
     {"sequenceId", "currentMessageNumber", "expires", "lastMessage", "queuedMessageCount", 
@@ -487,6 +490,44 @@ public class ManagedRMEndpoint implement
         return destination.getSequence(identifier);
     }
 
+    @ManagedOperation(description = "Close Source Sequence")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public void closeSourceSequence(String sid) throws JMException {
+        SourceSequence ss = getSourceSeq(sid);
+        if (null == ss) {
+            throw new JMException("no source sequence");
+        }
+        RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+        rq.stop(ss);
+        Proxy proxy = endpoint.getProxy();
+        try {
+            proxy.lastMessage(ss);
+        } catch (RMException e) {
+            e.printStackTrace();
+            throw new JMException("Error closing sequence: " + e.getMessage());
+        }
+    }
+
+    @ManagedOperation(description = "Terminate Destination Sequence")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The destination identifier") 
+    })
+    public void terminateDestinationSequence(String sid) throws JMException {
+        DestinationSequence ds = getDestinationSeq(sid);
+        if (null == ds) {
+            throw new JMException("no destination sequence");
+        }
+        Proxy proxy = endpoint.getProxy();
+        try {
+            proxy.terminate(ds);
+            ds.getDestination().removeSequence(ds);
+        } catch (RMException e) {
+            throw new JMException("Error terminating sequence: " + e.getMessage());
+        }
+    }
+
     @ManagedOperation(description = "Remove Source Sequence")
     @ManagedOperationParameters({
         @ManagedOperationParameter(name = "sequenceId", description = "The destination identifier") 
@@ -530,7 +571,7 @@ public class ManagedRMEndpoint implement
         RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
         rq.purgeAll(ss);
     }
-
+    
     private static String getAddressValue(EndpointReferenceType epr) {
         if (null != epr && null != epr.getAddress()) {
             return epr.getAddress().getValue();
@@ -643,4 +684,15 @@ public class ManagedRMEndpoint implement
     public int getCompletedDestinationSequenceCount() {
         return endpoint.getCompletedDestinationSequenceCount();
     }
+    
+    @Override
+    public MBeanNotificationInfo[] getNotificationInfo() {
+        String[] types = new String[] {
+            AttributeChangeNotification.ATTRIBUTE_CHANGE
+        };
+        String name = AttributeChangeNotification.class.getName();
+        String description = "Message acknowledged";
+        MBeanNotificationInfo info =  new MBeanNotificationInfo(types, name, description);
+        return new MBeanNotificationInfo[] {info};
+    }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Mon Sep  9 11:34:58 2013
@@ -47,6 +47,7 @@ import org.apache.cxf.ws.addressing.Attr
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.addressing.RelatesToType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceResponseType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceType;
 import org.apache.cxf.ws.rm.v200702.Expires;
@@ -88,7 +89,7 @@ public class Proxy {
     }
     
     void terminate(SourceSequence ss) throws RMException {
-        final ProtocolVariation protocol = ss.getProtocol(); 
+        ProtocolVariation protocol = ss.getProtocol(); 
         RMConstants constants = protocol.getConstants();
         OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo()
             .getService().getInterface().getOperation(constants.getTerminateSequenceOperationName());
@@ -100,6 +101,19 @@ public class Proxy {
         invoke(oi, protocol, new Object[] {codec.convertToSend(ts)}, null);
     }
     
+    void terminate(DestinationSequence ds) throws RMException {
+        ProtocolVariation protocol = ds.getProtocol(); 
+        RMConstants constants = protocol.getConstants();
+        OperationInfo oi = reliableEndpoint.getEndpoint(protocol).getEndpointInfo()
+            .getService().getInterface().getOperation(constants.getTerminateSequenceOperationName());
+        
+        TerminateSequenceType ts = new TerminateSequenceType();
+        ts.setIdentifier(ds.getIdentifier());
+        ts.setLastMsgNumber(ds.getLastMessageNumber());
+        EncoderDecoder codec = protocol.getCodec();
+        invoke(oi, protocol, new Object[] {codec.convertToSend(ts)}, null);
+    }
+    
     void createSequenceResponse(final Object createResponse, ProtocolVariation protocol) throws RMException {
         LOG.fine("sending CreateSequenceResponse from client side");
         RMConstants constants = protocol.getConstants();
@@ -145,6 +159,7 @@ public class Proxy {
                 offer.setExpires(expires);
             }
             offer.setIdentifier(reliableEndpoint.getSource().generateSequenceIdentifier());
+            offer.setEndpoint(acksTo);
             create.setOffer(offer);
             setOfferedIdentifier(offer);
         }
@@ -214,7 +229,14 @@ public class Proxy {
                 Collections.singletonMap(SourceSequence.class.getName(), 
                                          (Object)s));
 
-        invoke(oi, protocol, new Object[] {}, context);
+        if (constants instanceof RM11Constants) {
+            CloseSequenceType csr = new CloseSequenceType();
+            csr.setIdentifier(s.getIdentifier());
+            csr.setLastMsgNumber(s.getCurrentMessageNr());
+            invoke(oi, protocol, new Object[] {csr}, context);
+        } else {
+            invoke(oi, protocol, new Object[] {}, context);
+        }
     }
     
     void ackRequested(SourceSequence s) throws RMException {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java Mon Sep  9 11:34:58 2013
@@ -75,6 +75,8 @@ public final class RM11Constants extends
     
     public static final QName CLOSE_SEQUENCE_QNAME = new QName(NAMESPACE_URI, "CloseSequence");
     
+    public static final QName CLOSE_SEQUENCE_RESPONSE_QNAME = new QName(NAMESPACE_URI, "CloseSequenceResponse");
+    
     public static final QName ACK_REQ_QNAME = new QName(NAMESPACE_URI, "AckRequested");
     
     public static final QName CREATE_SEQUENCE_ONEWAY_QNAME =
@@ -192,6 +194,11 @@ public final class RM11Constants extends
         return CLOSE_SEQUENCE_ACTION;
     }
     
+    // only defined for WS-RM 1.1/1.2
+    public String getCloseSequenceResponseAction() {
+        return CLOSE_SEQUENCE_RESPONSE_ACTION;
+    }
+    
     public String getAckRequestedAction() {
         return ACK_REQUESTED_ACTION;
     }
@@ -242,6 +249,11 @@ public final class RM11Constants extends
         return CLOSE_SEQUENCE_QNAME;
     }
     
+    // not part of the interface, only in WS-RM 1.1/1.2
+    public QName getCloseSequenceResponseOperationName() {
+        return CLOSE_SEQUENCE_RESPONSE_QNAME;
+    }
+    
     public QName getSequenceAckOperationName() {
         return SEQUENCE_ACKNOWLEDGEMENT_QNAME;
     }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Mon Sep  9 11:34:58 2013
@@ -69,6 +69,8 @@ import org.apache.cxf.ws.policy.PolicyEn
 import org.apache.cxf.ws.policy.PolicyInterceptorProviderRegistry;
 import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceResponseType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
 import org.apache.neethi.Assertion;
 import org.apache.neethi.Policy;
 
@@ -84,6 +86,8 @@ public class RMEndpoint {
     private static final String CREATE_RESPONSE_PART_NAME = "createResponse";
     private static final String TERMINATE_PART_NAME = "terminate";
     private static final String TERMINATE_RESPONSE_PART_NAME = "terminateResponse";
+    private static final String CLOSE_PART_NAME = "close";
+    private static final String CLOSE_RESPONSE_PART_NAME = "closeResponse";
     
     private static Schema rmSchema;
 
@@ -547,6 +551,21 @@ public class RMEndpoint {
         messageInfo = operationInfo.createMessage(consts.getCloseSequenceOperationName(),
                                                   MessageInfo.Type.INPUT);
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
+        if (RM11Constants.NAMESPACE_URI.equals(protocol.getWSRMNamespace())) {
+            MessagePartInfo partInfo = messageInfo.addMessagePart(CLOSE_PART_NAME);
+            partInfo.setElementQName(consts.getCloseSequenceOperationName());
+            partInfo.setElement(true);
+            partInfo.setTypeClass(CloseSequenceType.class);
+            messageInfo = operationInfo.createMessage(
+                RM11Constants.INSTANCE.getCloseSequenceResponseOperationName(),
+                MessageInfo.Type.OUTPUT);
+            operationInfo.setOutput(messageInfo.getName().getLocalPart(), messageInfo);
+            partInfo = messageInfo.addMessagePart(CLOSE_RESPONSE_PART_NAME);
+            partInfo.setElementQName(RM11Constants.INSTANCE.getCloseSequenceResponseOperationName());
+            partInfo.setElement(true);
+            partInfo.setTypeClass(CloseSequenceResponseType.class);
+            partInfo.setIndex(0);
+        }
     }
 
     void buildAckRequestedOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
@@ -603,7 +622,12 @@ public class RMEndpoint {
             bi.addOperation(boi);
 
             boi = bi.buildOperation(consts.getCloseSequenceOperationName(), null, null);
-            addAction(boi, consts.getCloseSequenceAction());
+            if (RM11Constants.NAMESPACE_URI.equals(protocol.getWSRMNamespace())) {
+                addAction(boi, consts.getCloseSequenceAction(), 
+                          RM11Constants.INSTANCE.getCloseSequenceResponseAction());
+            } else {
+                addAction(boi, consts.getCloseSequenceAction());
+            }
             bi.addOperation(boi);
 
             boi = bi.buildOperation(consts.getAckRequestedOperationName(), null, null);

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Mon Sep  9 11:34:58 2013
@@ -63,6 +63,7 @@ import org.apache.cxf.ws.rm.manager.Acks
 import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
 import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
 import org.apache.cxf.ws.rm.manager.RM10AddressingNamespaceType;
+import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
 import org.apache.cxf.ws.rm.persistence.RMMessage;
 import org.apache.cxf.ws.rm.persistence.RMStore;
@@ -293,12 +294,11 @@ public class RMManager {
      * @param sp The sourcePolicy to set.
      */
     public void setSourcePolicy(SourcePolicyType sp) {
-        org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
         if (null == sp) {
-            sp = factory.createSourcePolicyType();
+            sp = new SourcePolicyType();
         }
         if (sp.getSequenceTerminationPolicy() == null) {
-            sp.setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
+            sp.setSequenceTerminationPolicy(new SequenceTerminationPolicyType());
         }
         sourcePolicy = sp;
     }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Mon Sep  9 11:34:58 2013
@@ -101,17 +101,7 @@ public class RMOutInterceptor extends Ab
         boolean isLastMessage = constants.getCloseSequenceAction().equals(action);
         
         if (isApplicationMessage && !isPartialResponse) {
-            RetransmissionInterceptor ri = new RetransmissionInterceptor();
-            ri.setManager(getManager());
-            // TODO:
-            // On the server side: If a fault occurs after this interceptor we will switch 
-            // interceptor chains (if this is not already a fault message) and therefore need to 
-            // make sure the retransmission interceptor is added to the fault chain
-            // 
-            msg.getInterceptorChain().add(ri);
-            LOG.fine("Added RetransmissionInterceptor to chain.");
-            
-            getManager().getRetransmissionQueue().start();
+            addRetransmissionInterceptor(msg);
         }
         
         RMProperties rmpsOut = RMContextUtils.retrieveRMProperties(msg, true);
@@ -139,8 +129,8 @@ public class RMOutInterceptor extends Ab
             ContextUtils.storeDeferUncorrelatedMessageAbort(msg);
         }
         
-        if ((isApplicationMessage || isLastMessage)
-            && !isPartialResponse) {
+        Map<?, ?> invocationContext = (Map<?, ?>)msg.get(Message.INVOCATION_CONTEXT);
+        if ((isApplicationMessage || (isLastMessage && invocationContext != null)) && !isPartialResponse) {
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue()));
             }
@@ -150,7 +140,6 @@ public class RMOutInterceptor extends Ab
             synchronized (source) {
                 SourceSequence seq = null;
                 if (isLastMessage) {
-                    Map<?, ?> invocationContext = (Map<?, ?>)msg.get(Message.INVOCATION_CONTEXT);
                     seq = (SourceSequence)invocationContext.get(SourceSequence.class.getName());
                 } else {
                     seq = getManager().getSequence(inSeqId, msg, maps);
@@ -212,6 +201,20 @@ public class RMOutInterceptor extends Ab
         assertReliability(msg);
     }
 
+    private void addRetransmissionInterceptor(Message msg) {
+        RetransmissionInterceptor ri = new RetransmissionInterceptor();
+        ri.setManager(getManager());
+        // TODO:
+        // On the server side: If a fault occurs after this interceptor we will switch 
+        // interceptor chains (if this is not already a fault message) and therefore need to 
+        // make sure the retransmission interceptor is added to the fault chain
+        // 
+        msg.getInterceptorChain().add(ri);
+        LOG.fine("Added RetransmissionInterceptor to chain.");
+        
+        getManager().getRetransmissionQueue().start();
+    }
+
     private String getAddressingNamespace(AddressingProperties maps) {
         String wsaNamespace = maps.getNamespaceURI();
         if (wsaNamespace == null) {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java Mon Sep  9 11:34:58 2013
@@ -39,6 +39,8 @@ import org.apache.cxf.ws.addressing.Addr
 import org.apache.cxf.ws.addressing.ContextUtils;
 import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
 import org.apache.cxf.ws.rm.v200702.AcceptType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceResponseType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceResponseType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceType;
 import org.apache.cxf.ws.rm.v200702.Expires;
@@ -106,6 +108,8 @@ public class Servant implements Invoker 
             if (tsr != null) {
                 return Collections.singletonList(tsr);
             }
+        } else if (RM11Constants.INSTANCE.getCloseSequenceOperationName().equals(oi.getName())) {
+            return Collections.singletonList(closeSequence(exchange.getInMessage()));
         }
         
         return null;
@@ -290,6 +294,40 @@ public class Servant implements Invoker 
         return terminateResponse;
     }
 
+    public Object closeSequence(Message message) {
+        LOG.fine("Closing sequence");
+        
+        CloseSequenceType close = (CloseSequenceType)getParameter(message);
+        
+        // check if the terminated sequence was created in response to a a createSequence
+        // request
+        
+        Destination destination = reliableEndpoint.getDestination();
+        Identifier sid = close.getIdentifier();
+        DestinationSequence closedSeq = destination.getSequence(sid);
+        if (null == closedSeq) {
+            //  TODO
+            LOG.severe("No such sequence.");
+            return null;
+        } 
+        closedSeq.scheduleImmediateAcknowledgement();
+        closedSeq.setLastMessageNumber(close.getLastMsgNumber());
+        CloseSequenceResponseType closeResponse = new CloseSequenceResponseType();
+        closeResponse.setIdentifier(close.getIdentifier());
+        AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);        
+        Message outMessage = message.getExchange().getOutMessage();
+
+        if (null == outMessage) {
+            // outMessage may be null e.g. if ReplyTo is not set for TS 
+            outMessage = ContextUtils.createMessage(message.getExchange());
+            message.getExchange().setOutMessage(outMessage);
+        }
+        if (null != outMessage) {
+            RMContextUtils.storeMAPs(maps, outMessage, false, false);
+        }
+        return closeResponse;
+    }
+
     Object getParameter(Message message) {
         List<?> resList = null;
         // assert message == message.getExchange().getInMessage();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/VersionTransformer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/VersionTransformer.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/VersionTransformer.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/VersionTransformer.java Mon Sep  9 11:34:58 2013
@@ -513,7 +513,7 @@ public final class VersionTransformer {
     }
     
     /**
-     * Convert 200502 Expires with 200508 WS-Addressing namespace to internal form.
+     * Convert 200502 OfferType with 200508 WS-Addressing namespace to internal form.
      * 
      * @param exposed (may be <code>null</code>)
      * @return converted (<code>null</code> if internal is <code>null</code>)

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java Mon Sep  9 11:34:58 2013
@@ -420,6 +420,9 @@ public class RMSoapInterceptor extends A
             isOneway = false;
         } else if (consts.getCloseSequenceAction().equals(action)) {
             boi = bi.getOperation(consts.getCloseSequenceOperationName()); 
+        } else if (RM11Constants.INSTANCE.getCloseSequenceResponseAction().equals(action)) {
+            boi = bi.getOperation(RM11Constants.INSTANCE.getCloseSequenceOperationName());
+            isOneway = false;
         }
         assert boi != null;
         exchange.put(BindingOperationInfo.class, boi);
@@ -434,7 +437,8 @@ public class RMSoapInterceptor extends A
         // 
         
         if (!consts.getCreateSequenceResponseAction().equals(action)
-            && !RM11Constants.INSTANCE.getTerminateSequenceResponseAction().equals(action)) {
+            && !RM11Constants.INSTANCE.getTerminateSequenceResponseAction().equals(action)
+            && !RM11Constants.INSTANCE.getCloseSequenceResponseAction().equals(action)) {
             LOG.fine("Changing requestor role from " + message.get(Message.REQUESTOR_ROLE)
                      + " to false");
             Object originalRequestorRole = message.get(Message.REQUESTOR_ROLE);

Modified: cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java?rev=1521053&r1=1521052&r2=1521053&view=diff
==============================================================================
--- cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java (original)
+++ cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java Mon Sep  9 11:34:58 2013
@@ -39,6 +39,7 @@ import org.apache.cxf.greeter_control.Gr
 import org.apache.cxf.management.InstrumentationManager;
 import org.apache.cxf.management.ManagementConstants;
 import org.apache.cxf.testutil.common.AbstractClientServerTestBase;
+import org.apache.cxf.ws.rm.RM11Constants;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMUtils;
 
@@ -51,8 +52,13 @@ import org.junit.Test;
  * 
  */
 public class ManagedEndpointsTest extends AbstractClientServerTestBase {
+
     public static final String PORT = allocatePort(ManagedEndpointsTest.class);
 
+    private static final String[] EMPTY_SIGNATURE = new String[0];
+    private static final String[] ONESTRING_SIGNATURE = new String[]{"java.lang.String"};
+    private static final String[] ONEBOOLEAN_SIGNATURE = new String[]{"boolean"};
+    
     private static final String SERVER_CFG = "/org/apache/cxf/systest/ws/rm/managed-server.xml"; 
     private static final String CLIENT_CFG = "/org/apache/cxf/systest/ws/rm/managed-client.xml"; 
         
@@ -107,14 +113,7 @@ public class ManagedEndpointsTest extend
     
     @Test
     public void testManagedEndpointsOneway() throws Exception {
-        checkServerReady(30000);
-        
-        SpringBusFactory bf = new SpringBusFactory();
-        clientBus = bf.createBus(CLIENT_CFG);
-        MessageLossSimulator mls = new MessageLossSimulator();
-        clientBus.getOutInterceptors().add(mls);
-        
-        BusFactory.setDefaultBus(clientBus);
+        prepareClient();
         
         RMManager clientManager = clientBus.getExtension(RMManager.class);
         RMManager serverManager = serverBus.getExtension(RMManager.class);
@@ -147,7 +146,7 @@ public class ManagedEndpointsTest extend
         ObjectName serverEndpointName = getEndpointName(mbs, serverManager);
         
         o = mbs.invoke(clientEndpointName, "getSourceSequenceIds", 
-                       new Object[]{true}, new String[]{"boolean"});
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
         assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
         String sseqId = ((String[])o)[0];
         
@@ -162,11 +161,11 @@ public class ManagedEndpointsTest extend
         String dseqId = ((String[])o)[0];
 
         o = mbs.invoke(serverEndpointName, "getSourceSequenceIds", 
-                       new Object[]{true}, new String[]{"boolean"});
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
         verifyArray("Expected sequence identifier", o, new String[]{dseqId}, false); 
         
         o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
-                       new Object[]{true}, new String[]{"boolean"});
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
         assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
 
         o = mbs.invoke(clientEndpointName, "getQueuedMessageCount",
@@ -177,31 +176,31 @@ public class ManagedEndpointsTest extend
         verifySourceSequence(o, sseqId, 1, 0);
 
         o = mbs.invoke(clientEndpointName, "getSourceSequences", 
-                       new Object[]{true}, new String[]{"boolean"});
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
         assertTrue("One sequence message", o instanceof CompositeData[] && 1 == ((CompositeData[])o).length);
         verifySourceSequence(((CompositeData[])o)[0], sseqId, 1, 0);
 
         o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
-                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
         verifyArray("Expected range", o, new Long[]{1L, 1L}, true);
         
         o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
-                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
         assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
         
         greeter.greetMeOneWay("two"); // getting lost
         greeter.greetMeOneWay("three"); // sent
         
         o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
-                       new Object[]{true}, new String[]{"boolean"});
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
         assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
 
         o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
-                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
         verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L}, true);
         
         o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
-                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
         assertTrue("One unacknowledged message", o instanceof Long[] && 1 == ((Long[])o).length);
                 
         o = mbs.invoke(clientEndpointName, "getRetransmissionStatus", 
@@ -209,7 +208,7 @@ public class ManagedEndpointsTest extend
         verifyRetransmissionStatus(o, 2L, 0);
 
         o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange", 
-                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
         verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L}, true);
 
         // 7 sec retry interval + 5 sec
@@ -217,33 +216,36 @@ public class ManagedEndpointsTest extend
         Thread.sleep(12000);
 
         o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
-                       new Object[]{true}, new String[]{"boolean"});
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
         assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
 
         o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
-                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
         verifyArray("Expected range", o, new Long[]{1L, 3L}, true);
         
         o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange", 
-                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
         verifyArray("Expected range", o, new Long[]{1L, 3L}, true);
 
         o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
-                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
         assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
-
+        
+        mbs.invoke(clientEndpointName, "closeSourceSequence", new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceIds", 
+            new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        assertTrue("Source sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
+        
+        mbs.invoke(clientEndpointName, "terminateDestinationSequence", new Object[]{dseqId}, ONESTRING_SIGNATURE);
+        o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", 
+            new Object[]{}, EMPTY_SIGNATURE);
+        assertTrue("Destination sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
+        
     }
     
     @Test
     public void testSuspendAndResumeSourceSequence() throws Exception {
-        checkServerReady(30000);
-        
-        SpringBusFactory bf = new SpringBusFactory();
-        clientBus = bf.createBus(CLIENT_CFG);
-        MessageLossSimulator mls = new MessageLossSimulator();
-        clientBus.getOutInterceptors().add(mls);
-        
-        BusFactory.setDefaultBus(clientBus);
+        prepareClient();
         
         RMManager clientManager = clientBus.getExtension(RMManager.class);
         
@@ -268,18 +270,18 @@ public class ManagedEndpointsTest extend
         String sseqId = (String)o;
 
         o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
-                       new Object[]{sseqId}, new String[]{"java.lang.String"});
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
         assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
 
         greeter.greetMeOneWay("two"); // sent but suspended
         greeter.greetMeOneWay("three"); // sent but suspended
 
         o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
-                       new Object[]{true}, new String[]{"boolean"});
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
         assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
 
         mbs.invoke(clientEndpointName, "suspendSourceQueue", 
-                   new Object[]{sseqId}, new String[]{"java.lang.String"});
+                   new Object[]{sseqId}, ONESTRING_SIGNATURE);
         LOG.info("suspended the source queue: " + sseqId);
         
 
@@ -288,21 +290,166 @@ public class ManagedEndpointsTest extend
         Thread.sleep(10000);
 
         o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
-                       new Object[]{true}, new String[]{"boolean"});
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
         assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
 
         mbs.invoke(clientEndpointName, "resumeSourceQueue", 
-                   new Object[]{sseqId}, new String[]{"java.lang.String"});
+                   new Object[]{sseqId}, ONESTRING_SIGNATURE);
         LOG.info("resumed the source queue: " + sseqId);
         
         LOG.info("waiting for 15 secs for the retry (resumed)...");
         Thread.sleep(15000);
 
         o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
-                       new Object[]{true}, new String[]{"boolean"});
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
         assertTrue("No queued messages", o instanceof Integer && 0 == ((Integer)o).intValue());
     }
+    
+    @Test
+    public void testManagedEndpointsOneway12() throws Exception {
+        prepareClient();
+        
+        RMManager clientManager = clientBus.getExtension(RMManager.class);
+        RMManager serverManager = serverBus.getExtension(RMManager.class);
+        
+        InstrumentationManager serverIM = serverBus.getExtension(InstrumentationManager.class);
+        MBeanServer mbs = serverIM.getMBeanServer();
+        assertNotNull("MBeanServer must be available.", mbs);
+
+        ObjectName clientManagerName = RMUtils.getManagedObjectName(clientManager);
+        ObjectName serverManagerName = RMUtils.getManagedObjectName(serverManager);
+
+        Object o;
+        GreeterService gs = new GreeterService();
+        final Greeter greeter = gs.getGreeterPort();
+        updateAddressPort(greeter, ManagedEndpointsTest.PORT);
+        LOG.fine("Created greeter client.");
+
+        ClientProxy.getClient(greeter).getRequestContext().put(RMManager.WSRM_VERSION_PROPERTY,
+            RM11Constants.NAMESPACE_URI);
+
+        org.apache.cxf.endpoint.Endpoint ep = ClientProxy.getClient(greeter).getEndpoint();
+        String epId = RMUtils.getEndpointIdentifier(ep, clientBus);
+        greeter.greetMeOneWay("one"); // sent
+
+        o = mbs.invoke(clientManagerName, "getEndpointIdentifiers", null, null);
+        verifyArray("Expected endpoint identifier", o, new String[]{epId}, true);
+
+        o = mbs.invoke(serverManagerName, "getEndpointIdentifiers", null, null);
+        verifyArray("Expected endpoint identifier", o, new String[]{epId}, true);
+        
+        ObjectName clientEndpointName = RMUtils.getManagedObjectName(clientManager, ep);
+        // we need to find out serverEndpointName by using the query name
+        ObjectName serverEndpointName = getEndpointName(mbs, serverManager);
+        
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceIds", 
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
+        String sseqId = ((String[])o)[0];
+        
+        o = mbs.invoke(clientEndpointName, "getCurrentSourceSequenceId", null, null);
+        assertTrue("Expected sequence identifier", o instanceof String && sseqId.equals(o));
+        
+        o = mbs.invoke(serverEndpointName, "getDestinationSequenceIds", null, null);
+        verifyArray("Expected sequence identifier", o, new String[]{sseqId}, false); 
+        
+        o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", null, null);
+        assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
+        String dseqId = ((String[])o)[0];
 
+        o = mbs.invoke(serverEndpointName, "getSourceSequenceIds", 
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        verifyArray("Expected sequence identifier", o, new String[]{dseqId}, false); 
+        
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
+
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageCount",
+                       new Object[]{sseqId, true}, new String[]{"java.lang.String", "boolean"});
+        assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
+
+        o = mbs.invoke(clientEndpointName, "getCurrentSourceSequence", null, null);
+        verifySourceSequence(o, sseqId, 1, 0);
+
+        o = mbs.invoke(clientEndpointName, "getSourceSequences", 
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        assertTrue("One sequence message", o instanceof CompositeData[] && 1 == ((CompositeData[])o).length);
+        verifySourceSequence(((CompositeData[])o)[0], sseqId, 1, 0);
+
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        verifyArray("Expected range", o, new Long[]{1L, 1L}, true);
+        
+        o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
+        
+        greeter.greetMeOneWay("two"); // getting lost
+        greeter.greetMeOneWay("three"); // sent
+        
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
+
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L}, true);
+        
+        o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        assertTrue("One unacknowledged message", o instanceof Long[] && 1 == ((Long[])o).length);
+                
+        o = mbs.invoke(clientEndpointName, "getRetransmissionStatus", 
+                       new Object[]{sseqId, 2}, new String[]{"java.lang.String", "long"});
+        verifyRetransmissionStatus(o, 2L, 0);
+
+        o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange", 
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L}, true);
+
+        // 7 sec retry interval + 5 sec
+        LOG.info("waiting for 12 secs for the retry to complete ...");
+        Thread.sleep(12000);
+
+        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
+
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        verifyArray("Expected range", o, new Long[]{1L, 3L}, true);
+        
+        o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange", 
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        verifyArray("Expected range", o, new Long[]{1L, 3L}, true);
+
+        o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
+                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
+        
+        mbs.invoke(clientEndpointName, "closeSourceSequence", new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceIds", 
+            new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        
+        mbs.invoke(clientEndpointName, "terminateDestinationSequence", new Object[]{dseqId}, ONESTRING_SIGNATURE);
+        o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", 
+            new Object[]{}, EMPTY_SIGNATURE);
+        assertTrue("Destination sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
+        
+    }
+    
+    private void prepareClient() {
+        checkServerReady(30000);
+        
+        SpringBusFactory bf = new SpringBusFactory();
+        clientBus = bf.createBus(CLIENT_CFG);
+        MessageLossSimulator mls = new MessageLossSimulator();
+        clientBus.getOutInterceptors().add(mls);
+        
+        BusFactory.setDefaultBus(clientBus);
+    }
+    
     private void checkServerReady(long max) {
         long waited = 0;
         while (waited < max) {