You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2011/02/03 22:36:06 UTC
svn commit: r1066987 - in /cxf/branches/2.3.x-fixes: ./
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ s...
Author: dkulp
Date: Thu Feb 3 21:36:05 2011
New Revision: 1066987
URL: http://svn.apache.org/viewvc?rev=1066987&view=rev
Log:
Merged revisions 1066985 via svnmerge from
https://svn.apache.org/repos/asf/cxf/trunk
........
r1066985 | dkulp | 2011-02-03 16:30:31 -0500 (Thu, 03 Feb 2011) | 4 lines
[CXF-3271] WS-RM code does not support InOrder assurances
Patch from Dennis Sosnoski applied
Modified the patch to use continuations when available, so that waiting messages do not
consume threads.
........
Added:
cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
- copied unchanged from r1066985, cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
- copied unchanged from r1066985, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
- copied unchanged from r1066985, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
- copied unchanged from r1066985, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
- copied unchanged from r1066985, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
- copied unchanged from r1066985, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
Modified:
cxf/branches/2.3.x-fixes/ (props changed)
cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java
cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml
cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java
cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java
Propchange: cxf/branches/2.3.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java Thu Feb 3 21:36:05 2011
@@ -47,8 +47,12 @@ public abstract class AbstractRMIntercep
private RMManager manager;
private Bus bus;
+ protected AbstractRMInterceptor(String phase) {
+ super(phase);
+ }
+
protected AbstractRMInterceptor() {
- super(Phase.PRE_LOGICAL);
+ this(Phase.PRE_LOGICAL);
}
public RMManager getManager() {
Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Thu Feb 3 21:36:05 2011
@@ -19,16 +19,26 @@
package org.apache.cxf.ws.rm;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.transport.Conduit;
import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
import org.apache.cxf.ws.rm.persistence.RMStore;
public class Destination extends AbstractEndpoint {
+
+ private static final Logger LOG = LogUtils.getL7dLogger(Destination.class);
private Map<String, DestinationSequence> map;
@@ -91,12 +101,31 @@ public class Destination extends Abstrac
DestinationSequence seq = getSequence(sequenceType.getIdentifier());
if (null != seq) {
- seq.applyDeliveryAssurance(sequenceType.getMessageNumber());
- seq.acknowledge(message);
-
- if (null != sequenceType.getLastMessage()) {
- seq.setLastMessageNumber(sequenceType.getMessageNumber());
- ackImmediately(seq, message);
+ if (seq.applyDeliveryAssurance(sequenceType.getMessageNumber(), message)) {
+ seq.acknowledge(message);
+
+ if (null != sequenceType.getLastMessage()) {
+ seq.setLastMessageNumber(sequenceType.getMessageNumber());
+ ackImmediately(seq, message);
+ }
+ } else {
+ try {
+ message.getInterceptorChain().abort();
+ Conduit conduit = message.getExchange().getDestination()
+ .getBackChannel(message, null, null);
+ if (conduit != null) {
+ //for a one-way, the back channel could be
+ //null if it knows it cannot send anything.
+ Message partial = createMessage(message.getExchange());
+ partial.remove(Message.CONTENT_TYPE);
+ partial.setExchange(message.getExchange());
+ conduit.prepare(partial);
+ conduit.close(partial);
+ }
+ } catch (IOException e) {
+ LOG.log(Level.SEVERE, e.getMessage());
+ throw new RMException(e);
+ }
}
} else {
SequenceFaultFactory sff = new SequenceFaultFactory();
@@ -140,5 +169,28 @@ public class Destination extends Abstrac
getReliableEndpoint().getProxy().acknowledge(seq);
}
}
+
+ void processingComplete(Message message) {
+ SequenceType sequenceType = RMContextUtils.retrieveRMProperties(message, false).getSequence();
+ if (null == sequenceType) {
+ return;
+ }
+
+ DestinationSequence seq = getSequence(sequenceType.getIdentifier());
+ if (null != seq) {
+ seq.processingComplete(sequenceType.getMessageNumber());
+ }
+ }
+
+ private static Message createMessage(Exchange exchange) {
+ Endpoint ep = exchange.get(Endpoint.class);
+ Message msg = null;
+ if (ep != null) {
+ msg = new MessageImpl();
+ msg.setExchange(exchange);
+ msg = ep.getBinding().createMessage(msg);
+ }
+ return msg;
+ }
}
Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Thu Feb 3 21:36:05 2011
@@ -22,12 +22,16 @@ package org.apache.cxf.ws.rm;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.message.Message;
import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
@@ -51,6 +55,9 @@ public class DestinationSequence extends
private List<DeferredAcknowledgment> deferredAcknowledgments;
private SequenceTermination scheduledTermination;
private String correlationID;
+ private BigInteger inProcessNumber;
+ private BigInteger highNumberCompleted = BigInteger.ZERO;
+ private List<Continuation> continuations = new LinkedList<Continuation>();
public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d) {
this(i, a, null, null);
@@ -141,7 +148,7 @@ public class DestinationSequence extends
acknowledgement.getAcknowledgementRange().add(i, range);
}
mergeRanges();
- notifyAll();
+ wakeupAll();
}
purgeAcknowledged(messageNumber);
@@ -208,45 +215,102 @@ public class DestinationSequence extends
// can be included in a HTTP response
return getAcksTo().getAddress().getValue().equals(RMConstants.getAnonymousAddress());
}
-
+
/**
* Ensures that the delivery assurance is honored, e.g. by throwing an
* exception if the message had already been delivered and the delivery
* assurance is AtMostOnce.
- * This method blocks in case the delivery assurance is
- * InOrder and and not all messages with lower message numbers have been
- * delivered.
+ * If the delivery assurance includes either AtLeastOnce or ExactlyOnce, combined with InOrder, this
+ * queues out-of-order messages for processing after the missing messages have been received.
*
- * @param s the SequenceType object including identifier and message number
+ * @param mn message number
+ * @return <code>true</code> if message processing to continue, <code>false</code> if to be dropped
* @throws Fault if message had already been acknowledged
*/
- void applyDeliveryAssurance(BigInteger mn) throws RMException {
+ boolean applyDeliveryAssurance(BigInteger mn, Message message) throws RMException {
+ Continuation cont = getContinuation(message);
DeliveryAssuranceType da = destination.getManager().getDeliveryAssurance();
- if (da.isSetAtMostOnce() && isAcknowledged(mn)) {
+ if (cont != null && da.isSetInOrder() && !cont.isNew()) {
+ return waitInQueue(mn, !(da.isSetAtLeastOnce() || da.isSetExactlyOnce()),
+ message, cont);
+ }
+ if ((da.isSetExactlyOnce() || da.isSetAtMostOnce()) && isAcknowledged(mn)) {
org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
"MESSAGE_ALREADY_DELIVERED_EXC", LOG, mn, getIdentifier().getValue());
- LOG.log(Level.SEVERE, msg.toString());
+ LOG.log(Level.INFO, msg.toString());
throw new RMException(msg);
}
- if (da.isSetInOrder() && da.isSetAtLeastOnce()) {
- synchronized (this) {
- boolean ok = allPredecessorsAcknowledged(mn);
- while (!ok) {
- try {
- wait();
- ok = allPredecessorsAcknowledged(mn);
- } catch (InterruptedException ie) {
- // ignore
- }
+ if (da.isSetInOrder()) {
+ return waitInQueue(mn, !(da.isSetAtLeastOnce() || da.isSetExactlyOnce()),
+ message, cont);
+ }
+ return true;
+ }
+
+ private Continuation getContinuation(Message message) {
+ if (message == null) {
+ return null;
+ }
+ return message.get(Continuation.class);
+ }
+
+ synchronized boolean waitInQueue(BigInteger mn, boolean canSkip,
+ Message message, Continuation continuation) {
+ while (true) {
+
+ // can process now if no other in process and this one is next
+ if (inProcessNumber == null) {
+ BigInteger diff = mn.subtract(highNumberCompleted);
+ if (BigInteger.ONE.equals(diff) || (canSkip && diff.signum() > 0)) {
+ inProcessNumber = mn;
+ return true;
+ }
+ }
+
+ // can abort now if same message in process or already processed
+ BigInteger compare = inProcessNumber == null ? highNumberCompleted : inProcessNumber;
+ if (compare.compareTo(mn) >= 0) {
+ return false;
+ }
+ if (continuation == null) {
+ ContinuationProvider p = message.get(ContinuationProvider.class);
+ if (p != null) {
+ boolean isOneWay = message.getExchange().isOneWay();
+ message.getExchange().setOneWay(false);
+ continuation = p.getContinuation();
+ message.getExchange().setOneWay(isOneWay);
+ message.put(Continuation.class, continuation);
+ }
+ }
+
+ if (continuation != null) {
+ continuation.setObject(message);
+ if (continuation.suspend(-1)) {
+ continuations.add(continuation);
+ throw new SuspendedInvocationException();
}
}
+ try {
+ //if we get here, there isn't a continuation available
+ //so we need to block/wait
+ wait();
+ } catch (InterruptedException ie) {
+ // ignore
+ }
}
}
+ synchronized void wakeupAll() {
+ while (!continuations.isEmpty()) {
+ Continuation c = continuations.remove(0);
+ c.resume();
+ }
+ notifyAll();
+ }
- synchronized boolean allPredecessorsAcknowledged(BigInteger mn) {
- return acknowledgement.getAcknowledgementRange().size() == 1
- && acknowledgement.getAcknowledgementRange().get(0).getLower().equals(BigInteger.ONE)
- && acknowledgement.getAcknowledgementRange().get(0).getUpper().subtract(mn).signum() >= 0;
+ synchronized void processingComplete(BigInteger mn) {
+ inProcessNumber = null;
+ highNumberCompleted = mn;
+ wakeupAll();
}
void purgeAcknowledged(BigInteger messageNr) {
Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java Thu Feb 3 21:36:05 2011
@@ -23,6 +23,7 @@ import org.apache.cxf.Bus;
import org.apache.cxf.common.injection.NoJSR250Annotations;
import org.apache.cxf.feature.AbstractFeature;
import org.apache.cxf.interceptor.InterceptorProvider;
+import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
import org.apache.cxf.ws.rm.RMInInterceptor;
import org.apache.cxf.ws.rm.RMManager;
import org.apache.cxf.ws.rm.RMOutInterceptor;
@@ -47,6 +48,7 @@ public class RMFeature extends AbstractF
private RMInInterceptor rmLogicalIn = new RMInInterceptor();
private RMOutInterceptor rmLogicalOut = new RMOutInterceptor();
+ private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
private RMSoapInterceptor rmCodec = new RMSoapInterceptor();
public void setDeliveryAssurance(DeliveryAssuranceType da) {
@@ -91,15 +93,18 @@ public class RMFeature extends AbstractF
rmLogicalIn.setBus(bus);
rmLogicalOut.setBus(bus);
+ rmDelivery.setBus(bus);
provider.getInInterceptors().add(rmLogicalIn);
provider.getInInterceptors().add(rmCodec);
+ provider.getInInterceptors().add(rmDelivery);
provider.getOutInterceptors().add(rmLogicalOut);
provider.getOutInterceptors().add(rmCodec);
provider.getInFaultInterceptors().add(rmLogicalIn);
provider.getInFaultInterceptors().add(rmCodec);
+ provider.getInInterceptors().add(rmDelivery);
provider.getOutFaultInterceptors().add(rmLogicalOut);
provider.getOutFaultInterceptors().add(rmCodec);
Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java Thu Feb 3 21:36:05 2011
@@ -25,6 +25,7 @@ import javax.xml.namespace.QName;
import org.apache.cxf.Bus;
import org.apache.cxf.ws.policy.AbstractPolicyInterceptorProvider;
+import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
import org.apache.cxf.ws.rm.RMInInterceptor;
import org.apache.cxf.ws.rm.RMOutInterceptor;
import org.apache.cxf.ws.rm.soap.RMSoapInterceptor;
@@ -35,6 +36,7 @@ public class RMPolicyInterceptorProvider
private RMInInterceptor rmIn = new RMInInterceptor();
private RMOutInterceptor rmOut = new RMOutInterceptor();
private RMSoapInterceptor rmSoap = new RMSoapInterceptor();
+ private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
static {
Collection<QName> types = new ArrayList<QName>();
@@ -46,15 +48,18 @@ public class RMPolicyInterceptorProvider
super(ASSERTION_TYPES);
rmIn.setBus(bus);
rmOut.setBus(bus);
+ rmDelivery.setBus(bus);
getInInterceptors().add(rmIn);
getInInterceptors().add(rmSoap);
+ getInInterceptors().add(rmDelivery);
getOutInterceptors().add(rmOut);
getOutInterceptors().add(rmSoap);
getInFaultInterceptors().add(rmIn);
getInFaultInterceptors().add(rmSoap);
+ getInInterceptors().add(rmDelivery);
getOutFaultInterceptors().add(rmOut);
getOutFaultInterceptors().add(rmSoap);
Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java Thu Feb 3 21:36:05 2011
@@ -462,7 +462,7 @@ public class DestinationSequenceTest ext
control.replay();
DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
ds.setDestination(destination);
- ds.applyDeliveryAssurance(mn);
+ ds.applyDeliveryAssurance(mn, null);
control.verify();
control.reset();
@@ -475,7 +475,7 @@ public class DestinationSequenceTest ext
EasyMock.expect(r.getUpper()).andReturn(new BigInteger("15"));
control.replay();
try {
- ds.applyDeliveryAssurance(mn);
+ ds.applyDeliveryAssurance(mn, null);
fail("Expected Fault not thrown.");
} catch (RMException ex) {
assertEquals("MESSAGE_ALREADY_DELIVERED_EXC", ex.getCode());
@@ -486,34 +486,6 @@ public class DestinationSequenceTest ext
}
@Test
- public void testInOrderNoWait() throws RMException {
- setUpDestination();
-
- BigInteger mn = BigInteger.TEN;
-
- DeliveryAssuranceType da = control.createMock(DeliveryAssuranceType.class);
- EasyMock.expect(manager.getDeliveryAssurance()).andReturn(da);
- EasyMock.expect(da.isSetAtMostOnce()).andReturn(false);
- EasyMock.expect(da.isSetAtLeastOnce()).andReturn(true);
- EasyMock.expect(da.isSetInOrder()).andReturn(true);
-
- SequenceAcknowledgement ack = control.createMock(SequenceAcknowledgement.class);
- List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
- AcknowledgementRange r = control.createMock(AcknowledgementRange.class);
- ranges.add(r);
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
- EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
- EasyMock.expect(r.getUpper()).andReturn(new BigInteger("15"));
-
- control.replay();
-
- DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
- ds.setDestination(destination);
- ds.applyDeliveryAssurance(mn);
- control.verify();
- }
-
- @Test
public void testInOrderWait() {
setUpDestination();
Message[] messages = new Message[5];
@@ -549,7 +521,7 @@ public class DestinationSequenceTest ext
public void run() {
try {
ds.acknowledge(message);
- ds.applyDeliveryAssurance(messageNr);
+ ds.applyDeliveryAssurance(messageNr, message);
} catch (Exception ex) {
// ignore
}
@@ -583,51 +555,6 @@ public class DestinationSequenceTest ext
}
@Test
- public void testAllPredecessorsAcknowledged() {
-
- SequenceAcknowledgement ack = control.createMock(SequenceAcknowledgement.class);
- List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
- AcknowledgementRange r = control.createMock(AcknowledgementRange.class);
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges);
- control.replay();
- DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
- ds.setDestination(destination);
- assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
- control.verify();
-
- control.reset();
- ranges.add(r);
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(2);
- EasyMock.expect(r.getLower()).andReturn(BigInteger.TEN);
- control.replay();
- assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
- control.verify();
-
- control.reset();
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
- EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
- EasyMock.expect(r.getUpper()).andReturn(new BigInteger("5"));
- control.replay();
- assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
- control.verify();
-
- control.reset();
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
- EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
- EasyMock.expect(r.getUpper()).andReturn(BigInteger.TEN);
- control.replay();
- assertTrue("not all predecessors acknowledged", ds.allPredecessorsAcknowledged(BigInteger.TEN));
- control.verify();
-
- ranges.add(r);
- control.reset();
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges);
- control.replay();
- assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
- control.verify();
- }
-
- @Test
public void testScheduleSequenceTermination() throws SequenceFault {
Timer timer = new Timer();
setUpDestination(timer);
Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java Thu Feb 3 21:36:05 2011
@@ -146,7 +146,7 @@ public class DestinationTest extends Ass
BigInteger nr = BigInteger.TEN;
EasyMock.expect(st.getMessageNumber()).andReturn(nr);
RMException ex = new RMException(new RuntimeException("already acknowledged"));
- ds.applyDeliveryAssurance(nr);
+ ds.applyDeliveryAssurance(nr, message);
EasyMock.expectLastCall().andThrow(ex);
control.replay();
try {
@@ -177,8 +177,8 @@ public class DestinationTest extends Ass
DestinationSequence ds = control.createMock(DestinationSequence.class);
EasyMock.expect(destination.getSequence(id)).andReturn(ds);
- ds.applyDeliveryAssurance(nr);
- EasyMock.expectLastCall();
+ ds.applyDeliveryAssurance(nr, message);
+ EasyMock.expectLastCall().andReturn(Boolean.TRUE);
ds.acknowledge(message);
EasyMock.expectLastCall();
SequenceType.LastMessage lm = control.createMock(SequenceType.LastMessage.class);
Modified: cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Thu Feb 3 21:36:05 2011
@@ -1104,7 +1104,8 @@ public class SequenceTest extends Abstra
OutMessageRecorder outRecorder;
String id;
- ClientThread(SpringBusFactory bf, String cfgResource, int n) {
+ ClientThread(SpringBusFactory bf, String cfgResource, int n) {
+ super("client " + n);
SequenceTest.this.initGreeter(bf, cfgResource, true, null);
greeter = SequenceTest.this.greeter;
greeterBus = SequenceTest.this.greeterBus;
@@ -1114,9 +1115,18 @@ public class SequenceTest extends Abstra
}
public void run() {
- greeter.greetMe(id + ": a");
- greeter.greetMe(id + ": b");
- greeter.greetMe(id + ": c");
+ String s = greeter.greetMe(id + ": a").toLowerCase();
+ if (!s.contains(id)) {
+ System.out.println("Correlation problem <" + s + "> <" + id + ">");
+ }
+ s = greeter.greetMe(id + ": b").toLowerCase();
+ if (!s.contains(id)) {
+ System.out.println("Correlation problem <" + s + "> <" + id + ">");
+ }
+ s = greeter.greetMe(id + ": c").toLowerCase();
+ if (!s.contains(id)) {
+ System.out.println("Correlation problem <" + s + "> <" + id + ">");
+ }
// three application messages plus createSequence
@@ -1134,9 +1144,10 @@ public class SequenceTest extends Abstra
for (int i = 0; i < clients.length; i++) {
clients[i].start();
}
-
for (int i = 0; i < clients.length; i++) {
clients[i].join();
+ }
+ for (int i = 0; i < clients.length; i++) {
MessageFlow mf = new MessageFlow(clients[i].outRecorder.getOutboundMessages(),
clients[i].inRecorder.getInboundMessages());
Modified: cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml (original)
+++ cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml Thu Feb 3 21:36:05 2011
@@ -31,6 +31,9 @@ http://www.springframework.org/schema/be
<property name="bus" ref="cxf"/>
</bean>
<bean id="rmCodec" class="org.apache.cxf.ws.rm.soap.RMSoapInterceptor"/>
+ <bean id="rmDelivery" class="org.apache.cxf.ws.rm.RMDeliveryInterceptor">
+ <property name="bus" ref="cxf"/>
+ </bean>
<!-- We are adding the interceptors to the bus as we will have only one endpoint/service/bus. -->
@@ -41,7 +44,8 @@ http://www.springframework.org/schema/be
<ref bean="mapCodec"/>
<ref bean="rmLogicalIn"/>
<ref bean="rmCodec"/>
- <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
+ <ref bean="rmDelivery"/>
+ <!-- bean class="org.apache.cxf.interceptor.LoggingInInterceptor" /-->
</list>
</property>
<property name="inFaultInterceptors">
@@ -50,7 +54,8 @@ http://www.springframework.org/schema/be
<ref bean="mapCodec"/>
<ref bean="rmLogicalIn"/>
<ref bean="rmCodec"/>
- <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
+ <ref bean="rmDelivery"/>
+ <!--bean class="org.apache.cxf.interceptor.LoggingInInterceptor" /-->
</list>
</property>
<property name="outInterceptors">
@@ -68,7 +73,7 @@ http://www.springframework.org/schema/be
<ref bean="mapCodec"/>
<ref bean="rmLogicalOut"/>
<ref bean="rmCodec"/>
- <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor" />
+ <!-- bean class="org.apache.cxf.interceptor.LoggingOutInterceptor" /-->
</list>
</property>
</bean>
Modified: cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml (original)
+++ cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml Thu Feb 3 21:36:05 2011
@@ -31,6 +31,9 @@ http://www.springframework.org/schema/be
<property name="bus" ref="cxf"/>
</bean>
<bean id="rmCodec" class="org.apache.cxf.ws.rm.soap.RMSoapInterceptor"/>
+ <bean id="rmDelivery" class="org.apache.cxf.ws.rm.RMDeliveryInterceptor">
+ <property name="bus" ref="cxf"/>
+ </bean>
<!-- We are adding the interceptors to the bus as we will have only one endpoint/service/bus. -->
@@ -41,6 +44,7 @@ http://www.springframework.org/schema/be
<ref bean="mapCodec"/>
<ref bean="rmLogicalIn"/>
<ref bean="rmCodec"/>
+ <ref bean="rmDelivery"/>
<bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
</list>
</property>
@@ -50,6 +54,7 @@ http://www.springframework.org/schema/be
<ref bean="mapCodec"/>
<ref bean="rmLogicalIn"/>
<ref bean="rmCodec"/>
+ <ref bean="rmDelivery"/>
<bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
</list>
</property>
Modified: cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java (original)
+++ cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java Thu Feb 3 21:36:05 2011
@@ -55,7 +55,8 @@ public class InMessageRecorder extends A
IOUtils.copy(is, bos);
is.close();
bos.close();
- inbound.add(bos.toByteArray());
+ byte bytes[] = bos.toByteArray();
+ inbound.add(bytes);
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("inbound: " + bos.toString());
}
Modified: cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java?rev=1066987&r1=1066986&r2=1066987&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java (original)
+++ cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java Thu Feb 3 21:36:05 2011
@@ -24,8 +24,6 @@ import java.util.ArrayList;
import java.util.List;
import javax.xml.namespace.QName;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@@ -33,6 +31,7 @@ import org.w3c.dom.Node;
import junit.framework.Assert;
+import org.apache.cxf.staxutils.StaxUtils;
import org.apache.cxf.ws.rm.RMConstants;
@@ -64,21 +63,18 @@ public class MessageFlow extends Assert
out.remove(0);
}
outStreams = out;
- DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
- factory.setNamespaceAware(true);
- DocumentBuilder parser = factory.newDocumentBuilder();
inboundMessages.clear();
for (int i = 0; i < inStreams.size(); i++) {
byte[] bytes = inStreams.get(i);
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
- Document document = parser.parse(is);
+ Document document = StaxUtils.read(is);
inboundMessages.add(document);
}
outboundMessages.clear();
for (int i = 0; i < outStreams.size(); i++) {
byte[] bytes = outStreams.get(i);
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
- Document document = parser.parse(is);
+ Document document = StaxUtils.read(is);
outboundMessages.add(document);
}
}