You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2011/06/06 14:15:36 UTC

svn commit: r1132609 - in /cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ systests/...

Author: ay
Date: Mon Jun  6 12:15:36 2011
New Revision: 1132609

URL: http://svn.apache.org/viewvc?rev=1132609&view=rev
Log:
[CXF-3219] WS-RM's inbound to update the ack range in the store

Added:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java   (with props)
Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Mon Jun  6 12:15:36 2011
@@ -29,11 +29,13 @@ import java.util.logging.Logger;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
+import org.apache.cxf.ws.rm.persistence.RMMessage;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 
 public class Destination extends AbstractEndpoint {
@@ -131,6 +133,17 @@ public class Destination extends Abstrac
             SequenceFaultFactory sff = new SequenceFaultFactory();
             throw sff.createUnknownSequenceFault(sequenceType.getIdentifier());
         }
+
+        RMStore store = getReliableEndpoint().getManager().getStore();
+        if (null != store) {
+            CachedOutputStream saved = 
+                (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+            RMMessage msg = new RMMessage();
+            msg.setMessageNumber(sequenceType.getMessageNumber());
+            msg.setContent(saved);
+            store.persistIncoming(seq, msg);
+        }
+
     }
     
     void ackRequested(Message message) throws SequenceFault, RMException {
@@ -180,6 +193,7 @@ public class Destination extends Abstrac
 
         if (null != seq) {
             seq.processingComplete(sequenceType.getMessageNumber());
+            seq.purgeAcknowledged(sequenceType.getMessageNumber());
         }
     }
     

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Mon Jun  6 12:15:36 2011
@@ -148,9 +148,7 @@ public class DestinationSequence extends
             mergeRanges();
             wakeupAll();
         }
-        
-        purgeAcknowledged(messageNumber);
-        
+                
         RMAssertion rma = PolicyUtils.getRMAssertion(destination.getManager().getRMAssertion(), message);
         long acknowledgementInterval = 0;
         AcknowledgementInterval ai = rma.getAcknowledgementInterval();

Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java?rev=1132609&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java Mon Jun  6 12:15:36 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.io.InputStream;
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.Phase;
+
+/**
+ * 
+ */
+public class RMCaptureInInterceptor extends AbstractRMInterceptor<Message> {
+    private static final Logger LOG = LogUtils.getLogger(RMCaptureInInterceptor.class);
+    
+    public RMCaptureInInterceptor() {
+        super(Phase.PRE_STREAM);
+    }
+
+    protected void handle(Message message) throws SequenceFault, RMException {
+        LOG.entering(getClass().getName(), "handleMessage");
+        
+        InputStream is = message.getContent(InputStream.class);
+        if (is != null) {
+            CachedOutputStream saved = new CachedOutputStream();
+            try {
+                IOUtils.copy(is, saved);
+
+                saved.flush();
+                is.close();
+
+                message.setContent(InputStream.class, saved.getInputStream());
+                LOG.fine("Capturing the original RM message");
+                message.put(RMMessageConstants.SAVED_CONTENT, saved);
+            } catch (Exception e) {
+                throw new Fault(e);
+            }
+        }
+    }
+
+}

Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Mon Jun  6 12:15:36 2011
@@ -410,6 +410,7 @@ public class RMManager implements Server
         RMEndpoint rme = createReliableEndpoint(endpoint);
         rme.initialise(conduit, null, null);
         reliableEndpoints.put(endpoint, rme);
