You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2011/08/23 17:41:22 UTC

svn commit: r1160748 [1/2] - in /cxf/trunk: rt/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/ rt/ws/rm/src/main/java/org/apache/...

Author: ay
Date: Tue Aug 23 15:41:21 2011
New Revision: 1160748

URL: http://svn.apache.org/viewvc?rev=1160748&view=rev
Log:
[CXF-3757] jmx instrumentation for ws-rm

Added:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java   (with props)
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java   (with props)
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java   (with props)
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java   (with props)
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml   (with props)
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java   (with props)
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-client.xml   (with props)
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-server.xml   (with props)
Modified:
    cxf/trunk/rt/ws/rm/pom.xml
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java

Modified: cxf/trunk/rt/ws/rm/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/pom.xml?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/pom.xml (original)
+++ cxf/trunk/rt/ws/rm/pom.xml Tue Aug 23 15:41:21 2011
@@ -64,6 +64,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-management</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-common-utilities</artifactId>
             <version>${project.version}</version>
         </dependency>

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Tue Aug 23 15:41:21 2011
@@ -327,7 +327,7 @@ public class DestinationSequence extends
         if (null == store) {
             return;
         }
-        store.removeMessages(getIdentifier(), Collections.singleton(new Long(messageNr)), false);
+        store.removeMessages(getIdentifier(), Collections.singleton(messageNr), false);
     }
 
     /**
@@ -341,6 +341,10 @@ public class DestinationSequence extends
         return acknowledgeOnNextOccasion;
     }
     
+    List<DeferredAcknowledgment> getDeferredAcknowledgements() {
+        return deferredAcknowledgments;
+    }
+    
     /**
      * The correlation of the incoming CreateSequence call used to create this
      * sequence is recorded so that in the absence of an offer, the corresponding

Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1160748&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Tue Aug 23 15:41:21 2011
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.JMException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+
+import org.apache.cxf.management.ManagedComponent;
+import org.apache.cxf.management.annotation.ManagedAttribute;
+import org.apache.cxf.management.annotation.ManagedOperation;
+import org.apache.cxf.management.annotation.ManagedOperationParameter;
+import org.apache.cxf.management.annotation.ManagedOperationParameters;
+import org.apache.cxf.management.annotation.ManagedResource;
+import org.apache.cxf.ws.rm.DestinationSequence.DeferredAcknowledgment;
+import org.apache.cxf.ws.rm.v200702.Identifier;
+import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement.AcknowledgementRange;
+
+/**
+ * The ManagedRMEndpoint is a JMX managed bean for RMEndpoint.
+ *
+ */
+@ManagedResource(componentName = "RMEndpoint", 
+                 description = "Responsible for Sources and Destinations.")
+public class ManagedRMEndpoint implements ManagedComponent {
+
+    private static final String[] SOURCE_SEQUENCE_NAMES = 
+    {"sequenceId", "currentMessageNumber", "expires", "lastMessage", "queuedMessageCount", 
+     "target"};
+    private static final String[] SOURCE_SEQUENCE_DESCRIPTIONS = SOURCE_SEQUENCE_NAMES;
+    private static final OpenType<?>[] SOURCE_SEQUENCE_TYPES =  
+    {SimpleType.STRING, SimpleType.LONG, SimpleType.DATE, SimpleType.BOOLEAN, SimpleType.INTEGER, 
+     SimpleType.STRING};
+    
+    private static final String[] DESTINATION_SEQUENCE_NAMES = 
+    {"sequenceId", "lastMessageNumber", "correlationId", "ackTo"};
+    private static final String[] DESTINATION_SEQUENCE_DESCRIPTIONS = DESTINATION_SEQUENCE_NAMES;
+    private static final OpenType<?>[] DESTINATION_SEQUENCE_TYPES =  
+    {SimpleType.STRING, SimpleType.LONG, SimpleType.STRING, 
+     SimpleType.STRING};
+
+    private static final String[] RETRANSMISSION_STATUS_NAMES = 
+    {"messageNumber", "resends", "previous", "next", "nextInterval", "backOff", "pending", "suspended"};
+    private static final String[] RETRANSMISSION_STATUS_DESCRIPTIONS = RETRANSMISSION_STATUS_NAMES;
+    private static final OpenType<?>[] RETRANSMISSION_STATUS_TYPES =  
+    {SimpleType.LONG, SimpleType.INTEGER, SimpleType.DATE, SimpleType.DATE, SimpleType.LONG, SimpleType.LONG, 
+     SimpleType.BOOLEAN, SimpleType.BOOLEAN};
+
+    private static CompositeType sourceSequenceType;
+    private static CompositeType destinationSequenceType;
+
+    private static CompositeType retransmissionStatusType;
+
+    private RMEndpoint endpoint;
+    
+
+    static {
+        try {
+            sourceSequenceType = new CompositeType("sourceSequence",
+                                                   "sourceSequence",
+                                                   SOURCE_SEQUENCE_NAMES,
+                                                   SOURCE_SEQUENCE_DESCRIPTIONS,
+                                                   SOURCE_SEQUENCE_TYPES);
+            
+            destinationSequenceType = new CompositeType("destinationSequence",
+                                                        "destinationSequence",
+                                                        DESTINATION_SEQUENCE_NAMES,
+                                                        DESTINATION_SEQUENCE_DESCRIPTIONS,
+                                                        DESTINATION_SEQUENCE_TYPES);
+
+            retransmissionStatusType = new CompositeType("retransmissionStatus",
+                                                         "retransmissionStatus",
+                                                         RETRANSMISSION_STATUS_NAMES,
+                                                         RETRANSMISSION_STATUS_DESCRIPTIONS,
+                                                         RETRANSMISSION_STATUS_TYPES);
+            
+        } catch (OpenDataException e) {
+            // ignore and handle it later
+        }
+    }
+    
+    public ManagedRMEndpoint(RMEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.cxf.management.ManagedComponent#getObjectName()
+     */
+    public ObjectName getObjectName() throws JMException {
+        return RMUtils.getManagedObjectName(endpoint);
+    }
+
+    @ManagedOperation(description = "Total Number of Queued Messages")
+    public int getQueuedMessageTotalCount() {
+        Source source = endpoint.getSource();
+        RetransmissionQueue queue = endpoint.getManager().getRetransmissionQueue();
+        int count = 0;
+        for (SourceSequence ss : source.getAllSequences()) {
+            count += queue.countUnacknowledged(ss);
+        }
+
+        return count;
+    }
+
+    @ManagedOperation(description = "Number of Queued Messages")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public int getQueuedMessageCount(String sid) {
+        RMManager manager = endpoint.getManager();
+        SourceSequence ss = getSourceSeq(sid);
+        if (null == ss) {
+            throw new IllegalArgumentException("no sequence");
+        }
+
+        return manager.getRetransmissionQueue().countUnacknowledged(ss);
+    }
+
+    @ManagedOperation(description = "List of UnAcknowledged Message Numbers")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public Long[] getUnAcknowledgedMessageIdentifiers(String sid) {
+        RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+        SourceSequence ss = getSourceSeq(sid);
+        if (null == ss) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        
+        List<Long> numbers = rq.getUnacknowledgedMessageNumbers(ss);
+        return numbers.toArray(new Long[numbers.size()]);
+    }
+
+    @ManagedOperation(description = "Total Number of Deferred Acknowledgements")
+    public int getDeferredAcknowledgementTotalCount() {
+        Destination destination = endpoint.getDestination();
+
+        int count = 0;
+        for (DestinationSequence ds : destination.getAllSequences()) {
+            List<DeferredAcknowledgment> das = ds.getDeferredAcknowledgements();
+            if (null != das) {
+                count += das.size();
+            }
+        }
+
+        return count;
+    }
+
+    @ManagedOperation(description = "Number of Deferred Acknowledgements")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public int getDeferredAcknowledgementCount(String sid) {
+        DestinationSequence ds = getDestinationSeq(sid);
+        if (null == ds) {
+            throw new IllegalArgumentException("no sequence");
+        }
+
+        return ds.getDeferredAcknowledgements().size();
+    }
+
+    @ManagedOperation(description = "Source Sequence Acknowledged Range")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public Long[] getSourceSequenceAcknowledgedRange(String sid) {
+        SourceSequence ss = getSourceSeq(sid);
+        if (null == ss) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        
+        List<Long> list = new ArrayList<Long>();
+        
+        for (AcknowledgementRange r : ss.getAcknowledgement().getAcknowledgementRange()) {
+            list.add(r.getLower());
+            list.add(r.getUpper());
+        }
+        return list.toArray(new Long[list.size()]);
+    }
+
+    @ManagedOperation(description = "Destination Sequence Acknowledged Range")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public Long[] getDestinationSequenceAcknowledgedRange(String sid) {
+        DestinationSequence ds = getDestinationSeq(sid);
+        if (null == ds) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        
+        List<Long> list = new ArrayList<Long>();
+        
+        for (AcknowledgementRange r : ds.getAcknowledgment().getAcknowledgementRange()) {
+            list.add(r.getLower());
+            list.add(r.getUpper());
+        }
+        return list.toArray(new Long[list.size()]);
+    }
+
+    @ManagedOperation(description = "Retransmission Status")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"),
+        @ManagedOperationParameter(name = "messageNumber", description = "The message number")
+    })
+    public CompositeData getRetransmissionStatus(String sid, long num) throws JMException {
+        SourceSequence ss = getSourceSeq(sid);
+        if (null == ss) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+        RetransmissionStatus rs = rq.getRetransmissionStatus(ss, num);
+        return getRetransmissionStatusProperties(num, rs);
+    }
+
+    @ManagedOperation(description = "Retransmission Statuses")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+    })
+    public CompositeData[] getRetransmissionStatuses(String sid) throws JMException {
+        SourceSequence ss = getSourceSeq(sid);
+        if (null == ss) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+        Map<Long, RetransmissionStatus> rsmap = rq.getRetransmissionStatuses(ss);
+
+        CompositeData[] rsps = new CompositeData[rsmap.size()];
+        int i = 0;
+        for (Map.Entry<Long, RetransmissionStatus> rs : rsmap.entrySet()) {
+            rsps[i++] = getRetransmissionStatusProperties(rs.getKey(), rs.getValue());
+        }
+        return rsps;
+    }
+
+    @ManagedOperation(description = "List of Source Sequence IDs")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "expired", description = "The expired sequences included") 
+    })
+    public String[] getSourceSequenceIds(boolean expired) {
+        Source source = endpoint.getSource();
+        List<String> list = new ArrayList<String>();
+        for (SourceSequence ss : source.getAllSequences()) {
+            if (expired || !ss.isExpired()) {
+                list.add(ss.getIdentifier().getValue());
+            }
+        }
+        return list.toArray(new String[list.size()]);
+    }
+    
+    
+    @ManagedOperation(description = "List of Destination Sequence IDs")
+    public String[] getDestinationSequenceIds() {
+        Destination destination = endpoint.getDestination();
+        List<String> list = new ArrayList<String>();
+        for (DestinationSequence ds : destination.getAllSequences()) {
+            list.add(ds.getIdentifier().getValue());
+        }
+        return list.toArray(new String[list.size()]);
+    }
+    
+    
+    @ManagedOperation(description = "Suspend Retransmission Queue")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public void suspendSourceQueue(String sid) throws JMException {
+        SourceSequence ss = getSourceSeq(sid);
+        if (null == ss) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+        rq.suspend(ss);
+    }
+    
+    @ManagedOperation(description = "Resume Retransmission Queue")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public void resumeSourceQueue(String sid) throws JMException {
+        SourceSequence ss = getSourceSeq(sid);
+        if (null == ss) {
+            throw new JMException("no source sequence");
+        }
+        RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+        rq.resume(ss);
+    }
+    
+    @ManagedOperation(description = "Current Source Sequence Properties")
+    public CompositeData getCurrentSourceSequence() throws JMException {
+        Source source = endpoint.getSource();
+        SourceSequence ss = source.getCurrent();
+
+        return getSourceSequenceProperties(ss);
+    }
+
+    @ManagedOperation(description = "Current Source Sequence Identifier")
+    public String getCurrentSourceSequenceId() throws JMException {
+        Source source = endpoint.getSource();
+        SourceSequence ss = source.getCurrent();
+
+        if (null == ss) {
+            throw new JMException("no source sequence");
+        }
+        
+        return ss.getIdentifier().getValue();
+    }
+
+    @ManagedOperation(description = "Source Sequence Properties")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public CompositeData getSourceSequence(String sid) throws JMException {
+        SourceSequence ss = getSourceSeq(sid);
+        
+        return getSourceSequenceProperties(ss);
+    }
+    
+    @ManagedOperation(description = "Source Sequences Properties")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "expired", description = "The expired sequences included") 
+    })
+    public CompositeData[] getSourceSequences(boolean expired) throws JMException {
+        List<CompositeData> sps = new ArrayList<CompositeData>();
+        
+        Source source = endpoint.getSource();
+        for (SourceSequence ss : source.getAllSequences()) {
+            if (expired || !ss.isExpired()) {
+                sps.add(getSourceSequenceProperties(ss));
+            }
+        }
+        
+        return sps.toArray(new CompositeData[sps.size()]);
+    }
+    
+    
+    @ManagedOperation(description = "Destination Sequence Properties")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The destination identifier") 
+    })
+    public CompositeData getDestinationSequence(String sid) throws JMException {
+        DestinationSequence ds = getDestinationSeq(sid);
+        
+        return getDestinationSequenceProperties(ds);
+    }
+    
+    @ManagedOperation(description = "Destination Sequences Properties")
+    public CompositeData[] getDestinationSequences() throws JMException {
+        List<CompositeData> sps = new ArrayList<CompositeData>();
+        
+        Destination destination = endpoint.getDestination();
+        for (DestinationSequence ds : destination.getAllSequences()) {
+            sps.add(getDestinationSequenceProperties(ds));
+        }
+        
+        return sps.toArray(new CompositeData[sps.size()]);
+    }
+
+    private SourceSequence getSourceSeq(String sid) {
+        Source source = endpoint.getSource();
+        Identifier identifier = RMUtils.getWSRMFactory().createIdentifier();
+        identifier.setValue(sid);
+        return source.getSequence(identifier);
+    }
+    
+    private DestinationSequence getDestinationSeq(String sid) {
+        Destination destination = endpoint.getDestination();
+        Identifier identifier = RMUtils.getWSRMFactory().createIdentifier();
+        identifier.setValue(sid);
+        return destination.getSequence(identifier);
+    }
+
+    private CompositeData getSourceSequenceProperties(SourceSequence ss) throws JMException {
+        if (null == ss) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        RMManager manager = endpoint.getManager();
+        Object[] ssv = new Object[]{ss.getIdentifier().getValue(), ss.getCurrentMessageNr(), 
+                                    ss.getExpires(), ss.isLastMessage(),
+                                    manager.getRetransmissionQueue().countUnacknowledged(ss),
+                                    ss.getTarget().getAddress().getValue()};
+        
+        CompositeData ssps = new CompositeDataSupport(sourceSequenceType, 
+                                                      SOURCE_SEQUENCE_NAMES, ssv);
+        return ssps;
+    }
+    
+    private CompositeData getDestinationSequenceProperties(DestinationSequence ds) throws JMException {
+        if (null == ds) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        Object[] dsv = new Object[]{ds.getIdentifier().getValue(), ds.getLastMessageNumber(), 
+                                    ds.getCorrelationID(),
+                                    ds.getAcksTo().getAddress().getValue()};
+        
+        CompositeData dsps = new CompositeDataSupport(destinationSequenceType, 
+                                                      DESTINATION_SEQUENCE_NAMES, dsv);
+        return dsps;
+    }
+    
+    private CompositeData getRetransmissionStatusProperties(long num, 
+                                                            RetransmissionStatus rs) throws JMException {
+        CompositeData rsps = null;
+        if (null != rs) {
+            Object[] rsv = new Object[] {num, rs.getResends(), rs.getPrevious(), 
+                                         rs.getNext(), rs.getNextInterval(),
+                                         rs.getBackoff(), rs.isPending(), rs.isSuspended()};
+            rsps = new CompositeDataSupport(retransmissionStatusType,
+                                            RETRANSMISSION_STATUS_NAMES, rsv);
+        }
+        return rsps;
+    }
+
+    @ManagedAttribute(description = "Address Attribute", currencyTimeLimit = 60)
+    public String getAddress() {
+        return endpoint.getApplicationEndpoint().getEndpointInfo().getAddress();
+    }
+
+    //Not relevant unless ws-rm is used for non-http protocols
+//    @ManagedAttribute(description = "TransportId Attribute", currencyTimeLimit = 60)
+//    public String getTransportId() {
+//        return endpoint.getApplicationEndpoint().getEndpointInfo().getTransportId();
+//    }
+
+    @ManagedAttribute(description = "Application Message Last Received", currencyTimeLimit = 60)
+    public Date getLastApplicationMessage() {
+        return endpoint.getLastApplicationMessage() == 0L ? null 
+            : new Date(endpoint.getLastApplicationMessage());
+    }
+    
+    @ManagedAttribute(description = "Protocol Message Last Received", currencyTimeLimit = 60)
+    public Date getLastControlMessage() {
+        return endpoint.getLastControlMessage() == 0L ? null 
+            : new Date(endpoint.getLastControlMessage());
+    }
+    
+}

Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
------------------------------------------------------------------------------
    svn:executable = *

Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java?rev=1160748&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java Tue Aug 23 15:41:21 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.JMException;
+import javax.management.ObjectName;
+
+//import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.management.ManagedComponent;
+import org.apache.cxf.management.annotation.ManagedAttribute;
+import org.apache.cxf.management.annotation.ManagedOperation;
+import org.apache.cxf.management.annotation.ManagedResource;
+
+/**
+ * The ManagedRMManager is a JMX managed bean for RMManager.
+ *
+ */
+@ManagedResource(componentName = "RMManager", 
+                 description = "Responsible for managing RMEndpoints.")
+public class ManagedRMManager implements ManagedComponent {
+
+    private RMManager manager;
+    
+    public ManagedRMManager(RMManager manager) {
+        this.manager = manager;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.cxf.management.ManagedComponent#getObjectName()
+     */
+    public ObjectName getObjectName() throws JMException {
+        return RMUtils.getManagedObjectName(manager);
+    }
+
+    @ManagedOperation       
+    public void shutdown() {
+        manager.shutdown();
+    }
+    
+    @ManagedOperation
+    public String[] getEndpointIdentifiers() {
+        Set<String> identifiers = new HashSet<String>();
+        //FIXME find this method for 2.5
+//        for (Endpoint ep : manager.getReliableEndpointsMap().keySet()) {
+        for (Endpoint ep : getReliableEndpointsMap().keySet()) {
+            identifiers.add(RMUtils.getEndpointIdentifier(ep));
+        }
+        return identifiers.toArray(new String[identifiers.size()]);
+    }
+
+    //TODO see the comment above
+    private Map<Endpoint, RMEndpoint> getReliableEndpointsMap() {
+        Map<Endpoint, RMEndpoint> epmap = new HashMap<Endpoint, RMEndpoint>();
+        for (ProtocolVariation pv : manager.getEndpointMaps().keySet()) {
+            epmap.putAll(manager.getEndpointMaps().get(pv));
+        }
+        return epmap;
+    }
+    
+    @ManagedAttribute(description = "Using Store")
+    public boolean isUsingStore() {
+        return manager.getStore() != null;
+    }
+}

Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Tue Aug 23 15:41:21 2011
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.management.JMException;
 import javax.wsdl.extensions.ExtensibilityElement;
 import javax.xml.XMLConstants;
 import javax.xml.bind.JAXBContext;
@@ -42,6 +43,7 @@ import org.apache.cxf.databinding.DataBi
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.jaxb.JAXBDataBinding;
+import org.apache.cxf.management.InstrumentationManager;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.service.Service;
 import org.apache.cxf.service.factory.ServiceConstructionException;
@@ -97,7 +99,9 @@ public class RMEndpoint {
     private QName interfaceQName;
     private long lastApplicationMessage;
     private long lastControlMessage;
-
+    private InstrumentationManager instrumentationManager;
+    private ManagedRMEndpoint managedEndpoint;
+    
     /**
      * Constructor.
      * 
@@ -260,6 +264,17 @@ public class RMEndpoint {
         createService();
         createEndpoint(d);
         setPolicies();
+        if (manager != null && manager.getBus() != null) {
+            managedEndpoint = new ManagedRMEndpoint(this);
+            instrumentationManager = manager.getBus().getExtension(InstrumentationManager.class);        
+            if (instrumentationManager != null) {   
+                try {
+                    instrumentationManager.register(managedEndpoint);
+                } catch (JMException jmex) {
+                    LOG.log(Level.WARNING, "Registering ManagedRMEndpoint failed.", jmex);
+                }
+            }
+        }
     }
 
     void createService() {
@@ -645,6 +660,8 @@ public class RMEndpoint {
         for (SourceSequence ss : getSource().getAllSequences()) {
             manager.getRetransmissionQueue().stop(ss);
         }
+
+        // unregistering of this managed bean from the server is done by the bus itself
     }
 
     class EffectivePolicyImpl implements EffectivePolicy {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Tue Aug 23 15:41:21 2011
@@ -30,6 +30,7 @@ import java.util.logging.Logger;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
+import javax.management.JMException;
 import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
@@ -42,6 +43,7 @@ import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.endpoint.ServerLifeCycleListener;
 import org.apache.cxf.endpoint.ServerLifeCycleManager;
+import org.apache.cxf.management.InstrumentationManager;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
@@ -100,6 +102,8 @@ public class RMManager {
     private DeliveryAssuranceType deliveryAssurance;
     private SourcePolicyType sourcePolicy;
     private DestinationPolicyType destinationPolicy;
+    private InstrumentationManager instrumentationManager;
+    private ManagedRMManager managedManager;
     private String rmNamespace = RM10Constants.NAMESPACE_URI;
     private String rmAddressingNamespace = Names200408.WSA_NAMESPACE_NAME;
     
@@ -466,6 +470,8 @@ public class RMManager {
             t.purge();
             t.cancel();
         }
+
+        // unregistering of this managed bean from the server is done by the bus itself
     }
     
     synchronized void shutdownReliableEndpoint(Endpoint e) {
@@ -623,6 +629,17 @@ public class RMManager {
         if (null == idGenerator) {
             idGenerator = new DefaultSequenceIdentifierGenerator();
         }
+        if (null != bus) {
+            managedManager = new ManagedRMManager(this);
+            instrumentationManager = bus.getExtension(InstrumentationManager.class);        
+            if (instrumentationManager != null) {   
+                try {
+                    instrumentationManager.register(managedManager);
+                } catch (JMException jmex) {
+                    LOG.log(Level.WARNING, "Registering ManagedRMManager failed.", jmex);
+                }
+            }
+        }
     }
     
     @PostConstruct

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java Tue Aug 23 15:41:21 2011
@@ -21,8 +21,13 @@ package org.apache.cxf.ws.rm;
 
 import java.io.OutputStream;
 
+import javax.management.JMException;
+import javax.management.ObjectName;
+
+import org.apache.cxf.Bus;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.io.WriteOnCloseOutputStream;
+import org.apache.cxf.management.ManagementConstants;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.ws.addressing.AddressingConstants;
 import org.apache.cxf.ws.addressing.AddressingConstantsImpl;
@@ -124,4 +129,40 @@ public final class RMUtils {
         }
         return (WriteOnCloseOutputStream) os;
     }
+    
+    /** Builds the JMX object name of the managed bean for the given RM manager. */
+    public static ObjectName getManagedObjectName(RMManager manager) throws JMException {
+        StringBuilder name = new StringBuilder();
+        writeTypeProperty(name, manager.getBus(), "WSRM.Manager");
+        return new ObjectName(name.toString());
+    }
+
+    /** Builds the JMX object name of the managed bean for the given RM endpoint. */
+    public static ObjectName getManagedObjectName(RMEndpoint endpoint) throws JMException {
+        // identical to the name built from the endpoint's manager and application endpoint
+        return getManagedObjectName(endpoint.getManager(), endpoint.getApplicationEndpoint());
+    }
+
+    /** Builds the JMX object name of the managed RM endpoint bean for an application endpoint. */
+    public static ObjectName getManagedObjectName(RMManager manager, Endpoint ep) throws JMException {
+        StringBuilder name = new StringBuilder();
+        writeTypeProperty(name, manager.getBus(), "WSRM.Endpoint");
+        writeEndpointProperty(name, ep);
+        return new ObjectName(name.toString());
+    }
+
+    /** Appends the domain followed by the bus id and type key properties. */
+    private static void writeTypeProperty(StringBuilder buffer, Bus bus, String type) {
+        buffer.append(ManagementConstants.DEFAULT_DOMAIN_NAME).append(':');
+        buffer.append(ManagementConstants.BUS_ID_PROP).append('=').append(bus.getId()).append(',');
+        buffer.append(ManagementConstants.TYPE_PROP).append('=').append(type);
+    }
+
+    /** Appends the quoted service name and port name as two further key properties. */
+    private static void writeEndpointProperty(StringBuilder buffer, Endpoint ep) {
+        String service = ObjectName.quote(ep.getService().getName().toString());
+        buffer.append(',').append(ManagementConstants.SERVICE_NAME_PROP).append('=').append(service);
+        String port = ObjectName.quote(ep.getEndpointInfo().getName().toString());
+        buffer.append(',').append(ManagementConstants.PORT_NAME_PROP).append('=').append(port);
+    }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Tue Aug 23 15:41:21 2011
@@ -19,6 +19,9 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.util.List;
+import java.util.Map;
+
 import org.apache.cxf.message.Message;
 
 public interface RetransmissionQueue {
@@ -39,8 +42,8 @@ public interface RetransmissionQueue {
     boolean isEmpty();
     
     /**
-     * Accepts a new context for posible future retransmission. 
-     * @param ctx the message context.
+     * Accepts a new message for possible future retransmission. 
+     * @param message the message context.
      */
     void addUnacknowledged(Message message);
     
@@ -52,15 +55,47 @@ public interface RetransmissionQueue {
     void purgeAcknowledged(SourceSequence seq);
     
     /**
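+     * Returns the numbers of the messages of the sequence that are still unacknowledged.
+     * @param seq the source sequence
+     * @return the unacknowledged message numbers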
+     */
+    List<Long> getUnacknowledgedMessageNumbers(SourceSequence seq);
+    
+    /**
+     * Returns the retransmission status for the specified message.
+     * @param seq the source sequence
+     * @param num the message number
+     * @return the retransmission status of that message
+     */
+    RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num);
+    
+    /**
+     * Return the retransmission status of all the messages assigned to the sequence.
+     * @param seq the source sequence
+     * @return the retransmission statuses keyed by message number
+     */
+    Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq);
+        
+    /**
      * Initiate resends.
-     *
      */
     void start();
     
     /**
      * Stops retransmission queue.
+     * @param seq
      */
     void stop(SourceSequence seq);
     
-    
+    /**
+     * Suspends the retransmission attempts for the specified sequence
+     * @param seq
+     */
+    void suspend(SourceSequence seq);
+
+    /**
+     * Resumes the retransmission attempts for the specified sequence
+     * @param seq
+     */
+    void resume(SourceSequence seq);
 }

Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java?rev=1160748&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java Tue Aug 23 15:41:21 2011
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.Date;
+
+/**
+ * Read-only view of the retransmission state of a message awaiting acknowledgement.
+ */
+public interface RetransmissionStatus {
+    /**
+     * @return the time of the next resend
+     */
+    Date getNext();
+
+    /**
+     * @return the time of the previous resend; may be null when no resend was attempted yet
+     */
+    Date getPrevious();
+    
+    /**
+     * @return the number of resend attempts
+     */
+    int getResends();
+    
+    /**
+     * @return the next retransmission interval, in milliseconds
+     */
+    long getNextInterval();
+    
+    /**
+     * @return the backoff factor applied to the retransmission interval
+     */
+    long getBackoff();
+    
+    /**
+     * @return true if a resend attempt is pending
+     */
+    boolean isPending();
+    
+    /**
+     * @return true if retransmission is suspended
+     */
+    boolean isSuspended();
+}

Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java Tue Aug 23 15:41:21 2011
@@ -42,6 +42,20 @@ public interface RMStore {
      * @param seq the sequence
      */
     void createDestinationSequence(DestinationSequence seq);
+    
+    /**
+     * Retrieve the source sequence with the specified identifier from persistent store. 
+     * @param seq the identifier of the sequence
+     * @return the sequence if present; otherwise null
+     */
+    SourceSequence getSourceSequence(Identifier seq, ProtocolVariation protocol);
+    
+    /**
+     * Retrieve the destination sequence with the specified identifier from persistent store. 
+     * @param seq the identifier of the sequence
+     * @return the sequence if present; otherwise null
+     */
+    DestinationSequence getDestinationSequence(Identifier seq, ProtocolVariation protocol);
 
     /**
      * Remove the source sequence with the specified identifier from persistent store. 

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Tue Aug 23 15:41:21 2011
@@ -101,6 +101,12 @@ public class RMTxStore implements RMStor
         = "INSERT INTO {0} VALUES(?, ?, ?, ?)";
     private static final String DELETE_MESSAGE_STMT_STR =
         "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
+    private static final String SELECT_DEST_SEQUENCE_STMT_STR =
+        "SELECT ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+        + "WHERE SEQ_ID = ?";
+    private static final String SELECT_SRC_SEQUENCE_STMT_STR =
+        "SELECT CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID FROM CXF_RM_SRC_SEQUENCES "
+        + "WHERE SEQ_ID = ?";
     private static final String SELECT_DEST_SEQUENCES_STMT_STR =
         "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
         + "WHERE ENDPOINT_ID = ?";
@@ -126,6 +132,8 @@ public class RMTxStore implements RMStor
     private PreparedStatement updateSrcSequenceStmt;
     private PreparedStatement selectDestSequencesStmt;
     private PreparedStatement selectSrcSequencesStmt;
+    private PreparedStatement selectDestSequenceStmt;
+    private PreparedStatement selectSrcSequenceStmt;
     private PreparedStatement createInboundMessageStmt;
     private PreparedStatement createOutboundMessageStmt;
     private PreparedStatement deleteInboundMessageStmt;
@@ -237,6 +245,68 @@ public class RMTxStore implements RMStor
         }
     }
     
+    /** Reads the destination sequence with the given id; null if it is absent or the query fails. */
+    public DestinationSequence getDestinationSequence(Identifier sid, ProtocolVariation protocol) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Getting destination sequence for id: " + sid);
+        }
+        try {
+            if (null == selectDestSequenceStmt) {
+                // prepared on first use and cached for later lookups
+                selectDestSequenceStmt = connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);
+            }
+            selectDestSequenceStmt.setString(1, sid.getValue());
+            // NOTE(review): res is not closed here; JDBC closes it when the statement is re-executed
+            ResultSet res = selectDestSequenceStmt.executeQuery();
+            if (res.next()) {
+                EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));
+                long lm = res.getLong(2);
+                InputStream is = res.getBinaryStream(3);
+                SequenceAcknowledgement ack = null;
+                if (null != is) {
+                    ack = PersistenceUtils.getInstance().deserialiseAcknowledgment(is);
+                }
+                return new DestinationSequence(sid, acksTo, lm, ack, protocol);
+            }
+        } catch (SQLException ex) {
+            LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex);
+        }
+        return null;
+    }
+    
+    /** Reads the source sequence with the given id; null if it is absent or the query fails. */
+    public SourceSequence getSourceSequence(Identifier sid, ProtocolVariation protocol) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Getting source sequences for id: " + sid);
+        }
+        try {
+            if (null == selectSrcSequenceStmt) {
+                selectSrcSequenceStmt = connection.prepareStatement(SELECT_SRC_SEQUENCE_STMT_STR);
+            }
+            selectSrcSequenceStmt.setString(1, sid.getValue());
+            // NOTE(review): res is not closed here; JDBC closes it when the statement is re-executed
+            ResultSet res = selectSrcSequenceStmt.executeQuery();
+            if (res.next()) {
+                long cmn = res.getLong(1);
+                boolean lm = res.getBoolean(2);
+                long lval = res.getLong(3);
+                // an EXPIRY value of 0 is mapped to "no expiry date"
+                Date expiry = 0 == lval ? null : new Date(lval);
+                String oidValue = res.getString(4);
+                Identifier oi = null;
+                if (null != oidValue) {
+                    oi = RMUtils.getWSRMFactory().createIdentifier();
+                    oi.setValue(oidValue);
+                }
+                return new SourceSequence(sid, expiry, oi, cmn, lm, protocol);
+            }
+        } catch (SQLException ex) {
+            // the failure is logged and the lookup is reported to the caller as "not found"
+            LOG.log(Level.WARNING, new Message("SELECT_SRC_SEQ_FAILED_MSG", LOG).toString(), ex);
+        }
+        return null;
+    }
+
     public void removeDestinationSequence(Identifier sid) {
         try {
             beginTransaction();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Tue Aug 23 15:41:21 2011
@@ -62,6 +62,7 @@ import org.apache.cxf.ws.rm.RMProperties
 import org.apache.cxf.ws.rm.RMUtils;
 import org.apache.cxf.ws.rm.RetransmissionCallback;
 import org.apache.cxf.ws.rm.RetransmissionQueue;
+import org.apache.cxf.ws.rm.RetransmissionStatus;
 import org.apache.cxf.ws.rm.SourceSequence;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.policy.RM10PolicyUtils;
@@ -76,7 +77,10 @@ public class RetransmissionQueueImpl imp
 
     private static final Logger LOG = LogUtils.getL7dLogger(RetransmissionQueueImpl.class);
 
-    private Map<String, List<ResendCandidate>> candidates = new HashMap<String, List<ResendCandidate>>();
+    private Map<String, List<ResendCandidate>> candidates = 
+        new HashMap<String, List<ResendCandidate>>();
+    private Map<String, List<ResendCandidate>> suspendedCandidates = 
+        new HashMap<String, List<ResendCandidate>>();
     private Resender resender;
     private RMManager manager;
 
@@ -149,6 +153,52 @@ public class RetransmissionQueueImpl imp
         }
     }
 
+    /** Collects the message number of every resend candidate held for the given sequence. */
+    public List<Long> getUnacknowledgedMessageNumbers(SourceSequence seq) {
+        final List<Long> numbers = new ArrayList<Long>();
+        final List<ResendCandidate> held = getSequenceCandidates(seq);
+        if (null == held) {
+            return numbers;
+        }
+        for (int idx = 0; idx < held.size(); idx++) {
+            ResendCandidate rc = held.get(idx);
+            RMProperties rmps = RMContextUtils.retrieveRMProperties(rc.getMessage(), true);
+            numbers.add(rmps.getSequence().getMessageNumber());
+        }
+        return numbers;
+    }
+    
+    /** Finds the resend candidate of the sequence that carries message number num; null if none. */
+    public RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num) {
+        final List<ResendCandidate> held = getSequenceCandidates(seq);
+        if (null == held) {
+            return null;
+        }
+        for (int idx = 0; idx < held.size(); idx++) {
+            ResendCandidate rc = held.get(idx);
+            RMProperties rmps = RMContextUtils.retrieveRMProperties(rc.getMessage(), true);
+            if (num == rmps.getSequence().getMessageNumber()) {
+                return rc;
+            }
+        }
+        return null;
+    }
+    
+    /** Maps the message number of each resend candidate of the sequence to that candidate. */
+    public Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq) {
+        final Map<Long, RetransmissionStatus> statuses = new HashMap<Long, RetransmissionStatus>();
+        final List<ResendCandidate> held = getSequenceCandidates(seq);
+        if (null == held) {
+            return statuses;
+        }
+        for (int idx = 0; idx < held.size(); idx++) {
+            ResendCandidate rc = held.get(idx);
+            RMProperties rmps = RMContextUtils.retrieveRMProperties(rc.getMessage(), true);
+            statuses.put(rmps.getSequence().getMessageNumber(), rc);
+        }
+        return statuses;
+    }
+
     /**
      * Initiate resends.
      */
@@ -182,6 +232,36 @@ public class RetransmissionQueueImpl imp
     void stop() {
         
     }
+    
+    /** Moves the resend candidates of the sequence to the suspended map and suspends each one. */
+    public void suspend(SourceSequence seq) {
+        synchronized (this) {
+            final String sid = seq.getIdentifier().getValue();
+            final List<ResendCandidate> moved = candidates.remove(sid);
+            if (null != moved) {
+                for (int idx = moved.size() - 1; idx >= 0; idx--) {
+                    moved.get(idx).suspend();
+                }
+                suspendedCandidates.put(sid, moved);
+                LOG.log(Level.FINE, "Suspended resends for sequence {0}.", sid);
+            }
+        }
+    }
+    
+    /** Moves the suspended candidates of the sequence back to the active map and resumes each. */
+    public void resume(SourceSequence seq) {
+        synchronized (this) {
+            final String sid = seq.getIdentifier().getValue();
+            final List<ResendCandidate> restored = suspendedCandidates.remove(sid);
+            if (null != restored) {
+                for (int idx = 0; idx < restored.size(); idx++) {
+                    restored.get(idx).resume();
+                }
+                candidates.put(sid, restored);
+                LOG.log(Level.FINE, "Resumed resends for sequence {0}.", sid);
+            }
+        }
+    }
 
     /**
      * @return the exponential backoff
@@ -219,6 +299,9 @@ public class RetransmissionQueueImpl imp
                 candidates.put(key, sequenceCandidates);
             }
             candidate = new ResendCandidate(message);
+            if (isSequenceSuspended(key)) {
+                candidate.suspend();
+            }
             sequenceCandidates.add(candidate);
         }
         LOG.fine("Cached unacknowledged message.");
@@ -248,7 +331,20 @@ public class RetransmissionQueueImpl imp
      * @pre called with mutex held
      */
     protected List<ResendCandidate> getSequenceCandidates(String key) {
-        return candidates.get(key);
+        List<ResendCandidate> sc = candidates.get(key);
+        if (null == sc) {
+            sc = suspendedCandidates.get(key);
+        }
+        return sc;
+    }
+    
+    /**
+     * @param key the sequence identifier under consideration
+     * @return true if the sequence is currently suspended; false otherwise
+     * @pre called with mutex held
+     */
+    protected boolean isSequenceSuspended(String key) {
+        return suspendedCandidates.containsKey(key);
     }
 
     private void clientResend(Message message) {
@@ -373,7 +469,7 @@ public class RetransmissionQueueImpl imp
     /**
      * Represents a candidate for resend, i.e. an unacked outgoing message.
      */
-    protected class ResendCandidate implements Runnable {
+    protected class ResendCandidate implements Runnable, RetransmissionStatus {
         private Message message;
         private OutputStream out;
         private Date next;
@@ -382,6 +478,7 @@ public class RetransmissionQueueImpl imp
         private long nextInterval;
         private long backoff;
         private boolean pending;
+        private boolean suspended;
         private boolean includeAckRequested;
 
         /**
@@ -458,21 +555,43 @@ public class RetransmissionQueueImpl imp
         /**
          * @return number of resend attempts
          */
-        protected int getResends() {
+        public int getResends() {
             return resends;
         }
 
         /**
          * @return date of next resend
          */
-        protected Date getNext() {
+        public Date getNext() {
             return next;
         }
 
         /**
+         * @return date of previous resend or null if no attempt is yet taken 
+         */
+        public Date getPrevious() {
+            if (resends > 0) {
+                return new Date(next.getTime() - nextInterval / backoff);
+            }
+            return null;
+        }
+
+        public long getNextInterval() {
+            return nextInterval;
+        }
+
+        public long getBackoff() {
+            return backoff;
+        }
+
+        public boolean isSuspended() {
+            return suspended;
+        }
+
+        /**
          * @return if resend attempt is pending
          */
-        protected synchronized boolean isPending() {
+        public synchronized boolean isPending() {
             return pending;
         }
 
@@ -498,13 +617,29 @@ public class RetransmissionQueueImpl imp
             }
         }
 
+        /** Marks this candidate suspended, clears the pending flag and cancels nextTask if set. */
+        protected void suspend() {
+            suspended = true;
+            pending = false;
+            //TODO release the message (cancel()) and later reload it upon resume
+            if (null != nextTask) {
+                nextTask.cancel();
+            }
+        }
+
+        protected void resume() {
+            suspended = false;
+            next = new Date(System.currentTimeMillis()); // restart the schedule from the current time
+            attempted(); // NOTE(review): presumably plans the next resend - body not visible here
+        }
+
         private void releaseSavedMessage() {
-            CachedOutputStream saved = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+            CachedOutputStream saved = (CachedOutputStream)message.remove(RMMessageConstants.SAVED_CONTENT);
             if (saved != null) {
                 saved.releaseTempFileHold();
             }
-
         }
+
         /**
          * @return associated message context
          */

Added: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java?rev=1160748&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (added)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Tue Aug 23 15:41:21 2011
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.xml.namespace.QName;
+
+import junit.framework.Assert;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.binding.soap.model.SoapBindingInfo;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.WSDLConstants;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.EndpointImpl;
+import org.apache.cxf.management.InstrumentationManager;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.service.Service;
+import org.apache.cxf.service.ServiceImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.service.model.ServiceInfo;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.ws.rm.v200702.Identifier;
+import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
+import org.easymock.classextension.EasyMock;
+import org.easymock.classextension.IMocksControl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ManagedRMManagerTest extends Assert {
+    private static final String TEST_URI = "http://nowhere.com/bar/foo";
+    private IMocksControl control;
+        
+    private Bus bus;
+    private InstrumentationManager im;
+    private RMManager manager;
+    private Endpoint endpoint;
+    
+    @Before
+    public void setUp() {
+        control = EasyMock.createNiceControl();
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        if (bus != null) {
+            bus.shutdown(true);
+        }
+        control.verify();
+    }
+
+    @Test
+    public void testManagedRMManager() throws Exception {
+        final SpringBusFactory factory = new SpringBusFactory();
+        bus =  factory.createBus("org/apache/cxf/ws/rm/managed-manager-bean.xml");
+        im = bus.getExtension(InstrumentationManager.class);
+        manager = bus.getExtension(RMManager.class);
+        endpoint = createTestEndpoint();
+        assertTrue("Instrumentation Manager should not be null", im != null);
+        assertTrue("RMManager should not be null", manager != null);
+                
+        MBeanServer mbs = im.getMBeanServer();
+        assertNotNull("MBeanServer should be available.", mbs);
+
+        ObjectName managerName = RMUtils.getManagedObjectName(manager);
+        Set<ObjectInstance> mbset = mbs.queryMBeans(managerName, null);
+        assertEquals("ManagedRMManager should be found", 1, mbset.size());
+
+        Object o;
+        o = mbs.getAttribute(managerName, "UsingStore");
+        assertTrue(o instanceof Boolean);
+        assertFalse("Store attribute is false", (Boolean)o);
+        
+        o = mbs.invoke(managerName, "getEndpointIdentifiers", null, null);
+        assertTrue(o instanceof String[]);
+        assertEquals("No Endpoint", 0, ((String[])o).length);
+    
+        RMEndpoint rme = createTestRMEndpoint();
+        
+        ObjectName endpointName = RMUtils.getManagedObjectName(rme);
+        mbset = mbs.queryMBeans(endpointName, null);
+        assertEquals("ManagedRMEndpoint should be found", 1, mbset.size());
+        
+        o = mbs.invoke(managerName, "getEndpointIdentifiers", null, null);
+        assertEquals("One Endpoint", 1, ((String[])o).length);
+        assertEquals("Endpoint identifier must match", 
+                     RMUtils.getEndpointIdentifier(endpoint), ((String[])o)[0]);
+
+        // test some endpoint methods
+        o = mbs.getAttribute(endpointName, "Address");
+        assertTrue(o instanceof String);
+        assertEquals("Endpoint address must match", TEST_URI, o);
+        
+        o = mbs.getAttribute(endpointName, "LastApplicationMessage");
+        assertNull(o);
+
+        o = mbs.getAttribute(endpointName, "LastControlMessage");
+        assertNull(o);
+        
+        o = mbs.invoke(endpointName, "getDestinationSequenceIds", null, null);
+        assertTrue(o instanceof String[]);
+        assertEquals("No sequence", 0, ((String[])o).length);
+
+        o = mbs.invoke(endpointName, "getDestinationSequences", null, null);
+        assertTrue(o instanceof CompositeData[]);
+        assertEquals("No sequence", 0, ((CompositeData[])o).length);
+
+        o = mbs.invoke(endpointName, "getSourceSequenceIds", new Object[]{true}, new String[]{"boolean"});
+        assertTrue(o instanceof String[]);
+        assertEquals("No sequence", 0, ((String[])o).length);
+
+        o = mbs.invoke(endpointName, "getSourceSequences", new Object[]{true}, new String[]{"boolean"});
+        assertTrue(o instanceof CompositeData[]);
+        assertEquals("No sequence", 0, ((CompositeData[])o).length);
+        
+        o = mbs.invoke(endpointName, "getDeferredAcknowledgementTotalCount", null, null);
+        assertTrue(o instanceof Integer);
+        assertEquals("No deferred acks", 0, o);
+        
+        o = mbs.invoke(endpointName, "getQueuedMessageTotalCount", null, null);
+        assertTrue(o instanceof Integer);
+        assertEquals("No queued messages", 0, o);
+    }
+    
+    @Test
+    public void testManagedRMEndpointGetQueuedCount() throws Exception {
+        ManagedRMEndpoint mbean = createTestManagedRMEndpoint();
+        // the stub queue's countUnacknowledged reports 2 messages for seq1 and 1 for seq2
+        int total = mbean.getQueuedMessageTotalCount();
+        int forSeq1 = mbean.getQueuedMessageCount("seq1");
+
+        assertEquals(3, total);
+        assertEquals(2, forSeq1);
+    }
+    
+    @Test
+    public void testGetUnAcknowledgedMessageIdentifiers() throws Exception {
+        ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
+        Long[] numbers = managedEndpoint.getUnAcknowledgedMessageIdentifiers("seq1");
+        assertEquals(2, numbers.length);
+        assertEquals(2L, numbers[0].longValue()); // one assertion per element: a failure names the value
+        assertEquals(4L, numbers[1].longValue());
+    }
+    
+    @Test
+    public void testGetSourceSequenceAcknowledgedRange() throws Exception {
+        ManagedRMEndpoint mbean = createTestManagedRMEndpoint();
+        // seq1 is set up with the acknowledged ranges [1,1] and [3,3]; expect them as lower/upper pairs
+        Long[] acked = mbean.getSourceSequenceAcknowledgedRange("seq1");
+        assertEquals(4, acked.length);
+        assertTrue(acked[0] == 1L && acked[1] == 1L && acked[2] == 3L && acked[3] == 3L);
+    }
+    
+    @Test
+    public void testGetSourceSequences() throws Exception {
+        ManagedRMEndpoint mbean = createTestManagedRMEndpoint();
+        // both ids must be reported; their order is not asserted
+        String[] ids = mbean.getSourceSequenceIds(true);
+        assertEquals(2, ids.length);
+        boolean seq1Listed = "seq1".equals(ids[0]) || "seq1".equals(ids[1]);
+        boolean seq2Listed = "seq2".equals(ids[0]) || "seq2".equals(ids[1]);
+        assertTrue(seq1Listed && seq2Listed);
+        // the fixture makes seq2 the current source sequence
+        assertEquals("seq2", mbean.getCurrentSourceSequenceId());
+
+        CompositeData[] sequences = mbean.getSourceSequences(true);
+        assertEquals(2, sequences.length);
+        verifySourceSequence(sequences[0]);
+        verifySourceSequence(sequences[1]);
+    }
+    
+    @Test
+    public void testGetDestinationSequences() throws Exception {
+        ManagedRMEndpoint mbean = createTestManagedRMEndpoint();
+        // both ids must be reported; their order is not asserted
+        String[] ids = mbean.getDestinationSequenceIds();
+        assertEquals(2, ids.length);
+        boolean seq3Listed = "seq3".equals(ids[0]) || "seq3".equals(ids[1]);
+        boolean seq4Listed = "seq4".equals(ids[0]) || "seq4".equals(ids[1]);
+        assertTrue(seq3Listed && seq4Listed);
+        CompositeData[] sequences = mbean.getDestinationSequences();
+        assertEquals(2, sequences.length);
+        verifyDestinationSequence(sequences[0]);
+        verifyDestinationSequence(sequences[1]);
+    }
+    
+    @Test
+    public void testGetRetransmissionStatus() throws Exception {
+        ManagedRMEndpoint mbean = createTestManagedRMEndpoint();
+        TestRetransmissionQueue queue = (TestRetransmissionQueue)manager.getRetransmissionQueue();
+        // the stub queue has no status for message 3 of seq1 ...
+        assertNull(mbean.getRetransmissionStatus("seq1", 3L));
+        // ... but hands out its single status object for message 2
+        CompositeData reported = mbean.getRetransmissionStatus("seq1", 2L);
+        assertNotNull(reported);
+        RetransmissionStatus expected = queue.getRetransmissionStatus();
+        verifyRetransmissionStatus(reported, 2L, expected);
+    }
+    
+    @Test
+    public void testSuspendAndResumeSourceQueue() throws Exception {
+        ManagedRMEndpoint mbean = createTestManagedRMEndpoint();
+        TestRetransmissionQueue queue = (TestRetransmissionQueue)manager.getRetransmissionQueue();
+        // nothing is suspended in a freshly created stub queue
+        assertFalse(queue.isSuspended("seq1"));
+
+        mbean.suspendSourceQueue("seq1");
+        assertTrue(queue.isSuspended("seq1"));
+
+        mbean.resumeSourceQueue("seq1");
+        assertFalse(queue.isSuspended("seq1"));
+    }
+    
+    private void verifySourceSequence(CompositeData cd) {
+        // pick the expected values by sequence id; any id other than seq1/seq2 fails the test
+        final Object sid = cd.get("sequenceId");
+        final boolean first = "seq1".equals(sid);
+        if (!first && !"seq2".equals(sid)) {
+            fail("Unexpected sequence: " + sid);
+        }
+        // seq1: current message number 5, 2 queued; seq2: current message number 4, 1 queued
+        verifySourceSequence(cd, first ? "seq1" : "seq2", first ? 5L : 4L, first ? 2 : 1);
+    }
+
+    private void verifySourceSequence(CompositeData cd, String sid, long num, int qsize) {
+        assertEquals(sid, cd.get("sequenceId")); // separate assertions: a failure names the wrong item
+        assertEquals(num, ((Long)cd.get("currentMessageNumber")).longValue());
+        assertEquals(qsize, ((Integer)cd.get("queuedMessageCount")).intValue());
+    }
+
+    private void verifyDestinationSequence(CompositeData cd) {
+        final Object sid = cd.get("sequenceId"); // only the id is checked for destination sequences
+        assertFalse(!"seq3".equals(sid) && !"seq4".equals(sid));
+    }
+
+    private void verifyRetransmissionStatus(CompositeData cd, long num, RetransmissionStatus status) {
+        assertEquals(Long.valueOf(num), cd.get("messageNumber")); // primitives are boxed explicitly
+        assertEquals(Integer.valueOf(status.getResends()), cd.get("resends"));
+        assertEquals(status.getNext(), cd.get("next"));
+        assertEquals(status.getPrevious(), cd.get("previous"));
+        assertEquals(Long.valueOf(status.getNextInterval()), cd.get("nextInterval"));
+        assertEquals(Long.valueOf(status.getBackoff()), cd.get("backOff"));
+    }
+
+    private ManagedRMEndpoint createTestManagedRMEndpoint() {
+        manager = new RMManager(); 
+        RMEndpoint rme = control.createMock(RMEndpoint.class);
+        EndpointReferenceType ref = RMUtils.createReference(TEST_URI);
+        Source source = new Source(rme);
+        Destination destination = new Destination(rme);
+        // give the manager the stub retransmission queue before it is initialised
+        RetransmissionQueue rq = new TestRetransmissionQueue();
+        manager.setRetransmissionQueue(rq);
+        manager.initialise();
+        // fixture data: source sequences seq1/seq2 and destination sequences seq3/seq4
+        List<SourceSequence> sss = createTestSourceSequences(source, ref);
+        List<DestinationSequence> dss = createTestDestinationSequences(destination, ref);
+        // the mocked endpoint hands out the objects built above; its protocol is null
+        EasyMock.expect(rme.getManager()).andReturn(manager).anyTimes();
+        EasyMock.expect(rme.getSource()).andReturn(source).anyTimes();
+        EasyMock.expect(rme.getDestination()).andReturn(destination).anyTimes();
+        EasyMock.expect(rme.getProtocol()).andReturn(null).anyTimes();
+
+        control.replay();
+        setCurrentMessageNumber(sss.get(0), 5L);
+        setCurrentMessageNumber(sss.get(1), 4L);
+        source.addSequence(sss.get(0));
+        source.addSequence(sss.get(1));
+        // seq2 becomes the current source sequence
+        source.setCurrent(sss.get(1));
+
+        destination.addSequence(dss.get(0));
+        destination.addSequence(dss.get(1));
+        return new ManagedRMEndpoint(rme);
+    }
+
+    private void setCurrentMessageNumber(SourceSequence ss, long num) {
+        for (long i = 0; i < num; i++) { // long counter: an int one would overflow against a long bound
+            ss.nextMessageNumber();
+        }
+    }
+
+    private List<SourceSequence> createTestSourceSequences(Source source,
+                                                           EndpointReferenceType to) {
+        List<SourceSequence> created = new ArrayList<SourceSequence>(); // seq1, then seq2
+        for (String sid : new String[] {"seq1", "seq2"}) {
+            created.add(createTestSourceSequence(source, sid, to, new long[]{1L, 1L, 3L, 3L}));
+        }
+        return created;
+    }
+
+    private List<DestinationSequence> createTestDestinationSequences(Destination destination,
+                                                                     EndpointReferenceType to) {
+        List<DestinationSequence> created = new ArrayList<DestinationSequence>(); // seq3, then seq4
+        for (String sid : new String[] {"seq3", "seq4"}) {
+            created.add(createTestDestinationSequence(destination, sid, to, new long[]{1L, 1L, 3L, 3L}));
+        }
+        return created;
+    }
+
+    private SourceSequence createTestSourceSequence(Source source, String sid,
+                                                    EndpointReferenceType to, long[] acked) {
+        final Identifier id = RMUtils.getWSRMFactory().createIdentifier();
+        id.setValue(sid);
+        final SourceSequence sequence = new SourceSequence(id, null);
+        sequence.setSource(source);
+        sequence.setTarget(to);
+        List<SequenceAcknowledgement.AcknowledgementRange> target =
+            sequence.getAcknowledgement().getAcknowledgementRange();
+        for (int lower = 0, upper = 1; lower < acked.length; lower += 2, upper += 2) {
+            target.add(createAcknowledgementRange(acked[lower], acked[upper])); // one range per pair
+        }
+        return sequence;
+    }
+
+    private DestinationSequence createTestDestinationSequence(Destination destination, String sid,
+                                                              EndpointReferenceType to, long[] acked) {
+        final Identifier id = RMUtils.getWSRMFactory().createIdentifier();
+        id.setValue(sid);
+        final DestinationSequence sequence = new DestinationSequence(id, to, null, null);
+        sequence.setDestination(destination);
+        // acked is read as consecutive (lower, upper) pairs, one acknowledged range per pair
+        List<SequenceAcknowledgement.AcknowledgementRange> target =
+            sequence.getAcknowledgment().getAcknowledgementRange();
+        for (int lower = 0, upper = 1; lower < acked.length; lower += 2, upper += 2) {
+            target.add(createAcknowledgementRange(acked[lower], acked[upper]));
+        }
+        return sequence;
+    }
+
+    private SequenceAcknowledgement.AcknowledgementRange createAcknowledgementRange(long l, long u) {
+        final SequenceAcknowledgement.AcknowledgementRange ackRange =
+            new SequenceAcknowledgement.AcknowledgementRange();
+        ackRange.setLower(l); // l is the lower bound
+        ackRange.setUpper(u); // u is the upper bound
+        return ackRange;
+    }
+
+    private Endpoint createTestEndpoint() throws Exception {
+        final ServiceInfo serviceInfo = new ServiceInfo();
+        serviceInfo.setName(new QName(TEST_URI, "testService"));
+        final Service service = new ServiceImpl(serviceInfo);
+        final SoapBindingInfo soapBinding = new SoapBindingInfo(serviceInfo, WSDLConstants.NS_SOAP11);
+        soapBinding.setTransportURI(WSDLConstants.NS_SOAP_HTTP_TRANSPORT);
+        final EndpointInfo endpointInfo = new EndpointInfo();
+        endpointInfo.setAddress(TEST_URI); // TEST_URI doubles as address and QName namespace
+        endpointInfo.setName(new QName(TEST_URI, "testPort"));
+        endpointInfo.setBinding(soapBinding);
+        endpointInfo.setService(serviceInfo);
+        return new EndpointImpl(bus, service, endpointInfo);
+    }
+    
+    private RMEndpoint createTestRMEndpoint() throws Exception {
+        Message message = control.createMock(Message.class);
+        Exchange exchange = control.createMock(Exchange.class);
+        // the mocked exchange exposes the test endpoint and uses the mocked message as its out message
+        EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
+        EasyMock.expect(exchange.get(Endpoint.class)).andReturn(endpoint);
+        EasyMock.expect(exchange.getOutMessage()).andReturn(message);
+        // replay the mocks, then ask the manager for the reliable endpoint of that message
+        control.replay();
+        return manager.getReliableEndpoint(message);
+    }
+    
+    private static class TestRetransmissionQueue implements RetransmissionQueue {
+        private final Set<String> suspended = new HashSet<String>();
+        private final RetransmissionStatus status = new TestRetransmissionStatus();
+        // stub queue: canned answers for seq1/seq2, suspend/resume tracked by sequence id
+        public int countUnacknowledged(SourceSequence seq) {
+            final String key = seq.getIdentifier().getValue();
+            if ("seq1".equals(key)) {
+                return 2;
+            } else if ("seq2".equals(key)) {
+                return 1;
+            }
+            return 0;
+        }
+
+        public boolean isEmpty() {
+            return false;
+        }
+
+        public void addUnacknowledged(Message message) {
+            // no-op in this stub
+        }
+
+        public void purgeAcknowledged(SourceSequence seq) {
+            // no-op in this stub
+        }
+
+        public List<Long> getUnacknowledgedMessageNumbers(SourceSequence seq) {
+            List<Long> list = new ArrayList<Long>();
+            final String key = seq.getIdentifier().getValue();
+            if ("seq1".equals(key)) {
+                list.add(2L);
+                list.add(4L);
+            } else if ("seq2".equals(key)) {
+                list.add(3L);
+            }
+            return list;
+        }
+
+        public RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num) {
+            final String key = seq.getIdentifier().getValue();
+            if (("seq1".equals(key) && (2L == num || 4L == num))
+                || ("seq2".equals(key) && (2L == num))) { // NOTE(review): seq2's unacked list holds 3L
+                return status;
+            }
+            return null;
+        }
+
+        public Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq) {
+            // not stubbed: always returns null
+            return null;
+        }
+
+        public void start() {
+            // no-op in this stub
+        }
+
+        public void stop(SourceSequence seq) {
+            // no-op in this stub
+        }
+
+        public void suspend(SourceSequence seq) {
+            suspended.add(seq.getIdentifier().getValue());
+        }
+
+        public void resume(SourceSequence seq) {
+            suspended.remove(seq.getIdentifier().getValue());
+        }
+
+        boolean isSuspended(String sid) {
+            return suspended.contains(sid);
+        }
+
+        RetransmissionStatus getRetransmissionStatus() {
+            return status;
+        }
+    }
+    
+    private static class TestRetransmissionStatus implements RetransmissionStatus {
+        private final long retryInterval = 300000L;
+        private final Date nextAttempt = new Date(System.currentTimeMillis() + retryInterval / 2);
+        private final Date lastAttempt = new Date(nextAttempt.getTime() - retryInterval);
+
+        public Date getNext() {
+            return nextAttempt; // half an interval after this object was created
+        }
+
+        public Date getPrevious() {
+            return lastAttempt; // exactly one interval before getNext()
+        }
+
+        public int getResends() {
+            return 2; // fixed value
+        }
+
+        public long getNextInterval() {
+            return retryInterval; // 300000 ms
+        }
+
+        public long getBackoff() {
+            return 1L; // fixed value
+        }
+
+        public boolean isPending() {
+            return false; // never pending
+        }
+
+        public boolean isSuspended() {
+            return false; // never suspended
+        }
+    }
+}

Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java Tue Aug 23 15:41:21 2011
@@ -157,6 +157,16 @@ public class RMManagerConfigurationTest 
             // TODO Auto-generated method stub
             
         }
+
+        public SourceSequence getSourceSequence(Identifier seq, ProtocolVariation protocol) {
+            // stub implementation: always returns null
+            return null;
+        }
+
+        public DestinationSequence getDestinationSequence(Identifier seq, ProtocolVariation protocol) {
+            // stub implementation: always returns null
+            return null;
+        }
         
     }
 }

Added: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml?rev=1160748&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml (added)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml Tue Aug 23 15:41:21 2011
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:cxf="http://cxf.apache.org/core"       
+       xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"       
+       xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd       
+http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+    <bean id="org.apache.cxf.management.InstrumentationManager" class="org.apache.cxf.management.jmx.InstrumentationManagerImpl">
+        <property name="bus" ref="cxf" />
+        <property name="enabled" value="true"/> <!-- instrumentation is enabled for this test configuration -->
+        <property name="threaded" value="false"/>        
+        <property name="daemon" value="false"/>            
+        <property name="JMXServiceURL" value="service:jmx:rmi:///jndi/rmi://localhost:9914/jmxrmi" />
+    </bean> 
+
+    <cxf:bus> <!-- WS-RM feature: 10 s retransmission/ack intervals, in-order delivery -->
+        <cxf:features>
+            <wsrm-mgr:reliableMessaging>
+                <wsrm-policy:RMAssertion>         
+                    <wsrm-policy:BaseRetransmissionInterval Milliseconds="10000"/>           
+                    <wsrm-policy:AcknowledgementInterval Milliseconds="10000"/>                                                        
+                </wsrm-policy:RMAssertion>
+                <wsrm-mgr:deliveryAssurance>
+                    <wsrm-mgr:InOrder/>
+                </wsrm-mgr:deliveryAssurance>
+                <wsrm-mgr:sourcePolicy>
+                    <wsrm-mgr:sequenceTerminationPolicy terminateOnShutdown="true"/>                    
+                </wsrm-mgr:sourcePolicy>
+                <wsrm-mgr:destinationPolicy>
+                    <wsrm-mgr:acksPolicy ImmediaAcksTimeout="2000" intraMessageThreshold="0"/>                    
+                </wsrm-mgr:destinationPolicy>
+            </wsrm-mgr:reliableMessaging>
+        </cxf:features>
+    </cxf:bus>
+    
+</beans>

Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml
------------------------------------------------------------------------------
    svn:executable = *

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java Tue Aug 23 15:41:21 2011
@@ -226,30 +226,34 @@ public class RMTxStoreTest extends Asser
     
     @Test
     public void testCreateDeleteMessages() throws IOException, SQLException  {
-        RMMessage msg = control.createMock(RMMessage.class);
+        RMMessage msg1 = control.createMock(RMMessage.class);
+        RMMessage msg2 = control.createMock(RMMessage.class);
         Identifier sid1 = new Identifier();
         sid1.setValue("sequence1");
-        EasyMock.expect(msg.getMessageNumber()).andReturn(ONE).times(2); 
+        EasyMock.expect(msg1.getMessageNumber()).andReturn(ONE); 
+        EasyMock.expect(msg2.getMessageNumber()).andReturn(ONE); 
         byte[] bytes = new byte[89];
-        EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
-        EasyMock.expect(msg.getSize()).andReturn(bytes.length);
+        EasyMock.expect(msg1.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+        EasyMock.expect(msg1.getSize()).andReturn(bytes.length);
+        EasyMock.expect(msg2.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+        EasyMock.expect(msg2.getSize()).andReturn(bytes.length);
         
         control.replay();
         store.beginTransaction();
-        store.storeMessage(sid1, msg, true);
-        store.storeMessage(sid1, msg, false);
+        store.storeMessage(sid1, msg1, true);
+        store.storeMessage(sid1, msg2, false);
         store.commit();
         control.verify();
         
         control.reset();
-        EasyMock.expect(msg.getMessageNumber()).andReturn(ONE); 
-        EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
-        EasyMock.expect(msg.getSize()).andReturn(bytes.length);
+        EasyMock.expect(msg1.getMessageNumber()).andReturn(ONE); 
+        EasyMock.expect(msg1.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+        EasyMock.expect(msg1.getSize()).andReturn(bytes.length);
         
         control.replay();
         store.beginTransaction();
         try {
-            store.storeMessage(sid1, msg, true);
+            store.storeMessage(sid1, msg1, true);
         } catch (SQLException ex) {
             assertEquals("23505", ex.getSQLState());
         }
@@ -257,14 +261,17 @@ public class RMTxStoreTest extends Asser
         control.verify();
         
         control.reset();
-        EasyMock.expect(msg.getMessageNumber()).andReturn(TEN).times(2); 
-        EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes)); 
-        EasyMock.expect(msg.getSize()).andReturn(bytes.length);
+        EasyMock.expect(msg1.getMessageNumber()).andReturn(TEN);
+        EasyMock.expect(msg2.getMessageNumber()).andReturn(TEN); 
+        EasyMock.expect(msg1.getInputStream()).andReturn(new ByteArrayInputStream(bytes)); 
+        EasyMock.expect(msg1.getSize()).andReturn(bytes.length);
+        EasyMock.expect(msg2.getInputStream()).andReturn(new ByteArrayInputStream(bytes)); 
+        EasyMock.expect(msg2.getSize()).andReturn(bytes.length);
         
         control.replay();
         store.beginTransaction();
-        store.storeMessage(sid1, msg, true);
-        store.storeMessage(sid1, msg, false);
+        store.storeMessage(sid1, msg1, true);
+        store.storeMessage(sid1, msg2, false);
         store.commit();
         control.verify();
         
@@ -420,6 +427,64 @@ public class RMTxStoreTest extends Asser
     }
 
     @Test
+    public void testGetDestinationSequence() throws SQLException, IOException {
+        
+        Identifier sid1 = null;
+        Identifier sid2 = null;
+        
+        DestinationSequence seq = 
+            store.getDestinationSequence(new Identifier(), ProtocolVariation.RM10WSA200408);
+        assertNull(seq);
+
+        try {
+            sid1 = setupDestinationSequence("sequence1");
+
+            seq = store.getDestinationSequence(sid1, ProtocolVariation.RM10WSA200408);
+            assertNotNull(seq);
+
+            sid2 = setupDestinationSequence("sequence2");
+            seq = store.getDestinationSequence(sid2, ProtocolVariation.RM10WSA200408);
+            assertNotNull(seq);
+        } finally {
+            if (null != sid1) {
+                store.removeDestinationSequence(sid1);
+            }
+            if (null != sid2) {
+                store.removeDestinationSequence(sid2);
+            }
+        }
+    }
+
+    @Test
+    public void testGetSourceSequence() throws SQLException, IOException {
+        final ProtocolVariation protocol = ProtocolVariation.RM10WSA200408;
+
+        // an identifier that has no value set must not resolve to a sequence
+        assertNull(store.getSourceSequence(new Identifier(), protocol));
+
+        // ids stay null until the matching setup call returns, so cleanup only
+        // touches sequences that were actually created
+        Identifier first = null;
+        Identifier second = null;
+        try {
+            // each sequence must be retrievable by the id returned from its setup
+            first = setupSourceSequence("sequence1");
+            assertNotNull(store.getSourceSequence(first, protocol));
+
+            second = setupSourceSequence("sequence2");
+            assertNotNull(store.getSourceSequence(second, protocol));
+        } finally {
+            // runs whether or not the assertions above passed
+            if (first != null) {
+                store.removeSourceSequence(first);
+            }
+            if (second != null) {
+                store.removeSourceSequence(second);
+            }
+        }
+    }
+
+    @Test
     public void testGetMessages() throws SQLException, IOException {
         
         Identifier sid1 = new Identifier();