You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2011/02/03 22:30:32 UTC
svn commit: r1066985 - in /cxf/trunk:
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ systests/ws-specs/s...
Author: dkulp
Date: Thu Feb 3 21:30:31 2011
New Revision: 1066985
URL: http://svn.apache.org/viewvc?rev=1066985&view=rev
Log:
[CXF-3271] WS-RM code does not support InOrder assurances
Patch from Dennis Sosnoski applied
Modified the patch to use continuations when available so that it does not
consume threads.
Added:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java (with props)
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java (with props)
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml (with props)
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml (with props)
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml (with props)
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml (with props)
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java Thu Feb 3 21:30:31 2011
@@ -47,8 +47,12 @@ public abstract class AbstractRMIntercep
private RMManager manager;
private Bus bus;
+ protected AbstractRMInterceptor(String phase) { // passes the caller-chosen phase name on to the superclass constructor
+ super(phase);
+ }
+
protected AbstractRMInterceptor() { // no-arg form: delegates to the phase-taking constructor with Phase.PRE_LOGICAL
- super(Phase.PRE_LOGICAL);
+ this(Phase.PRE_LOGICAL);
}
public RMManager getManager() {
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Thu Feb 3 21:30:31 2011
@@ -19,16 +19,26 @@
package org.apache.cxf.ws.rm;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.transport.Conduit;
import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
import org.apache.cxf.ws.rm.persistence.RMStore;
public class Destination extends AbstractEndpoint {
+
+ private static final Logger LOG = LogUtils.getL7dLogger(Destination.class);
private Map<String, DestinationSequence> map;
@@ -91,12 +101,31 @@ public class Destination extends Abstrac
DestinationSequence seq = getSequence(sequenceType.getIdentifier());
if (null != seq) {
- seq.applyDeliveryAssurance(sequenceType.getMessageNumber());
- seq.acknowledge(message);
-
- if (null != sequenceType.getLastMessage()) {
- seq.setLastMessageNumber(sequenceType.getMessageNumber());
- ackImmediately(seq, message);
+            // applyDeliveryAssurance returns false when the message must not be delivered
+            // (it is already in process or has already been processed).
+            if (seq.applyDeliveryAssurance(sequenceType.getMessageNumber(), message)) {
+                seq.acknowledge(message);
+
+                if (null != sequenceType.getLastMessage()) {
+                    seq.setLastMessageNumber(sequenceType.getMessageNumber());
+                    ackImmediately(seq, message);
+                }
+            } else {
+                try {
+                    // Drop the message: stop the interceptor chain and, if the transport
+                    // offers a back channel, complete it with an empty partial response.
+                    message.getInterceptorChain().abort();
+                    Conduit conduit = message.getExchange().getDestination()
+                        .getBackChannel(message, null, null);
+                    if (conduit != null) {
+                        //for a one-way, the back channel could be
+                        //null if it knows it cannot send anything.
+                        Message partial = createMessage(message.getExchange());
+                        // createMessage returns null when the exchange carries no Endpoint;
+                        // in that case there is nothing that can be sent back.
+                        if (partial != null) {
+                            partial.remove(Message.CONTENT_TYPE);
+                            partial.setExchange(message.getExchange());
+                            conduit.prepare(partial);
+                            conduit.close(partial);
+                        }
+                    }
+                } catch (IOException e) {
+                    // Log with the throwable so the stack trace is kept in the log record.
+                    LOG.log(Level.SEVERE, e.getMessage(), e);
+                    throw new RMException(e);
+                }
+            }
} else {
SequenceFaultFactory sff = new SequenceFaultFactory();
@@ -140,5 +169,28 @@ public class Destination extends Abstrac
getReliableEndpoint().getProxy().acknowledge(seq);
}
}
+
+    /**
+     * Reports that processing of the given message has finished.
+     * If the message carries a sequence header and that sequence is known to this
+     * destination, the sequence is told that the message number has completed;
+     * otherwise nothing happens.
+     *
+     * @param message the inbound message whose processing has completed
+     */
+    void processingComplete(Message message) {
+        SequenceType header = RMContextUtils.retrieveRMProperties(message, false).getSequence();
+        if (header != null) {
+            DestinationSequence target = getSequence(header.getIdentifier());
+            if (target != null) {
+                target.processingComplete(header.getMessageNumber());
+            }
+        }
+    }
+
+    /**
+     * Builds a new message for the given exchange using the binding of the
+     * exchange's endpoint.
+     *
+     * @param exchange the exchange the new message belongs to
+     * @return the binding-specific message, or <code>null</code> if the exchange
+     *         has no {@link Endpoint}
+     */
+    private static Message createMessage(Exchange exchange) {
+        Endpoint endpoint = exchange.get(Endpoint.class);
+        if (endpoint == null) {
+            return null;
+        }
+        Message seed = new MessageImpl();
+        seed.setExchange(exchange);
+        return endpoint.getBinding().createMessage(seed);
+    }
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Thu Feb 3 21:30:31 2011
@@ -22,12 +22,16 @@ package org.apache.cxf.ws.rm;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.message.Message;
import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
@@ -51,6 +55,9 @@ public class DestinationSequence extends
private List<DeferredAcknowledgment> deferredAcknowledgments;
private SequenceTermination scheduledTermination;
private String correlationID;
+ private BigInteger inProcessNumber;
+ private BigInteger highNumberCompleted = BigInteger.ZERO;
+ private List<Continuation> continuations = new LinkedList<Continuation>();
public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d) {
this(i, a, null, null);
@@ -141,7 +148,7 @@ public class DestinationSequence extends
acknowledgement.getAcknowledgementRange().add(i, range);
}
mergeRanges();
- notifyAll();
+ wakeupAll();
}
purgeAcknowledged(messageNumber);
@@ -208,45 +215,102 @@ public class DestinationSequence extends
// can be included in a HTTP response
return getAcksTo().getAddress().getValue().equals(RMConstants.getAnonymousAddress());
}
-
+
/**
* Ensures that the delivery assurance is honored, e.g. by throwing an
* exception if the message had already been delivered and the delivery
* assurance is AtMostOnce.
- * This method blocks in case the delivery assurance is
- * InOrder and and not all messages with lower message numbers have been
- * delivered.
+ * The same duplicate check is applied when the delivery assurance is ExactlyOnce.
+ * If the delivery assurance includes InOrder, the message is handed to the in-order queue: it is let
+ * through when it is next in line, and is otherwise suspended through a continuation (when one is
+ * available) or blocked until it may proceed.
+ * If InOrder is combined with either AtLeastOnce or ExactlyOnce, no gaps in the message numbers are
+ * tolerated, so out-of-order messages wait until the missing messages have been received and processed.
*
- * @param s the SequenceType object including identifier and message number
+ * @param mn message number
+ * @param message the message being processed; used to look up or create a continuation
+ * @return <code>true</code> if message processing should continue, <code>false</code> if the message is to
+ * be dropped
- * @throws Fault if message had already been acknowledged
+ * @throws RMException if the message had already been acknowledged
*/
- void applyDeliveryAssurance(BigInteger mn) throws RMException {
+ boolean applyDeliveryAssurance(BigInteger mn, Message message) throws RMException {
+ Continuation cont = getContinuation(message);
DeliveryAssuranceType da = destination.getManager().getDeliveryAssurance();
- if (da.isSetAtMostOnce() && isAcknowledged(mn)) {
+ if (cont != null && da.isSetInOrder() && !cont.isNew()) { // continuation is not new (presumably a resumed invocation): skip the duplicate check and re-enter the queue
+ return waitInQueue(mn, !(da.isSetAtLeastOnce() || da.isSetExactlyOnce()),
+ message, cont);
+ }
+ if ((da.isSetExactlyOnce() || da.isSetAtMostOnce()) && isAcknowledged(mn)) { // duplicate of an already acknowledged message
org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
"MESSAGE_ALREADY_DELIVERED_EXC", LOG, mn, getIdentifier().getValue());
- LOG.log(Level.SEVERE, msg.toString());
+ LOG.log(Level.INFO, msg.toString());
throw new RMException(msg);
}
- if (da.isSetInOrder() && da.isSetAtLeastOnce()) {
- synchronized (this) {
- boolean ok = allPredecessorsAcknowledged(mn);
- while (!ok) {
- try {
- wait();
- ok = allPredecessorsAcknowledged(mn);
- } catch (InterruptedException ie) {
- // ignore
- }
+ if (da.isSetInOrder()) { // second argument (canSkip) is true only when neither AtLeastOnce nor ExactlyOnce is set
+ return waitInQueue(mn, !(da.isSetAtLeastOnce() || da.isSetExactlyOnce()),
+ message, cont);
+ }
+ return true; // no ordering requirement: deliver immediately
+ }
+
+    /**
+     * Looks up the continuation stored on the message, tolerating a missing message.
+     *
+     * @param message the message to inspect, may be <code>null</code>
+     * @return the continuation stored under {@link Continuation}, or <code>null</code>
+     *         if the message is <code>null</code> or holds none
+     */
+    private Continuation getContinuation(Message message) {
+        return message == null ? null : message.get(Continuation.class);
+    }
+
+    /**
+     * Decides whether the message with number <code>mn</code> may be delivered now, and
+     * otherwise parks it until the sequence state changes.
+     * The message is let through when nothing from this sequence is in process and it is the
+     * direct successor of the highest completed number (or, with <code>canSkip</code>, any
+     * higher number). It is rejected when the same or a later number is already in process
+     * or completed. In all other cases it is suspended through a continuation if one can be
+     * obtained, or the calling thread blocks until {@link #wakeupAll()} is called.
+     * An interrupt received while blocked does not end the wait; it is remembered and the
+     * thread's interrupt status is restored when this method exits.
+     *
+     * @param mn message number
+     * @param canSkip whether gaps in the message numbers are acceptable
+     * @param message the message being processed; may be <code>null</code>, in which case no
+     *        continuation is looked up and the caller blocks instead
+     * @param continuation continuation already associated with the message, or <code>null</code>
+     * @return <code>true</code> if the message may be processed now, <code>false</code> if it
+     *         is to be dropped
+     * @throws SuspendedInvocationException if the invocation was suspended via a continuation
+     */
+    synchronized boolean waitInQueue(BigInteger mn, boolean canSkip,
+                                     Message message, Continuation continuation) {
+        boolean interrupted = false;
+        try {
+            while (true) {
+
+                // can process now if no other in process and this one is next
+                if (inProcessNumber == null) {
+                    BigInteger diff = mn.subtract(highNumberCompleted);
+                    if (BigInteger.ONE.equals(diff) || (canSkip && diff.signum() > 0)) {
+                        inProcessNumber = mn;
+                        return true;
+                    }
+                }
+
+                // can abort now if same message in process or already processed
+                BigInteger compare = inProcessNumber == null ? highNumberCompleted : inProcessNumber;
+                if (compare.compareTo(mn) >= 0) {
+                    return false;
+                }
+                // getContinuation() accepts a null message, so do not dereference it here either
+                if (continuation == null && message != null) {
+                    ContinuationProvider p = message.get(ContinuationProvider.class);
+                    if (p != null) {
+                        // NOTE(review): the one-way flag is cleared only around getContinuation(),
+                        // presumably because the provider declines one-way exchanges - confirm
+                        boolean isOneWay = message.getExchange().isOneWay();
+                        message.getExchange().setOneWay(false);
+                        continuation = p.getContinuation();
+                        message.getExchange().setOneWay(isOneWay);
+                        message.put(Continuation.class, continuation);
+                    }
+                }
+
+                if (continuation != null) {
+                    continuation.setObject(message);
+                    if (continuation.suspend(-1)) {
+                        continuations.add(continuation);
+                        throw new SuspendedInvocationException();
                     }
                 }
+                try {
+                    //if we get here, there isn't a continuation available
+                    //so we need to block/wait
+                    wait();
+                } catch (InterruptedException ie) {
+                    // keep waiting for our turn, but remember the interrupt so that
+                    // it can be re-asserted for the caller on the way out
+                    interrupted = true;
+                }
             }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
     }
+    /**
+     * Resumes every queued continuation, removing each from the queue first, and then
+     * notifies all threads waiting on this sequence's monitor.
+     */
+    synchronized void wakeupAll() {
+        // always take from the head so continuations are resumed in the order they were queued
+        while (continuations.size() > 0) {
+            continuations.remove(0).resume();
+        }
+        notifyAll();
+    }
- synchronized boolean allPredecessorsAcknowledged(BigInteger mn) {
- return acknowledgement.getAcknowledgementRange().size() == 1
- && acknowledgement.getAcknowledgementRange().get(0).getLower().equals(BigInteger.ONE)
- && acknowledgement.getAcknowledgementRange().get(0).getUpper().subtract(mn).signum() >= 0;
+ synchronized void processingComplete(BigInteger mn) { // records that message number mn has finished processing
+ inProcessNumber = null; // nothing from this sequence is in process any more
+ highNumberCompleted = mn; // waitInQueue compares later message numbers against this value
+ wakeupAll(); // resume continuations and wake blocked threads so they re-check their turn
}
void purgeAcknowledged(BigInteger messageNr) {
Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java Thu Feb 3 21:30:31 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.Phase;
+
+/**
+ * Interceptor used for InOrder delivery of messages to the destination. This works with
+ * {@link DestinationSequence} to allow only one message at a time from a particular sequence through to the
+ * destination (since otherwise there is no way to enforce in-order delivery).
+ * It is constructed for the POST_INVOKE phase and, for each message it handles, tells the RM
+ * destination obtained from the manager that processing of that message is complete.
+ */
+public class RMDeliveryInterceptor extends AbstractRMInterceptor<Message> {
+
+ private static final Logger LOG = LogUtils.getL7dLogger(RMDeliveryInterceptor.class);
+
+ public RMDeliveryInterceptor() {
+ super(Phase.POST_INVOKE); // uses the phase-taking AbstractRMInterceptor constructor
+ }
+
+ // Interceptor interface: reports completion of the message to its RM destination
+
+ public void handle(Message message) throws SequenceFault, RMException {
+ LOG.entering(getClass().getName(), "handleMessage");
+ getManager().getDestination(message).processingComplete(message); // NOTE(review): assumes getDestination(message) is never null here - confirm
+ }
+}
Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java Thu Feb 3 21:30:31 2011
@@ -23,6 +23,7 @@ import org.apache.cxf.Bus;
import org.apache.cxf.common.injection.NoJSR250Annotations;
import org.apache.cxf.feature.AbstractFeature;
import org.apache.cxf.interceptor.InterceptorProvider;
+import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
import org.apache.cxf.ws.rm.RMInInterceptor;
import org.apache.cxf.ws.rm.RMManager;
import org.apache.cxf.ws.rm.RMOutInterceptor;
@@ -47,6 +48,7 @@ public class RMFeature extends AbstractF
private RMInInterceptor rmLogicalIn = new RMInInterceptor();
private RMOutInterceptor rmLogicalOut = new RMOutInterceptor();
+ private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
private RMSoapInterceptor rmCodec = new RMSoapInterceptor();
public void setDeliveryAssurance(DeliveryAssuranceType da) {
@@ -91,15 +93,18 @@ public class RMFeature extends AbstractF
rmLogicalIn.setBus(bus);
rmLogicalOut.setBus(bus);
+ rmDelivery.setBus(bus);
provider.getInInterceptors().add(rmLogicalIn);
provider.getInInterceptors().add(rmCodec);
+ provider.getInInterceptors().add(rmDelivery);
provider.getOutInterceptors().add(rmLogicalOut);
provider.getOutInterceptors().add(rmCodec);
provider.getInFaultInterceptors().add(rmLogicalIn);
provider.getInFaultInterceptors().add(rmCodec);
+ // in-fault chain: the in chain already received rmDelivery above, so adding it there again was a copy/paste slip
+ provider.getInFaultInterceptors().add(rmDelivery);
provider.getOutFaultInterceptors().add(rmLogicalOut);
provider.getOutFaultInterceptors().add(rmCodec);
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java Thu Feb 3 21:30:31 2011
@@ -25,6 +25,7 @@ import javax.xml.namespace.QName;
import org.apache.cxf.Bus;
import org.apache.cxf.ws.policy.AbstractPolicyInterceptorProvider;
+import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
import org.apache.cxf.ws.rm.RMInInterceptor;
import org.apache.cxf.ws.rm.RMOutInterceptor;
import org.apache.cxf.ws.rm.soap.RMSoapInterceptor;
@@ -35,6 +36,7 @@ public class RMPolicyInterceptorProvider
private RMInInterceptor rmIn = new RMInInterceptor();
private RMOutInterceptor rmOut = new RMOutInterceptor();
private RMSoapInterceptor rmSoap = new RMSoapInterceptor();
+ private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
static {
Collection<QName> types = new ArrayList<QName>();
@@ -46,15 +48,18 @@ public class RMPolicyInterceptorProvider
super(ASSERTION_TYPES);
rmIn.setBus(bus);
rmOut.setBus(bus);
+ rmDelivery.setBus(bus);
getInInterceptors().add(rmIn);
getInInterceptors().add(rmSoap);
+ getInInterceptors().add(rmDelivery);
getOutInterceptors().add(rmOut);
getOutInterceptors().add(rmSoap);
getInFaultInterceptors().add(rmIn);
getInFaultInterceptors().add(rmSoap);
+ // in-fault chain: the in chain already received rmDelivery above, so adding it there again was a copy/paste slip
+ getInFaultInterceptors().add(rmDelivery);
getOutFaultInterceptors().add(rmOut);
getOutFaultInterceptors().add(rmSoap);
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java Thu Feb 3 21:30:31 2011
@@ -462,7 +462,7 @@ public class DestinationSequenceTest ext
control.replay();
DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
ds.setDestination(destination);
- ds.applyDeliveryAssurance(mn);
+ ds.applyDeliveryAssurance(mn, null);
control.verify();
control.reset();
@@ -475,7 +475,7 @@ public class DestinationSequenceTest ext
EasyMock.expect(r.getUpper()).andReturn(new BigInteger("15"));
control.replay();
try {
- ds.applyDeliveryAssurance(mn);
+ ds.applyDeliveryAssurance(mn, null);
fail("Expected Fault not thrown.");
} catch (RMException ex) {
assertEquals("MESSAGE_ALREADY_DELIVERED_EXC", ex.getCode());
@@ -486,34 +486,6 @@ public class DestinationSequenceTest ext
}
@Test
- public void testInOrderNoWait() throws RMException {
- setUpDestination();
-
- BigInteger mn = BigInteger.TEN;
-
- DeliveryAssuranceType da = control.createMock(DeliveryAssuranceType.class);
- EasyMock.expect(manager.getDeliveryAssurance()).andReturn(da);
- EasyMock.expect(da.isSetAtMostOnce()).andReturn(false);
- EasyMock.expect(da.isSetAtLeastOnce()).andReturn(true);
- EasyMock.expect(da.isSetInOrder()).andReturn(true);
-
- SequenceAcknowledgement ack = control.createMock(SequenceAcknowledgement.class);
- List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
- AcknowledgementRange r = control.createMock(AcknowledgementRange.class);
- ranges.add(r);
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
- EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
- EasyMock.expect(r.getUpper()).andReturn(new BigInteger("15"));
-
- control.replay();
-
- DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
- ds.setDestination(destination);
- ds.applyDeliveryAssurance(mn);
- control.verify();
- }
-
- @Test
public void testInOrderWait() {
setUpDestination();
Message[] messages = new Message[5];
@@ -549,7 +521,7 @@ public class DestinationSequenceTest ext
public void run() {
try {
ds.acknowledge(message);
- ds.applyDeliveryAssurance(messageNr);
+ ds.applyDeliveryAssurance(messageNr, message);
} catch (Exception ex) {
// ignore
}
@@ -583,51 +555,6 @@ public class DestinationSequenceTest ext
}
@Test
- public void testAllPredecessorsAcknowledged() {
-
- SequenceAcknowledgement ack = control.createMock(SequenceAcknowledgement.class);
- List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
- AcknowledgementRange r = control.createMock(AcknowledgementRange.class);
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges);
- control.replay();
- DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
- ds.setDestination(destination);
- assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
- control.verify();
-
- control.reset();
- ranges.add(r);
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(2);
- EasyMock.expect(r.getLower()).andReturn(BigInteger.TEN);
- control.replay();
- assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
- control.verify();
-
- control.reset();
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
- EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
- EasyMock.expect(r.getUpper()).andReturn(new BigInteger("5"));
- control.replay();
- assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
- control.verify();
-
- control.reset();
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
- EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
- EasyMock.expect(r.getUpper()).andReturn(BigInteger.TEN);
- control.replay();
- assertTrue("not all predecessors acknowledged", ds.allPredecessorsAcknowledged(BigInteger.TEN));
- control.verify();
-
- ranges.add(r);
- control.reset();
- EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges);
- control.replay();
- assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
- control.verify();
- }
-
- @Test
public void testScheduleSequenceTermination() throws SequenceFault {
Timer timer = new Timer();
setUpDestination(timer);
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java Thu Feb 3 21:30:31 2011
@@ -146,7 +146,7 @@ public class DestinationTest extends Ass
BigInteger nr = BigInteger.TEN;
EasyMock.expect(st.getMessageNumber()).andReturn(nr);
RMException ex = new RMException(new RuntimeException("already acknowledged"));
- ds.applyDeliveryAssurance(nr);
+ ds.applyDeliveryAssurance(nr, message);
EasyMock.expectLastCall().andThrow(ex);
control.replay();
try {
@@ -177,8 +177,8 @@ public class DestinationTest extends Ass
DestinationSequence ds = control.createMock(DestinationSequence.class);
EasyMock.expect(destination.getSequence(id)).andReturn(ds);
- ds.applyDeliveryAssurance(nr);
- EasyMock.expectLastCall();
+ ds.applyDeliveryAssurance(nr, message);
+ EasyMock.expectLastCall().andReturn(Boolean.TRUE);
ds.acknowledge(message);
EasyMock.expectLastCall();
SequenceType.LastMessage lm = control.createMock(SequenceType.LastMessage.class);
Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java Thu Feb 3 21:30:31 2011
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.io.StringReader;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+import javax.jws.WebService;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.Provider;
+import javax.xml.ws.Service.Mode;
+import javax.xml.ws.ServiceMode;
+import javax.xml.xpath.XPathConstants;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.frontend.ClientProxy;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.helpers.XMLUtils;
+import org.apache.cxf.helpers.XPathUtils;
+import org.apache.cxf.systest.ws.util.ConnectionHelper;
+import org.apache.cxf.test.TestUtilities;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.ws.rm.RMManager;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Tests the operation of InOrder delivery assurance for one-way messages to the server.
+ */
+public class DeliveryAssuranceOnewayTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = allocatePort(DeliveryAssuranceOnewayTest.class);
+ private static final String GREETER_ADDRESS
+ = "http://localhost:" + PORT + "/SoapContext/GreeterPort";
+
+ private static final Logger LOG = LogUtils.getLogger(DeliveryAssuranceOnewayTest.class);
+
+ private Bus serverBus;
+ private Endpoint endpoint;
+ private Bus greeterBus;
+ private Greeter greeter;
+
+
+ @BeforeClass
+ public static void setProps() throws Exception {
+ TestUtilities.setKeepAliveSystemProperty(false);
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ TestUtilities.recoverKeepAliveSystemProperty();
+ }
+
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ stopClient();
+ } catch (Throwable t) {
+ //ignore: best-effort cleanup, a failure here must not prevent the server from being stopped below
+ }
+ try {
+ stopServer();
+ } catch (Throwable t) {
+ //ignore: best-effort cleanup
+ }
+ Thread.sleep(100); // short pause after shutdown - presumably lets resources settle before the next test; TODO confirm
+ }
+
+/* @Test
+ public void testAtLeastOnce() throws Exception {
+ testOnewayAtLeastOnce(null);
+ }
+
+ @Test
+ public void testAtLeastOnceAsyncExecutor() throws Exception {
+ testOnewayAtLeastOnce(Executors.newSingleThreadExecutor());
+ }
+
+ private void testOnewayAtLeastOnce(Executor executor) throws Exception {
+ init("org/apache/cxf/systest/ws/rm/atleastonce.xml", executor);
+
+ greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+ RMManager manager = greeterBus.getExtension(RMManager.class);
+ manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+ String[] callArgs = new String[] {"one", "two", "three", "four"};
+ for (int i = 0; i < callArgs.length; i++) {
+ greeter.greetMeOneWay(callArgs[i]);
+ }
+
+ awaitMessages(callArgs.length + 2, 3000, 60000);
+ List<String> actualArgs = GreeterProvider.CALL_ARGS;
+ assertTrue("Too few messages", callArgs.length <= actualArgs.size());
+ for (int i = 0; i < callArgs.length; i++) {
+ boolean match = false;
+ for (int j = 0; j < actualArgs.size(); j++) {
+ if (actualArgs.get(j).equals(callArgs[i])) {
+ match = true;
+ break;
+ }
+ }
+ if (!match) {
+ fail("No match for request " + callArgs[i]);
+ }
+ }
+
+ }
+
+ @Test
+ public void testAtMostOnce() throws Exception {
+ testOnewayAtMostOnce(null);
+ }
+
+ @Test
+ public void testAtMostOnceAsyncExecutor() throws Exception {
+ testOnewayAtMostOnce(Executors.newSingleThreadExecutor());
+ }
+
+ private void testOnewayAtMostOnce(Executor executor) throws Exception {
+ init("org/apache/cxf/systest/ws/rm/atmostonce.xml", executor);
+
+ greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+ RMManager manager = greeterBus.getExtension(RMManager.class);
+ manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+ String[] callArgs = new String[] {"one", "two", "three", "four"};
+ for (int i = 0; i < callArgs.length; i++) {
+ greeter.greetMeOneWay(callArgs[i]);
+ }
+
+ awaitMessages(callArgs.length, 3000, 60000);
+ List<String> actualArgs = GreeterProvider.CALL_ARGS;
+ assertTrue("Too many messages", callArgs.length >= actualArgs.size());
+ for (int i = 0; i < actualArgs.size() - 1; i++) {
+ for (int j = i + 1; j < actualArgs.size(); j++) {
+ if (actualArgs.get(j).equals(actualArgs.get(i))) {
+ fail("Message received more than once " + callArgs[i]);
+ }
+ }
+ }
+
+ }
+
+ @Test
+ public void testExactlyOnce() throws Exception {
+ testOnewayExactlyOnce(null);
+ }
+
+ @Test
+ public void testExactlyOnceAsyncExecutor() throws Exception {
+ testOnewayExactlyOnce(Executors.newSingleThreadExecutor());
+ }
+
+ private void testOnewayExactlyOnce(Executor executor) throws Exception {
+ init("org/apache/cxf/systest/ws/rm/exactlyonce.xml", executor);
+
+ greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+ RMManager manager = greeterBus.getExtension(RMManager.class);
+ manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+ String[] callArgs = new String[] {"one", "two", "three", "four"};
+ for (int i = 0; i < callArgs.length; i++) {
+ greeter.greetMeOneWay(callArgs[i]);
+ }
+
+ awaitMessages(callArgs.length, 3000, 60000);
+ List<String> actualArgs = GreeterProvider.CALL_ARGS;
+ assertEquals("Wrong message count", callArgs.length, actualArgs.size());
+ for (int i = 0; i < callArgs.length; i++) {
+ boolean match = false;
+ for (int j = 0; j < actualArgs.size(); j++) {
+ if (actualArgs.get(j).equals(callArgs[i])) {
+ match = true;
+ break;
+ }
+ }
+ if (!match) {
+ fail("No match for request " + callArgs[i]);
+ }
+ }
+
+ }*/
+
+ /**
+  * Runs the InOrder one-way scenario without installing an executor on the
+  * client service.
+  */
+ @Test
+ public void testInOrder() throws Exception {
+     final Executor noExecutor = null;
+     testOnewayInOrder(noExecutor);
+ }
+
+ /**
+  * Runs the InOrder one-way scenario with a single-thread executor installed
+  * on the client service.
+  */
+ @Test
+ public void testInOrderAsyncExecutor() throws Exception {
+     final Executor singleThread = Executors.newSingleThreadExecutor();
+     testOnewayInOrder(singleThread);
+ }
+
+ /**
+  * Sends four one-way requests using the {@code inorder.xml} configuration with a
+  * {@code MessageLossSimulator} on the client bus, then checks that the requests
+  * recorded by {@link GreeterProvider} appear in sending order. Repeated
+  * deliveries of the same request are tolerated here; only a request showing up
+  * after a later one fails the test.
+  *
+  * @param executor executor to install on the client service, or {@code null} for none
+  */
+ private void testOnewayInOrder(Executor executor) throws Exception {
+     init("org/apache/cxf/systest/ws/rm/inorder.xml", executor);
+
+     greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+     RMManager manager = greeterBus.getExtension(RMManager.class);
+     manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+     String[] callArgs = new String[] {"one", "two", "three", "four"};
+     for (String callArg : callArgs) {
+         greeter.greetMeOneWay(callArg);
+     }
+
+     // Only part of the traffic is awaited, so more requests may still be arriving.
+     awaitMessages(callArgs.length - 2, 3000, 60000);
+
+     // Work on a snapshot taken under the list's lock: the provider appends to
+     // CALL_ARGS from other threads, and iterating the live list could throw
+     // ConcurrentModificationException.
+     List<String> actualArgs;
+     synchronized (GreeterProvider.CALL_ARGS) {
+         actualArgs = new ArrayList<String>(GreeterProvider.CALL_ARGS);
+     }
+
+     // Walk forward through the sent requests; a received value that cannot be
+     // found at or after the current position arrived out of order.
+     int argNum = 0;
+     for (String actual : actualArgs) {
+         while (argNum < callArgs.length && !actual.equals(callArgs[argNum])) {
+             argNum++;
+         }
+         assertTrue("Message out of order", argNum < callArgs.length);
+     }
+ }
+
+ /**
+  * Runs the AtMostOnce + InOrder one-way scenario without installing an
+  * executor on the client service.
+  */
+ @Test
+ public void testAtMostOnceInOrder() throws Exception {
+     final Executor noExecutor = null;
+     testOnewayAtMostOnceInOrder(noExecutor);
+ }
+
+ /**
+  * Runs the AtMostOnce + InOrder one-way scenario with a single-thread
+  * executor installed on the client service.
+  */
+ @Test
+ public void testAtMostOnceInOrderAsyncExecutor() throws Exception {
+     final Executor singleThread = Executors.newSingleThreadExecutor();
+     testOnewayAtMostOnceInOrder(singleThread);
+ }
+
+ /**
+  * Sends four one-way requests using the {@code atmostonce-inorder.xml}
+  * configuration with a {@code MessageLossSimulator} on the client bus, then
+  * checks that the requests recorded by {@link GreeterProvider} are a subsequence
+  * of what was sent: no more entries than were sent, in sending order, and no
+  * request recorded twice.
+  *
+  * @param executor executor to install on the client service, or {@code null} for none
+  */
+ private void testOnewayAtMostOnceInOrder(Executor executor) throws Exception {
+     init("org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml", executor);
+
+     greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+     RMManager manager = greeterBus.getExtension(RMManager.class);
+     manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+     String[] callArgs = new String[] {"one", "two", "three", "four"};
+     for (String callArg : callArgs) {
+         greeter.greetMeOneWay(callArg);
+     }
+
+     // Only part of the traffic is awaited, so more requests may still be arriving.
+     awaitMessages(callArgs.length - 2, 3000, 60000);
+
+     // Snapshot under the list's lock; the provider appends from other threads.
+     List<String> actualArgs;
+     synchronized (GreeterProvider.CALL_ARGS) {
+         actualArgs = new ArrayList<String>(GreeterProvider.CALL_ARGS);
+     }
+     assertTrue("Too many messages", callArgs.length >= actualArgs.size());
+
+     int argNum = 0;
+     for (String actual : actualArgs) {
+         while (argNum < callArgs.length && !actual.equals(callArgs[argNum])) {
+             argNum++;
+         }
+         assertTrue("Message out of order or duplicated: " + actual, argNum < callArgs.length);
+         // Step past the matched request so that a second delivery of the same
+         // request cannot match again -- AtMostOnce forbids duplicates.
+         argNum++;
+     }
+ }
+
+ /**
+  * Runs the ExactlyOnce + InOrder one-way scenario without installing an
+  * executor on the client service.
+  */
+ @Test
+ public void testExactlyOnceInOrder() throws Exception {
+     final Executor noExecutor = null;
+     testOnewayExactlyOnceInOrder(noExecutor);
+ }
+
+ /**
+  * Runs the ExactlyOnce + InOrder one-way scenario with a single-thread
+  * executor installed on the client service.
+  */
+ @Test
+ public void testExactlyOnceInOrderAsyncExecutor() throws Exception {
+     final Executor singleThread = Executors.newSingleThreadExecutor();
+     testOnewayExactlyOnceInOrder(singleThread);
+ }
+
+ /**
+  * Sends four one-way requests using the {@code exactlyonce-inorder.xml}
+  * configuration with a {@code MessageLossSimulator} on the client bus, then
+  * checks that {@link GreeterProvider} recorded exactly the sent requests, once
+  * each, in sending order.
+  *
+  * @param executor executor to install on the client service, or {@code null} for none
+  */
+ private void testOnewayExactlyOnceInOrder(Executor executor) throws Exception {
+     init("org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml", executor);
+
+     greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+     RMManager manager = greeterBus.getExtension(RMManager.class);
+     manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+     String[] callArgs = new String[] {"one", "two", "three", "four"};
+     for (String callArg : callArgs) {
+         greeter.greetMeOneWay(callArg);
+     }
+
+     awaitMessages(callArgs.length, 3000, 60000);
+
+     // Snapshot under the list's lock; the provider appends from other threads.
+     List<String> actualArgs;
+     synchronized (GreeterProvider.CALL_ARGS) {
+         actualArgs = new ArrayList<String>(GreeterProvider.CALL_ARGS);
+     }
+     assertEquals("Wrong number of messages", callArgs.length, actualArgs.size());
+
+     // With the count pinned, "exactly once, in order" means the recorded list
+     // equals the sent list position by position. A forward scan alone would
+     // still accept e.g. [one, one, two, three].
+     for (int i = 0; i < callArgs.length; i++) {
+         assertEquals("Message out of order or duplicated at position " + i,
+                      callArgs[i], actualArgs.get(i));
+     }
+ }
+
+ // --- test utilities ---
+
+ /**
+ * Sets up one test run from a single Spring configuration resource: first the
+ * server side, then the client bus, then the client proxy. The same
+ * SpringBusFactory instance is used for both buses.
+ *
+ * @param cfgResource classpath resource of the Spring configuration used for both buses
+ * @param executor executor handed to initProxy; may be null
+ */
+ private void init(String cfgResource, Executor executor) {
+
+ SpringBusFactory bf = new SpringBusFactory();
+ // Order matters: each step below sets the default bus, so the client bus
+ // created second is the default one by the time the proxy is built.
+ initServer(bf, cfgResource);
+ initGreeterBus(bf, cfgResource);
+ initProxy(executor);
+ }
+
+ private void initServer(SpringBusFactory bf, String cfgResource) {
+ String derbyHome = System.getProperty("derby.system.home");
+ try {
+ synchronized (GreeterProvider.CALL_ARGS) {
+ GreeterProvider.CALL_ARGS.clear();
+ }
+ System.setProperty("derby.system.home", derbyHome + "-server");
+ serverBus = bf.createBus(cfgResource);
+ BusFactory.setDefaultBus(serverBus);
+ LOG.info("Initialised bus " + serverBus + " with cfg file resource: " + cfgResource);
+ LOG.info("serverBus inInterceptors: " + serverBus.getInInterceptors());
+ endpoint = Endpoint.publish(GREETER_ADDRESS, new GreeterProvider());
+ } finally {
+ if (derbyHome != null) {
+ System.setProperty("derby.system.home", derbyHome);
+ } else {
+ System.clearProperty("derby.system.home");
+ }
+ }
+ }
+
+ /**
+ * Creates the client-side bus from the given configuration resource, stores it
+ * in greeterBus and makes it the default bus.
+ *
+ * @param bf factory used to create the bus
+ * @param cfgResource classpath resource of the Spring configuration
+ */
+ private void initGreeterBus(SpringBusFactory bf,
+ String cfgResource) {
+ greeterBus = bf.createBus(cfgResource);
+ BusFactory.setDefaultBus(greeterBus);
+ LOG.fine("Initialised greeter bus with configuration: " + cfgResource);
+ }
+
+ /**
+  * Creates the greeter client proxy from a new {@code GreeterService}, points it
+  * at {@code PORT} and enables keep-alive on its connection.
+  *
+  * @param executor executor to install on the service before the port is
+  *                 obtained; skipped when {@code null}
+  */
+ private void initProxy(Executor executor) {
+     GreeterService gs = new GreeterService();
+
+     if (null != executor) {
+         gs.setExecutor(executor);
+     }
+
+     greeter = gs.getGreeterPort();
+     try {
+         updateAddressPort(greeter, PORT);
+     } catch (Exception e) {
+         // Still best effort -- the proxy is used as-is -- but leave a trace so
+         // a wrong endpoint address is not a silent mystery later on.
+         LOG.warning("Could not update address port of greeter proxy: " + e);
+     }
+     LOG.fine("Created greeter client.");
+
+     ConnectionHelper.setKeepAliveConnection(greeter, true);
+ }
+
+ /**
+  * Shuts down the client side, if it was started: closes the proxy's conduit,
+  * shuts down the client bus and clears both fields.
+  */
+ private void stopClient() {
+     if (null == greeterBus) {
+         return;
+     }
+
+     // Close the conduit first so that its decoupled destination is closed too;
+     // this releases the port once the destination's reference count hits zero.
+     if (null != greeter) {
+         ClientProxy.getClient(greeter).getConduit().close();
+     }
+     greeterBus.shutdown(true);
+     greeter = null;
+     greeterBus = null;
+ }
+
+ /**
+  * Shuts down the server side: stops the published endpoint when there is one,
+  * then shuts down the server bus when there is one. Both fields end up null.
+  */
+ private void stopServer() {
+     if (null == endpoint) {
+         LOG.info("No endpoint active.");
+     } else {
+         LOG.info("Stopping Greeter endpoint");
+         endpoint.stop();
+     }
+     endpoint = null;
+
+     if (null == serverBus) {
+         return;
+     }
+     serverBus.shutdown(true);
+     serverBus = null;
+ }
+
+ /**
+  * Polls the calls recorded by {@link GreeterProvider} every 100 ms until at
+  * least the expected number has arrived or the timeout budget is used up, then
+  * sleeps for an additional settling delay.
+  * <p>
+  * If the waiting thread is interrupted, the interrupt flag is restored and the
+  * method returns immediately instead of continuing to poll.
+  *
+  * @param nExpectedIn number of messages to wait for
+  * @param delay added delay before return (in case more are coming), in milliseconds
+  * @param timeout maximum time to wait for expected messages, in milliseconds
+  */
+ private void awaitMessages(int nExpectedIn, int delay, int timeout) {
+     try {
+         for (int waited = 0; waited <= timeout; waited += 100) {
+             int nIn;
+             synchronized (GreeterProvider.CALL_ARGS) {
+                 nIn = GreeterProvider.CALL_ARGS.size();
+             }
+             if (nIn >= nExpectedIn) {
+                 break;
+             }
+             Thread.sleep(100);
+         }
+         Thread.sleep(delay);
+     } catch (InterruptedException ex) {
+         // Do not swallow the interrupt: re-assert it for the caller and stop
+         // waiting, otherwise an aborted test would keep polling until timeout.
+         Thread.currentThread().interrupt();
+     }
+ }
+
+ /**
+  * Payload-mode provider standing in for the greeter service. Every request it
+  * handles has its {@code requestType} value appended to {@link #CALL_ARGS}, so
+  * tests can inspect what was delivered and in which order.
+  * <p>
+  * A {@code greetMe} request is answered with the value upper-cased; anything
+  * without a non-empty {@code greetMe/requestType} is treated as
+  * {@code greetMeOneWay} and gets no response.
+  */
+ @WebService(serviceName = "GreeterService",
+             portName = "GreeterPort",
+             targetNamespace = "http://cxf.apache.org/greeter_control",
+             wsdlLocation = "/wsdl/greeter_control.wsdl")
+ @ServiceMode(Mode.PAYLOAD)
+ public static class GreeterProvider implements Provider<Source> {
+
+     /** Request values in arrival order; guard every access with {@code synchronized (CALL_ARGS)}. */
+     public static final List<String> CALL_ARGS = new ArrayList<String>();
+
+     public Source invoke(Source obj) {
+
+         Node payload;
+         try {
+             payload = XMLUtils.fromSource(obj);
+         } catch (Exception e) {
+             throw new RuntimeException(e);
+         }
+         if (payload instanceof Document) {
+             payload = ((Document)payload).getDocumentElement();
+         }
+
+         Map<String, String> namespaces = new HashMap<String, String>();
+         namespaces.put("ns", "http://cxf.apache.org/greeter_control/types");
+         XPathUtils xpath = new XPathUtils(namespaces);
+
+         // Try the two-way operation first; an absent or empty value means the
+         // request is handled as the one-way operation instead.
+         String request = (String)xpath.getValue("/ns:greetMe/ns:requestType",
+                                                 payload,
+                                                 XPathConstants.STRING);
+         final boolean oneWay = request == null || "".equals(request);
+         if (oneWay) {
+             request = (String)xpath.getValue("/ns:greetMeOneWay/ns:requestType",
+                                              payload,
+                                              XPathConstants.STRING);
+         }
+
+         synchronized (CALL_ARGS) {
+             CALL_ARGS.add(request);
+         }
+
+         if (oneWay) {
+             return null;
+         }
+         String resp =
+             "<greetMeResponse "
+             + "xmlns=\"http://cxf.apache.org/greeter_control/types\">"
+             + "<responseType>" + request.toUpperCase() + "</responseType>"
+             + "</greetMeResponse>";
+         return new StreamSource(new StringReader(resp));
+     }
+ }
+}
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Thu Feb 3 21:30:31 2011
@@ -1104,7 +1104,8 @@ public class SequenceTest extends Abstra
OutMessageRecorder outRecorder;
String id;
- ClientThread(SpringBusFactory bf, String cfgResource, int n) {
+ ClientThread(SpringBusFactory bf, String cfgResource, int n) {
+ super("client " + n);
SequenceTest.this.initGreeter(bf, cfgResource, true, null);
greeter = SequenceTest.this.greeter;
greeterBus = SequenceTest.this.greeterBus;
@@ -1114,9 +1115,18 @@ public class SequenceTest extends Abstra
}
public void run() {
- greeter.greetMe(id + ": a");
- greeter.greetMe(id + ": b");
- greeter.greetMe(id + ": c");
+ String s = greeter.greetMe(id + ": a").toLowerCase();
+ if (!s.contains(id)) {
+ System.out.println("Correlation problem <" + s + "> <" + id + ">");
+ }
+ s = greeter.greetMe(id + ": b").toLowerCase();
+ if (!s.contains(id)) {
+ System.out.println("Correlation problem <" + s + "> <" + id + ">");
+ }
+ s = greeter.greetMe(id + ": c").toLowerCase();
+ if (!s.contains(id)) {
+ System.out.println("Correlation problem <" + s + "> <" + id + ">");
+ }
// three application messages plus createSequence
@@ -1134,9 +1144,10 @@ public class SequenceTest extends Abstra
for (int i = 0; i < clients.length; i++) {
clients[i].start();
}
-
for (int i = 0; i < clients.length; i++) {
clients[i].join();
+ }
+ for (int i = 0; i < clients.length; i++) {
MessageFlow mf = new MessageFlow(clients[i].outRecorder.getOutboundMessages(),
clients[i].inRecorder.getInboundMessages());
Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml Thu Feb 3 21:30:31 2011
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
+ xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xsi:schemaLocation="
+http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <import resource="rminterceptors.xml" />
+
+ <wsrm-mgr:rmManager id="org.apache.cxf.ws.rm.RMManager">
+ <wsrm-policy:RMAssertion>
+ <wsrm-policy:BaseRetransmissionInterval Milliseconds="60000" />
+ <wsrm-policy:AcknowledgementInterval Milliseconds="10000" />
+ </wsrm-policy:RMAssertion>
+ <wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:AtLeastOnce />
+ </wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:destinationPolicy>
+ <wsrm-mgr:acksPolicy intraMessageThreshold="0" />
+ </wsrm-mgr:destinationPolicy>
+ </wsrm-mgr:rmManager>
+
+
+</beans>
\ No newline at end of file
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml Thu Feb 3 21:30:31 2011
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
+ xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xsi:schemaLocation="
+http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <import resource="rminterceptors.xml" />
+
+ <wsrm-mgr:rmManager id="org.apache.cxf.ws.rm.RMManager">
+ <wsrm-policy:RMAssertion>
+ <wsrm-policy:BaseRetransmissionInterval Milliseconds="60000" />
+ <wsrm-policy:AcknowledgementInterval Milliseconds="10000" />
+ </wsrm-policy:RMAssertion>
+ <wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:AtMostOnce />
+ <wsrm-mgr:InOrder />
+ </wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:destinationPolicy>
+ <wsrm-mgr:acksPolicy intraMessageThreshold="0" />
+ </wsrm-mgr:destinationPolicy>
+ </wsrm-mgr:rmManager>
+
+
+</beans>
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml Thu Feb 3 21:30:31 2011
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
+ xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xsi:schemaLocation="
+http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <import resource="rminterceptors.xml" />
+
+ <wsrm-mgr:rmManager id="org.apache.cxf.ws.rm.RMManager">
+ <wsrm-policy:RMAssertion>
+ <wsrm-policy:BaseRetransmissionInterval Milliseconds="60000" />
+ <wsrm-policy:AcknowledgementInterval Milliseconds="10000" />
+ </wsrm-policy:RMAssertion>
+ <wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:ExactlyOnce />
+ <wsrm-mgr:InOrder />
+ </wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:destinationPolicy>
+ <wsrm-mgr:acksPolicy intraMessageThreshold="0" />
+ </wsrm-mgr:destinationPolicy>
+ </wsrm-mgr:rmManager>
+
+
+</beans>
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml Thu Feb 3 21:30:31 2011
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
+ xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xsi:schemaLocation="
+http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <import resource="rminterceptors.xml" />
+
+ <wsrm-mgr:rmManager id="org.apache.cxf.ws.rm.RMManager">
+ <wsrm-policy:RMAssertion>
+ <wsrm-policy:BaseRetransmissionInterval Milliseconds="60000" />
+ <wsrm-policy:AcknowledgementInterval Milliseconds="10000" />
+ </wsrm-policy:RMAssertion>
+ <wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:InOrder />
+ </wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:destinationPolicy>
+ <wsrm-mgr:acksPolicy intraMessageThreshold="0" />
+ </wsrm-mgr:destinationPolicy>
+ </wsrm-mgr:rmManager>
+
+
+</beans>
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml Thu Feb 3 21:30:31 2011
@@ -31,6 +31,9 @@ http://www.springframework.org/schema/be
<property name="bus" ref="cxf"/>
</bean>
<bean id="rmCodec" class="org.apache.cxf.ws.rm.soap.RMSoapInterceptor"/>
+ <bean id="rmDelivery" class="org.apache.cxf.ws.rm.RMDeliveryInterceptor">
+ <property name="bus" ref="cxf"/>
+ </bean>
<!-- We are adding the interceptors to the bus as we will have only one endpoint/service/bus. -->
@@ -41,7 +44,8 @@ http://www.springframework.org/schema/be
<ref bean="mapCodec"/>
<ref bean="rmLogicalIn"/>
<ref bean="rmCodec"/>
- <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
+ <ref bean="rmDelivery"/>
+ <!-- bean class="org.apache.cxf.interceptor.LoggingInInterceptor" /-->
</list>
</property>
<property name="inFaultInterceptors">
@@ -50,7 +54,8 @@ http://www.springframework.org/schema/be
<ref bean="mapCodec"/>
<ref bean="rmLogicalIn"/>
<ref bean="rmCodec"/>
- <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
+ <ref bean="rmDelivery"/>
+ <!--bean class="org.apache.cxf.interceptor.LoggingInInterceptor" /-->
</list>
</property>
<property name="outInterceptors">
@@ -68,7 +73,7 @@ http://www.springframework.org/schema/be
<ref bean="mapCodec"/>
<ref bean="rmLogicalOut"/>
<ref bean="rmCodec"/>
- <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor" />
+ <!-- bean class="org.apache.cxf.interceptor.LoggingOutInterceptor" /-->
</list>
</property>
</bean>
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml Thu Feb 3 21:30:31 2011
@@ -31,6 +31,9 @@ http://www.springframework.org/schema/be
<property name="bus" ref="cxf"/>
</bean>
<bean id="rmCodec" class="org.apache.cxf.ws.rm.soap.RMSoapInterceptor"/>
+ <bean id="rmDelivery" class="org.apache.cxf.ws.rm.RMDeliveryInterceptor">
+ <property name="bus" ref="cxf"/>
+ </bean>
<!-- We are adding the interceptors to the bus as we will have only one endpoint/service/bus. -->
@@ -41,6 +44,7 @@ http://www.springframework.org/schema/be
<ref bean="mapCodec"/>
<ref bean="rmLogicalIn"/>
<ref bean="rmCodec"/>
+ <ref bean="rmDelivery"/>
<bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
</list>
</property>
@@ -50,6 +54,7 @@ http://www.springframework.org/schema/be
<ref bean="mapCodec"/>
<ref bean="rmLogicalIn"/>
<ref bean="rmCodec"/>
+ <ref bean="rmDelivery"/>
<bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
</list>
</property>
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java Thu Feb 3 21:30:31 2011
@@ -55,7 +55,8 @@ public class InMessageRecorder extends A
IOUtils.copy(is, bos);
is.close();
bos.close();
- inbound.add(bos.toByteArray());
+ byte bytes[] = bos.toByteArray();
+ inbound.add(bytes);
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("inbound: " + bos.toString());
}
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java Thu Feb 3 21:30:31 2011
@@ -24,8 +24,6 @@ import java.util.ArrayList;
import java.util.List;
import javax.xml.namespace.QName;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@@ -33,6 +31,7 @@ import org.w3c.dom.Node;
import junit.framework.Assert;
+import org.apache.cxf.staxutils.StaxUtils;
import org.apache.cxf.ws.rm.RMConstants;
@@ -64,21 +63,18 @@ public class MessageFlow extends Assert
out.remove(0);
}
outStreams = out;
- DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
- factory.setNamespaceAware(true);
- DocumentBuilder parser = factory.newDocumentBuilder();
inboundMessages.clear();
for (int i = 0; i < inStreams.size(); i++) {
byte[] bytes = inStreams.get(i);
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
- Document document = parser.parse(is);
+ Document document = StaxUtils.read(is);
inboundMessages.add(document);
}
outboundMessages.clear();
for (int i = 0; i < outStreams.size(); i++) {
byte[] bytes = outStreams.get(i);
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
- Document document = parser.parse(is);
+ Document document = StaxUtils.read(is);
outboundMessages.add(document);
}
}