+        SourceSequence css = null;
         for (SourceSequence ss : sss) {            
  
             Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
@@ -419,7 +420,11 @@ public class RMManager implements Server
             LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
             
             rme.getSource().addSequence(ss, false);
-            
+            // choosing an arbitrary valid source sequence as the current source sequence
+            if (css == null && !ss.isExpired() && !ss.isLastMessage()) {
+                css = ss;
+                rme.getSource().setCurrent(css);
+            }
             for (RMMessage m : ms) {                
                 
                 Message message = new MessageImpl();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java Mon Jun  6 12:15:36 2011
@@ -23,6 +23,7 @@ import org.apache.cxf.Bus;
 import org.apache.cxf.common.injection.NoJSR250Annotations;
 import org.apache.cxf.feature.AbstractFeature;
 import org.apache.cxf.interceptor.InterceptorProvider;
+import org.apache.cxf.ws.rm.RMCaptureInInterceptor;
 import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
 import org.apache.cxf.ws.rm.RMInInterceptor;
 import org.apache.cxf.ws.rm.RMManager;
@@ -50,6 +51,7 @@ public class RMFeature extends AbstractF
     private RMOutInterceptor rmLogicalOut = new RMOutInterceptor();
     private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
     private RMSoapInterceptor rmCodec = new RMSoapInterceptor();
+    private RMCaptureInInterceptor rmCaptureIn = new RMCaptureInInterceptor(); 
 
     public void setDeliveryAssurance(DeliveryAssuranceType da) {
         deliveryAssurance = da;
@@ -94,17 +96,21 @@ public class RMFeature extends AbstractF
         rmLogicalIn.setBus(bus);
         rmLogicalOut.setBus(bus);
         rmDelivery.setBus(bus);
+        rmCaptureIn.setBus(bus);
 
         provider.getInInterceptors().add(rmLogicalIn);
         provider.getInInterceptors().add(rmCodec);
         provider.getInInterceptors().add(rmDelivery);
+        if (null != store) {
+            provider.getInInterceptors().add(rmCaptureIn);
+        }
 
         provider.getOutInterceptors().add(rmLogicalOut);
         provider.getOutInterceptors().add(rmCodec);
 
         provider.getInFaultInterceptors().add(rmLogicalIn);
         provider.getInFaultInterceptors().add(rmCodec);
-        provider.getInInterceptors().add(rmDelivery);
+        provider.getInFaultInterceptors().add(rmDelivery);
 
         provider.getOutFaultInterceptors().add(rmLogicalOut);
         provider.getOutFaultInterceptors().add(rmCodec);

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java Mon Jun  6 12:15:36 2011
@@ -172,7 +172,7 @@ public class DestinationTest extends Ass
         Identifier id = control.createMock(Identifier.class);
         EasyMock.expect(st.getIdentifier()).andReturn(id); 
         long nr = 10;
-        EasyMock.expect(st.getMessageNumber()).andReturn(nr).times(2);
+        EasyMock.expect(st.getMessageNumber()).andReturn(nr).times(3);
         DestinationSequence ds = control.createMock(DestinationSequence.class);
         EasyMock.expect(destination.getSequence(id)).andReturn(ds);
         
@@ -203,7 +203,11 @@ public class DestinationTest extends Ass
         String acksToAddress = "acksTo";
         EasyMock.expect(acksToURI.getValue()).andReturn(acksToAddress);
         EasyMock.expect(ds.canPiggybackAckOnPartialResponse()).andReturn(false);
-        EasyMock.expect(destination.getReliableEndpoint()).andReturn(rme);
+        EasyMock.expect(destination.getReliableEndpoint()).andReturn(rme).times(2);
+        RMManager manager = control.createMock(RMManager.class);
+        EasyMock.expect(rme.getManager()).andReturn(manager);
+        RMStore store = control.createMock(RMStore.class);
+        EasyMock.expect(manager.getStore()).andReturn(store);
         Proxy proxy = control.createMock(Proxy.class);
         EasyMock.expect(rme.getProxy()).andReturn(proxy);
         proxy.acknowledge(ds);

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java Mon Jun  6 12:15:36 2011
@@ -145,6 +145,7 @@ public class ClientPersistenceTest exten
         verifyStorePopulation();
         stopClient();
         startClient();
+        populateStoreAfterRestart();
         recover();
         verifyRecovery();
     }
@@ -177,12 +178,12 @@ public class ClientPersistenceTest exten
         greeter.greetMeOneWay("two");
         greeter.greetMeOneWay("three");
         greeter.greetMeOneWay("four");
-        
-        MessageFlow mf = new MessageFlow(out.getOutboundMessages(), in.getInboundMessages());
 
-        assertNotNull(mf);
         awaitMessages(5, 3);
         
+        MessageFlow mf = new MessageFlow(out.getOutboundMessages(), in.getInboundMessages());
+
+        // sent create seq + 4 app messages, 2 of which are lost
         mf.verifyMessages(5, true);
         String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
                                                  GREETMEONEWAY_ACTION,
@@ -193,13 +194,13 @@ public class ClientPersistenceTest exten
         mf.verifyMessageNumbers(new String[] {null, "1", "2", "3", "4"}, true);
         mf.verifyAcknowledgements(new boolean[5], true);
 
-
+        // as 2 messages are lost, received the create seq response and 2 ack messages
         mf.verifyMessages(3, false);
         expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
                                         RMConstants.getSequenceAcknowledgmentAction(),
                                         RMConstants.getSequenceAcknowledgmentAction()};
         mf.verifyActions(expectedActions, false);
-        mf.verifyAcknowledgements(new boolean[] {false, true, true}, false);        
+        mf.verifyAcknowledgements(new boolean[] {false, true, true}, false);
     }
     
     void verifyStorePopulation() {
@@ -235,6 +236,38 @@ public class ClientPersistenceTest exten
         bus.shutdown(true);
     }
       
