You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2011/06/06 14:15:36 UTC
svn commit: r1132609 - in /cxf/trunk:
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/
systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ systests/...
Author: ay
Date: Mon Jun 6 12:15:36 2011
New Revision: 1132609
URL: http://svn.apache.org/viewvc?rev=1132609&view=rev
Log:
[CXF-3219] WS-RM's inbound to update the ack range in the store
Added:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java (with props)
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Mon Jun 6 12:15:36 2011
@@ -29,11 +29,13 @@ import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
+import org.apache.cxf.ws.rm.persistence.RMMessage;
import org.apache.cxf.ws.rm.persistence.RMStore;
public class Destination extends AbstractEndpoint {
@@ -131,6 +133,17 @@ public class Destination extends Abstrac
SequenceFaultFactory sff = new SequenceFaultFactory();
throw sff.createUnknownSequenceFault(sequenceType.getIdentifier());
}
+
+ RMStore store = getReliableEndpoint().getManager().getStore();
+ if (null != store) {
+ CachedOutputStream saved =
+ (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+ RMMessage msg = new RMMessage();
+ msg.setMessageNumber(sequenceType.getMessageNumber());
+ msg.setContent(saved);
+ store.persistIncoming(seq, msg);
+ }
+
}
void ackRequested(Message message) throws SequenceFault, RMException {
@@ -180,6 +193,7 @@ public class Destination extends Abstrac
if (null != seq) {
seq.processingComplete(sequenceType.getMessageNumber());
+ seq.purgeAcknowledged(sequenceType.getMessageNumber());
}
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Mon Jun 6 12:15:36 2011
@@ -148,9 +148,7 @@ public class DestinationSequence extends
mergeRanges();
wakeupAll();
}
-
- purgeAcknowledged(messageNumber);
-
+
RMAssertion rma = PolicyUtils.getRMAssertion(destination.getManager().getRMAssertion(), message);
long acknowledgementInterval = 0;
AcknowledgementInterval ai = rma.getAcknowledgementInterval();
Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java?rev=1132609&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java Mon Jun 6 12:15:36 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.io.InputStream;
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.Phase;
+
+/**
+ *
+ */
+public class RMCaptureInInterceptor extends AbstractRMInterceptor<Message> {
+ private static final Logger LOG = LogUtils.getLogger(RMCaptureInInterceptor.class);
+
+ public RMCaptureInInterceptor() {
+ super(Phase.PRE_STREAM);
+ }
+
+ protected void handle(Message message) throws SequenceFault, RMException {
+ LOG.entering(getClass().getName(), "handleMessage");
+
+ InputStream is = message.getContent(InputStream.class);
+ if (is != null) {
+ CachedOutputStream saved = new CachedOutputStream();
+ try {
+ IOUtils.copy(is, saved);
+
+ saved.flush();
+ is.close();
+
+ message.setContent(InputStream.class, saved.getInputStream());
+ LOG.fine("Capturing the original RM message");
+ message.put(RMMessageConstants.SAVED_CONTENT, saved);
+ } catch (Exception e) {
+ throw new Fault(e);
+ }
+ }
+ }
+
+}
Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
------------------------------------------------------------------------------
svn:executable = *
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Mon Jun 6 12:15:36 2011
@@ -410,6 +410,7 @@ public class RMManager implements Server
RMEndpoint rme = createReliableEndpoint(endpoint);
rme.initialise(conduit, null, null);
reliableEndpoints.put(endpoint, rme);
+ SourceSequence css = null;
for (SourceSequence ss : sss) {
Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
@@ -419,7 +420,11 @@ public class RMManager implements Server
LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
rme.getSource().addSequence(ss, false);
-
+ // choosing an arbitrary valid source sequence as the current source sequence
+ if (css == null && !ss.isExpired() && !ss.isLastMessage()) {
+ css = ss;
+ rme.getSource().setCurrent(css);
+ }
for (RMMessage m : ms) {
Message message = new MessageImpl();
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java Mon Jun 6 12:15:36 2011
@@ -23,6 +23,7 @@ import org.apache.cxf.Bus;
import org.apache.cxf.common.injection.NoJSR250Annotations;
import org.apache.cxf.feature.AbstractFeature;
import org.apache.cxf.interceptor.InterceptorProvider;
+import org.apache.cxf.ws.rm.RMCaptureInInterceptor;
import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
import org.apache.cxf.ws.rm.RMInInterceptor;
import org.apache.cxf.ws.rm.RMManager;
@@ -50,6 +51,7 @@ public class RMFeature extends AbstractF
private RMOutInterceptor rmLogicalOut = new RMOutInterceptor();
private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
private RMSoapInterceptor rmCodec = new RMSoapInterceptor();
+ private RMCaptureInInterceptor rmCaptureIn = new RMCaptureInInterceptor();
public void setDeliveryAssurance(DeliveryAssuranceType da) {
deliveryAssurance = da;
@@ -94,17 +96,21 @@ public class RMFeature extends AbstractF
rmLogicalIn.setBus(bus);
rmLogicalOut.setBus(bus);
rmDelivery.setBus(bus);
+ rmCaptureIn.setBus(bus);
provider.getInInterceptors().add(rmLogicalIn);
provider.getInInterceptors().add(rmCodec);
provider.getInInterceptors().add(rmDelivery);
+ if (null != store) {
+ provider.getInInterceptors().add(rmCaptureIn);
+ }
provider.getOutInterceptors().add(rmLogicalOut);
provider.getOutInterceptors().add(rmCodec);
provider.getInFaultInterceptors().add(rmLogicalIn);
provider.getInFaultInterceptors().add(rmCodec);
- provider.getInInterceptors().add(rmDelivery);
+ provider.getInFaultInterceptors().add(rmDelivery);
provider.getOutFaultInterceptors().add(rmLogicalOut);
provider.getOutFaultInterceptors().add(rmCodec);
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java Mon Jun 6 12:15:36 2011
@@ -172,7 +172,7 @@ public class DestinationTest extends Ass
Identifier id = control.createMock(Identifier.class);
EasyMock.expect(st.getIdentifier()).andReturn(id);
long nr = 10;
- EasyMock.expect(st.getMessageNumber()).andReturn(nr).times(2);
+ EasyMock.expect(st.getMessageNumber()).andReturn(nr).times(3);
DestinationSequence ds = control.createMock(DestinationSequence.class);
EasyMock.expect(destination.getSequence(id)).andReturn(ds);
@@ -203,7 +203,11 @@ public class DestinationTest extends Ass
String acksToAddress = "acksTo";
EasyMock.expect(acksToURI.getValue()).andReturn(acksToAddress);
EasyMock.expect(ds.canPiggybackAckOnPartialResponse()).andReturn(false);
- EasyMock.expect(destination.getReliableEndpoint()).andReturn(rme);
+ EasyMock.expect(destination.getReliableEndpoint()).andReturn(rme).times(2);
+ RMManager manager = control.createMock(RMManager.class);
+ EasyMock.expect(rme.getManager()).andReturn(manager);
+ RMStore store = control.createMock(RMStore.class);
+ EasyMock.expect(manager.getStore()).andReturn(store);
Proxy proxy = control.createMock(Proxy.class);
EasyMock.expect(rme.getProxy()).andReturn(proxy);
proxy.acknowledge(ds);
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java Mon Jun 6 12:15:36 2011
@@ -145,6 +145,7 @@ public class ClientPersistenceTest exten
verifyStorePopulation();
stopClient();
startClient();
+ populateStoreAfterRestart();
recover();
verifyRecovery();
}
@@ -177,12 +178,12 @@ public class ClientPersistenceTest exten
greeter.greetMeOneWay("two");
greeter.greetMeOneWay("three");
greeter.greetMeOneWay("four");
-
- MessageFlow mf = new MessageFlow(out.getOutboundMessages(), in.getInboundMessages());
- assertNotNull(mf);
awaitMessages(5, 3);
+ MessageFlow mf = new MessageFlow(out.getOutboundMessages(), in.getInboundMessages());
+
+ // sent create seq + 4 app messages, with 2 of the app messages being lost
mf.verifyMessages(5, true);
String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
GREETMEONEWAY_ACTION,
@@ -193,13 +194,13 @@ public class ClientPersistenceTest exten
mf.verifyMessageNumbers(new String[] {null, "1", "2", "3", "4"}, true);
mf.verifyAcknowledgements(new boolean[5], true);
-
+ // as 2 messages are lost, we receive the create seq response and 2 ack messages
mf.verifyMessages(3, false);
expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
RMConstants.getSequenceAcknowledgmentAction(),
RMConstants.getSequenceAcknowledgmentAction()};
mf.verifyActions(expectedActions, false);
- mf.verifyAcknowledgements(new boolean[] {false, true, true}, false);
+ mf.verifyAcknowledgements(new boolean[] {false, true, true}, false);
}
void verifyStorePopulation() {
@@ -235,6 +236,38 @@ public class ClientPersistenceTest exten
bus.shutdown(true);
}
+ void populateStoreAfterRestart() throws Exception {
+
+ bus.getExtension(RMManager.class).getRMAssertion().getBaseRetransmissionInterval()
+ .setMilliseconds(new Long(60000));
+
+ greeter.greetMeOneWay("five");
+
+ awaitMessages(1, 3);
+
+ MessageFlow mf = new MessageFlow(out.getOutboundMessages(), in.getInboundMessages());
+
+ // sent 1 app message and no create seq message this time
+ mf.verifyMessages(1, true);
+ String[] expectedActions = new String[] {GREETMEONEWAY_ACTION};
+
+ mf.verifyActions(expectedActions, true);
+ mf.verifyMessageNumbers(new String[] {"5"}, true);
+ mf.verifyAcknowledgements(new boolean[1], true);
+
+ mf.verifyMessages(3, false);
+
+ expectedActions = new String[] {RMConstants.getSequenceAcknowledgmentAction(),
+ RMConstants.getSequenceAcknowledgmentAction(),
+ null};
+ // we can't reliably predict how the three remaining messages are acknowledged
+// mf.verifyActions(expectedActions, false);
+// mf.verifyAcknowledgements(new boolean[]{true, true, false}, false);
+
+ // verify the final ack range to be complete
+ mf.verifyAcknowledgementRange(1, 5);
+ }
+
void recover() throws Exception {
// do nothing - resends should happen in the background
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Mon Jun 6 12:15:36 2011
@@ -215,6 +215,7 @@ public class SequenceTest extends Abstra
mf.verifyActions(expectedActions, false);
mf.verifyMessageNumbers(new String[] {null, null, null, null}, false);
mf.verifyAcknowledgements(new boolean[] {false, true, true, true}, false);
+ mf.verifyAcknowledgementRange(1, 3);
}
@Test
@@ -302,7 +303,7 @@ public class SequenceTest extends Abstra
mf.verifyMessages(0, true);
mf.verifyMessages(1, false);
mf.verifyAcknowledgements(new boolean[] {true}, false);
-
+ mf.verifyAcknowledgementRange(1, 2);
}
@Test
@@ -397,7 +398,6 @@ public class SequenceTest extends Abstra
// in the course of retransmission - this is harmless but pollutes test output
awaitMessages(3, 0, 7500);
-
}
@Test
@@ -489,6 +489,7 @@ public class SequenceTest extends Abstra
mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, false);
mf.verifyLastMessage(new boolean[4], false);
mf.verifyAcknowledgements(new boolean[] {false, true, true, true}, false);
+ mf.verifyAcknowledgementRange(1, 3);
}
// the same as above but using endpoint specific interceptor configuration
@@ -537,6 +538,7 @@ public class SequenceTest extends Abstra
mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, false);
mf.verifyLastMessage(new boolean[4], false);
mf.verifyAcknowledgements(new boolean[] {false, true, true, true}, false);
+ mf.verifyAcknowledgementRange(1, 3);
}
@Test
@@ -588,7 +590,7 @@ public class SequenceTest extends Abstra
mf.verifyMessageNumbers(new String[1], true);
mf.verifyLastMessage(new boolean[1], true);
mf.verifyAcknowledgements(new boolean[] {true}, true);
-
+ mf.verifyAcknowledgementRange(1, 2);
}
// A maximum sequence length of 2 is configured for the client only (server allows 10).
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java Mon Jun 6 12:15:36 2011
@@ -142,6 +142,7 @@ public class ServerPersistenceTest exten
responses[2] = greeter.greetMeAsync("three");
verifyMissingResponse(responses);
+
control.stopGreeter(SERVER_LOSS_CFG);
LOG.fine("Stopped greeter server");
@@ -156,6 +157,7 @@ public class ServerPersistenceTest exten
responses[3] = greeter.greetMeAsync("four");
verifyRetransmissionQueue();
+ verifyAcknowledgementRange(1, 4);
out.getOutboundMessages().clear();
in.getInboundMessages().clear();
@@ -243,6 +245,11 @@ public class ServerPersistenceTest exten
boolean empty = greeterBus.getExtension(RMManager.class).getRetransmissionQueue().isEmpty();
assertTrue("Retransmission Queue is not empty", empty);
}
+
+ void verifyAcknowledgementRange(long lower, long higher) throws Exception {
+ MessageFlow mf = new MessageFlow(out.getOutboundMessages(), in.getInboundMessages());
+ mf.verifyAcknowledgementRange(lower, higher);
+ }
protected void awaitMessages(int nExpectedOut, int nExpectedIn) {
awaitMessages(nExpectedOut, nExpectedIn, 10000);
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java?rev=1132609&r1=1132608&r2=1132609&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java Mon Jun 6 12:15:36 2011
@@ -420,6 +420,37 @@ public class MessageFlow extends Assert
}
}
+ public void verifyAcknowledgementRange(long lower, long upper) throws Exception {
+ long currentLower = 0;
+ long currentUpper = 0;
+ // get the final ack range
+ for (Document doc : inboundMessages) {
+ Element e = getRMHeaderElement(doc, RMConstants.getSequenceAckName());
+ // let the newer messages take precedence over the older messages in getting the final range
+ if (null != e) {
+ e = getAcknowledgementRange(e);
+ if (null != e) {
+ currentLower = Long.parseLong(e.getAttribute("Lower"));
+ currentUpper = Long.parseLong(e.getAttribute("Upper"));
+ }
+ }
+ }
+ assertEquals("Unexpected acknowledgement lower range",
+ lower, currentLower);
+ assertEquals("Unexpected acknowledgement upper range",
+ upper, currentUpper);
+ }
+
+ // note that this method considers only the first range element
+ private Element getAcknowledgementRange(Element element) throws Exception {
+ for (Node nd = element.getFirstChild(); nd != null; nd = nd.getNextSibling()) {
+ if (Node.ELEMENT_NODE == nd.getNodeType() && "AcknowledgementRange".equals(nd.getLocalName())) {
+ return (Element)nd;
+ }
+ }
+ return null;
+ }
+
public void purgePartialResponses() throws Exception {
for (int i = inboundMessages.size() - 1; i >= 0; i--) {
if (isPartialResponse(inboundMessages.get(i))) {