+    void populateStoreAfterRestart() throws Exception {
+        
+        bus.getExtension(RMManager.class).getRMAssertion().getBaseRetransmissionInterval()
+        .setMilliseconds(new Long(60000));
+
+        greeter.greetMeOneWay("five");
+
+        awaitMessages(1, 3);
+        
+        MessageFlow mf = new MessageFlow(out.getOutboundMessages(), in.getInboundMessages());
+        
+        // sent 1 app message and no create seq message this time
+        mf.verifyMessages(1, true);
+        String[] expectedActions = new String[] {GREETMEONEWAY_ACTION};
+
+        mf.verifyActions(expectedActions, true);
+        mf.verifyMessageNumbers(new String[] {"5"}, true);
+        mf.verifyAcknowledgements(new boolean[1], true);
+
+        mf.verifyMessages(3, false);
+
+        expectedActions = new String[] {RMConstants.getSequenceAcknowledgmentAction(),
+                                        RMConstants.getSequenceAcknowledgmentAction(),
+                                        null};
+        // we can't reliably predict how the three remaining messages are acknowledged
+//        mf.verifyActions(expectedActions, false);
+//        mf.verifyAcknowledgements(new boolean[]{true, true, false}, false);
+        
+        // verify the final ack range to be complete 
+        mf.verifyAcknowledgementRange(1, 5);
+    }
+
     void recover() throws Exception {
         
         // do nothing - resends should happen in the background  

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Mon Jun  6 12:15:36 2011
@@ -215,6 +215,7 @@ public class SequenceTest extends Abstra
         mf.verifyActions(expectedActions, false);
         mf.verifyMessageNumbers(new String[] {null, null, null, null}, false);
         mf.verifyAcknowledgements(new boolean[] {false, true, true, true}, false);
+        mf.verifyAcknowledgementRange(1, 3);
     }
     
     @Test
@@ -302,7 +303,7 @@ public class SequenceTest extends Abstra
         mf.verifyMessages(0, true);
         mf.verifyMessages(1, false);
         mf.verifyAcknowledgements(new boolean[] {true}, false);
-
+        mf.verifyAcknowledgementRange(1, 2);
     }
     
     @Test
@@ -397,7 +398,6 @@ public class SequenceTest extends Abstra
         // in the course of retransmission - this is harmless but pollutes test output
         
         awaitMessages(3, 0, 7500);
-        
     }
     
     @Test
@@ -489,6 +489,7 @@ public class SequenceTest extends Abstra
         mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, false);
         mf.verifyLastMessage(new boolean[4], false);
         mf.verifyAcknowledgements(new boolean[] {false, true, true, true}, false);
+        mf.verifyAcknowledgementRange(1, 3);
     }
 
     // the same as above but using endpoint specific interceptor configuration
@@ -537,6 +538,7 @@ public class SequenceTest extends Abstra
         mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, false);
         mf.verifyLastMessage(new boolean[4], false);
         mf.verifyAcknowledgements(new boolean[] {false, true, true, true}, false);
+        mf.verifyAcknowledgementRange(1, 3);
     }
 
     @Test
@@ -588,7 +590,7 @@ public class SequenceTest extends Abstra
         mf.verifyMessageNumbers(new String[1], true);
         mf.verifyLastMessage(new boolean[1], true);
         mf.verifyAcknowledgements(new boolean[] {true}, true);
-
+        mf.verifyAcknowledgementRange(1, 2);
     }
     
     // A maximum sequence length of 2 is configured for the client only (server allows 10).

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java Mon Jun  6 12:15:36 2011
@@ -142,6 +142,7 @@ public class ServerPersistenceTest exten
         responses[2] = greeter.greetMeAsync("three");
         
         verifyMissingResponse(responses);
+
         control.stopGreeter(SERVER_LOSS_CFG);
         LOG.fine("Stopped greeter server");
        
@@ -156,6 +157,7 @@ public class ServerPersistenceTest exten
         responses[3] = greeter.greetMeAsync("four");
         
         verifyRetransmissionQueue();
+        verifyAcknowledgementRange(1, 4);
         
         out.getOutboundMessages().clear();
         in.getInboundMessages().clear();
@@ -243,6 +245,11 @@ public class ServerPersistenceTest exten
         boolean empty = greeterBus.getExtension(RMManager.class).getRetransmissionQueue().isEmpty();
         assertTrue("Retransmission Queue is not empty", empty);
     }
+    
+    void verifyAcknowledgementRange(long lower, long higher) throws Exception {
+        MessageFlow mf = new MessageFlow(out.getOutboundMessages(), in.getInboundMessages());
+        mf.verifyAcknowledgementRange(lower, higher);
+    }
 
     protected void awaitMessages(int nExpectedOut, int nExpectedIn) {
         awaitMessages(nExpectedOut, nExpectedIn, 10000);

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java Mon Jun  6 12:15:36 2011
@@ -420,6 +420,37 @@ public class MessageFlow extends Assert 
         }
     }
     
+    public void verifyAcknowledgementRange(long lower, long upper) throws Exception {
+        long currentLower = 0;
+        long currentUpper = 0;
+        // get the final ack range
+        for (Document doc : inboundMessages) {
+            Element e = getRMHeaderElement(doc, RMConstants.getSequenceAckName());
+            // let the newer messages take precedence over the older messages in getting the final range
+            if (null != e) {
+                e = getAcknowledgementRange(e);
+                if (null != e) {
+                    currentLower = Long.parseLong(e.getAttribute("Lower"));
+                    currentUpper = Long.parseLong(e.getAttribute("Upper"));
+                }
+            }
+        }
+        assertEquals("Unexpected acknowledgement lower range", 
+                     lower, currentLower);
+        assertEquals("Unexpected acknowledgement upper range", 
+                     upper, currentUpper);
+    }
+    
+    // note that this method considers only the first range element
+    private Element getAcknowledgementRange(Element element) throws Exception {
+        for (Node nd = element.getFirstChild(); nd != null; nd = nd.getNextSibling()) { 
+            if (Node.ELEMENT_NODE == nd.getNodeType() && "AcknowledgementRange".equals(nd.getLocalName())) {
+                return (Element)nd;
+            }
+        } 
+        return null;
+    }
+
     public void purgePartialResponses() throws Exception {
         for (int i = inboundMessages.size() - 1; i >= 0; i--) {
             if (isPartialResponse(inboundMessages.get(i))) {