You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ds...@apache.org on 2014/06/23 03:33:11 UTC
[4/5] git commit: [CXF-3272] Return acknowledgement rather than Fault
when duplicate message received.
[CXF-3272] Return acknowledgement rather than Fault when duplicate
message received.
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/c613aa49
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/c613aa49
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/c613aa49
Branch: refs/heads/master
Commit: c613aa4919e81a3027d0dfbbf3c4ee0a1c6ce348
Parents: 0494f76
Author: dsosnoski <ds...@apache.org>
Authored: Mon Jun 23 11:52:03 2014 +1200
Committer: dsosnoski <ds...@apache.org>
Committed: Mon Jun 23 11:52:03 2014 +1200
----------------------------------------------------------------------
.../java/org/apache/cxf/ws/rm/Destination.java | 22 +-
.../apache/cxf/ws/rm/DestinationSequence.java | 31 ++-
.../apache/cxf/ws/rm/InternalContextUtils.java | 275 +++++++++++++++++++
.../org/apache/cxf/ws/rm/RMOutInterceptor.java | 4 +-
.../cxf/ws/rm/DestinationSequenceTest.java | 8 +-
.../org/apache/cxf/ws/rm/DestinationTest.java | 32 +--
.../apache/cxf/systest/ws/rm/SequenceTest.java | 42 +--
.../apache/cxf/systest/ws/rm/rminterceptors.xml | 8 +-
.../systest/ws/policy/RM12PolicyWsdlTest.java | 2 +-
9 files changed, 357 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
index 378c9b2..77c9c49 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
@@ -126,16 +126,24 @@ public class Destination extends AbstractEndpoint {
} else {
try {
message.getInterceptorChain().abort();
- Conduit conduit = message.getExchange().getDestination()
- .getBackChannel(message);
+ if (seq.sendAcknowledgement()) {
+ ackImmediately(seq, message);
+ }
+ Exchange exchange = message.getExchange();
+ Conduit conduit = exchange.getDestination().getBackChannel(message);
if (conduit != null) {
//for a one-way, the back channel could be
//null if it knows it cannot send anything.
- Message partial = createMessage(message.getExchange());
- partial.remove(Message.CONTENT_TYPE);
- partial.setExchange(message.getExchange());
- conduit.prepare(partial);
- conduit.close(partial);
+ if (seq.sendAcknowledgement()) {
+ AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
+ InternalContextUtils.rebaseResponse(null, maps, message);
+ } else {
+ Message response = createMessage(exchange);
+ response.setExchange(exchange);
+ response.remove(Message.CONTENT_TYPE);
+ conduit.prepare(response);
+ conduit.close(response);
+ }
}
} catch (IOException e) {
LOG.log(Level.SEVERE, e.getMessage());
http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index d0aef1d..36f44d6 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -59,6 +59,7 @@ public class DestinationSequence extends AbstractSequence {
private String correlationID;
private volatile long inProcessNumber;
private volatile long highNumberCompleted;
+ private long nextInOrder;
private List<Continuation> continuations = new LinkedList<Continuation>();
private Set<Long> deliveringMessageNumbers = new HashSet<Long>();
@@ -223,23 +224,21 @@ public class DestinationSequence extends AbstractSequence {
}
/**
- * Ensures that the delivery assurance is honored, e.g. by throwing an
- * exception if the message had already been delivered and the delivery
- * assurance is AtMostOnce.
+ * Ensures that the delivery assurance is honored.
* If the delivery assurance includes either AtLeastOnce or ExactlyOnce, combined with InOrder, this
* queues out-of-order messages for processing after the missing messages have been received.
*
* @param mn message number
* @return <code>true</code> if message processing to continue, <code>false</code> if to be dropped
- * @throws RMException if message had already been acknowledged
*/
- boolean applyDeliveryAssurance(long mn, Message message) throws RMException {
+ boolean applyDeliveryAssurance(long mn, Message message) {
Continuation cont = getContinuation(message);
RMConfiguration config = destination.getReliableEndpoint().getConfiguration();
DeliveryAssurance da = config.getDeliveryAssurance();
boolean canSkip = da != DeliveryAssurance.AT_LEAST_ONCE && da != DeliveryAssurance.EXACTLY_ONCE;
boolean robust = false;
boolean robustDelivering = false;
+ boolean inOrder = mn - nextInOrder == 1;
if (message != null) {
robust = MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
if (robust) {
@@ -250,22 +249,30 @@ public class DestinationSequence extends AbstractSequence {
if (robust && !robustDelivering) {
// no check performed if in robust and not in delivering
removeDeliveringMessageNumber(mn);
+ if (inOrder) {
+ nextInOrder++;
+ }
return true;
}
+ if (inOrder) {
+ nextInOrder++;
+ } else {
+
+ // message out of order, schedule acknowledgement to update sender
+ scheduleImmediateAcknowledgement();
+ if (nextInOrder < mn) {
+ nextInOrder = mn + 1;
+ }
+ }
if (cont != null && config.isInOrder() && !cont.isNew()) {
return waitInQueue(mn, canSkip, message, cont);
}
if ((da == DeliveryAssurance.EXACTLY_ONCE || da == DeliveryAssurance.AT_MOST_ONCE)
- && (isAcknowledged(mn)
- || (robustDelivering && deliveringMessageNumbers.contains(mn)))) {
-
- // acknowledge at first opportunity following duplicate message
- scheduleImmediateAcknowledgement();
+ && (isAcknowledged(mn) || (robustDelivering && deliveringMessageNumbers.contains(mn)))) {
org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
"MESSAGE_ALREADY_DELIVERED_EXC", LOG, mn, getIdentifier().getValue());
LOG.log(Level.INFO, msg.toString());
- throw new RMException(msg);
-
+ return false;
}
if (robustDelivering) {
deliveringMessageNumbers.add(mn);
http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java
new file mode 100644
index 0000000..eac5662
--- /dev/null
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.ConduitSelector;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.NullConduitSelector;
+import org.apache.cxf.endpoint.PreexistingConduitSelector;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.interceptor.OutgoingChainInterceptor;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.ConduitInitiatorManager;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.workqueue.OneShotAsyncExecutor;
+import org.apache.cxf.workqueue.SynchronousExecutor;
+import org.apache.cxf.workqueue.WorkQueueManager;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.addressing.ContextUtils;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
+import org.apache.cxf.ws.addressing.Names;
+
+
+
+/**
+ * Holder for utility methods relating to contexts. Somewhat stripped-down version of class of same name in
+ * org.apache.cxf.ws.addressing.impl.
+ */
+final class InternalContextUtils {
+ private static final class DecoupledDestination implements Destination {
+ private final EndpointInfo ei;
+ private final EndpointReferenceType reference;
+
+ private DecoupledDestination(EndpointInfo ei, EndpointReferenceType reference) {
+ this.ei = ei;
+ this.reference = reference;
+ }
+
+ public EndpointReferenceType getAddress() {
+ return reference;
+ }
+
+ public Conduit getBackChannel(Message inMessage) throws IOException {
+ if (ContextUtils.isNoneAddress(reference)) {
+ return null;
+ }
+ Bus bus = inMessage.getExchange().get(Bus.class);
+ //this is a response targeting a decoupled endpoint. Treat it as a oneway so
+ //we don't wait for a response.
+ inMessage.getExchange().setOneWay(true);
+ ConduitInitiator conduitInitiator
+ = bus.getExtension(ConduitInitiatorManager.class)
+ .getConduitInitiatorForUri(reference.getAddress().getValue());
+ if (conduitInitiator != null) {
+ Conduit c = conduitInitiator.getConduit(ei, reference, bus);
+ // ensure decoupled back channel input stream is closed
+ c.setMessageObserver(new MessageObserver() {
+ public void onMessage(Message m) {
+ InputStream is = m.getContent(InputStream.class);
+ if (is != null) {
+ try {
+ is.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+ });
+ return c;
+ }
+ return null;
+ }
+
+ public MessageObserver getMessageObserver() {
+ return null;
+ }
+
+ public void shutdown() {
+ }
+
+ public void setMessageObserver(MessageObserver observer) {
+ }
+ }
+
+ private static final Logger LOG = LogUtils.getL7dLogger(InternalContextUtils.class);
+
+ /**
+ * Prevents instantiation.
+ */
+ private InternalContextUtils() {
+ }
+
+
+ /**
+ * Rebase response on replyTo
+ *
+ * @param reference the replyTo reference
+ * @param inMAPs the inbound MAPs
+ * @param inMessage the current message
+ */
+ //CHECKSTYLE:OFF Max executable statement count limitation
+ public static void rebaseResponse(EndpointReferenceType reference,
+ AddressingProperties inMAPs,
+ final Message inMessage) {
+
+ String namespaceURI = inMAPs.getNamespaceURI();
+ if (!ContextUtils.retrievePartialResponseSent(inMessage)) {
+ ContextUtils.storePartialResponseSent(inMessage);
+ Exchange exchange = inMessage.getExchange();
+ Message fullResponse = exchange.getOutMessage();
+ Message partialResponse = ContextUtils.createMessage(exchange);
+ ensurePartialResponseMAPs(partialResponse, namespaceURI);
+
+ // ensure the inbound MAPs are available in the partial response
+ // message (used to determine relatesTo etc.)
+ ContextUtils.propogateReceivedMAPs(inMAPs, partialResponse);
+ Destination target = inMessage.getDestination();
+ if (target == null) {
+ return;
+ }
+
+ try {
+ if (reference == null) {
+ reference = ContextUtils.getNoneEndpointReference();
+ }
+ Conduit backChannel = target.getBackChannel(inMessage);
+ if (backChannel != null) {
+ partialResponse.put(Message.PARTIAL_RESPONSE_MESSAGE, Boolean.TRUE);
+ partialResponse.put(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE, Boolean.TRUE);
+ boolean robust =
+ MessageUtils.isTrue(inMessage.getContextualProperty(Message.ROBUST_ONEWAY));
+
+ if (robust) {
+ BindingOperationInfo boi = exchange.get(BindingOperationInfo.class);
+ // insert the executor in the exchange to fool the OneWayProcessorInterceptor
+ exchange.put(Executor.class, getExecutor(inMessage));
+ // pause dispatch on current thread and resume...
+ inMessage.getInterceptorChain().pause();
+ inMessage.getInterceptorChain().resume();
+ // restore the BOI for the partial response handling
+ exchange.put(BindingOperationInfo.class, boi);
+ }
+
+
+ // set up interceptor chains and send message
+ InterceptorChain chain =
+ fullResponse != null
+ ? fullResponse.getInterceptorChain()
+ : OutgoingChainInterceptor.getOutInterceptorChain(exchange);
+ exchange.setOutMessage(partialResponse);
+ partialResponse.setInterceptorChain(chain);
+ exchange.put(ConduitSelector.class,
+ new PreexistingConduitSelector(backChannel,
+ exchange.get(Endpoint.class)));
+
+ if (chain != null && !chain.doIntercept(partialResponse)
+ && partialResponse.getContent(Exception.class) != null) {
+ if (partialResponse.getContent(Exception.class) instanceof Fault) {
+ throw (Fault)partialResponse.getContent(Exception.class);
+ } else {
+ throw new Fault(partialResponse.getContent(Exception.class));
+ }
+ }
+ if (chain != null) {
+ chain.reset();
+ }
+ exchange.put(ConduitSelector.class, new NullConduitSelector());
+
+ if (fullResponse == null) {
+ fullResponse = ContextUtils.createMessage(exchange);
+ }
+ exchange.setOutMessage(fullResponse);
+
+ Destination destination = createDecoupledDestination(
+ exchange,
+ reference);
+ exchange.setDestination(destination);
+
+ }
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "SERVER_TRANSPORT_REBASE_FAILURE_MSG", e);
+ }
+ }
+ }
+ //CHECKSTYLE:ON
+
+ private static Destination createDecoupledDestination(
+ Exchange exchange, final EndpointReferenceType reference) {
+ final EndpointInfo ei = exchange.get(Endpoint.class).getEndpointInfo();
+ return new DecoupledDestination(ei, reference);
+ }
+
+ /**
+ * Construct and store MAPs for partial response.
+ *
+ * @param partialResponse the partial response message
+ * @param namespaceURI the current namespace URI
+ */
+ private static void ensurePartialResponseMAPs(Message partialResponse,
+ String namespaceURI) {
+ // ensure there is a MAPs instance available for the outbound
+ // partial response that contains appropriate To and ReplyTo
+ // properties (i.e. anonymous & none respectively)
+ AddressingProperties maps = new AddressingProperties();
+ maps.setTo(EndpointReferenceUtils.getAnonymousEndpointReference());
+ maps.setReplyTo(ContextUtils.WSA_OBJECT_FACTORY.createEndpointReferenceType());
+ maps.getReplyTo().setAddress(ContextUtils.getAttributedURI(Names.WSA_NONE_ADDRESS));
+ maps.setAction(ContextUtils.getAttributedURI(""));
+ maps.exposeAs(namespaceURI);
+ ContextUtils.storeMAPs(maps, partialResponse, true, true, false);
+ }
+
+ /**
+ * Get the Executor for this invocation.
+ * @param endpoint
+ * @return
+ */
+ private static Executor getExecutor(final Message message) {
+ Endpoint endpoint = message.getExchange().get(Endpoint.class);
+ Executor executor = endpoint.getService().getExecutor();
+
+ if (executor == null || SynchronousExecutor.isA(executor)) {
+ // need true asynchrony
+ Bus bus = message.getExchange().get(Bus.class);
+ if (bus != null) {
+ WorkQueueManager workQueueManager =
+ bus.getExtension(WorkQueueManager.class);
+ Executor autoWorkQueue =
+ workQueueManager.getNamedWorkQueue("ws-addressing");
+ executor = autoWorkQueue != null
+ ? autoWorkQueue
+ : workQueueManager.getAutomaticWorkQueue();
+ } else {
+ executor = OneShotAsyncExecutor.getInstance();
+ }
+ }
+ message.getExchange().put(Executor.class, executor);
+ return executor;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
index 95eb200..8d3aa17 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
@@ -115,10 +115,10 @@ public class RMOutInterceptor extends AbstractRMInterceptor<Message> {
if (isPartialResponse && rmpsOut.getAcks() != null && rmpsOut.getAcks().size() > 0) {
setAction(maps, constants.getSequenceAckAction());
msg.remove(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE);
+ isAck = true;
}
}
- if (isAck || constants.getSequenceAckAction().equals(action)
- || (constants.getTerminateSequenceAction().equals(action)
+ if (isAck || (constants.getTerminateSequenceAction().equals(action)
&& RM10Constants.NAMESPACE_URI.equals(rmNamespace))) {
maps.setReplyTo(RMUtils.createNoneReference());
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
index e46bf87..a189932 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
@@ -484,13 +484,7 @@ public class DestinationSequenceTest extends Assert {
EasyMock.expect(r.getLower()).andReturn(new Long(5));
EasyMock.expect(r.getUpper()).andReturn(new Long(15));
control.replay();
- try {
- ds.applyDeliveryAssurance(mn, null);
- fail("Expected Fault not thrown.");
- } catch (RMException ex) {
- assertEquals("MESSAGE_ALREADY_DELIVERED_EXC", ex.getCode());
- }
-
+ ds.applyDeliveryAssurance(mn, null);
control.verify();
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
index 2405542..2288247 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
@@ -19,8 +19,10 @@
package org.apache.cxf.ws.rm;
+import java.io.IOException;
import java.lang.reflect.Method;
+import org.apache.cxf.interceptor.InterceptorChain;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.ws.rm.persistence.RMStore;
@@ -96,7 +98,7 @@ public class DestinationTest extends Assert {
}
@Test
- public void testAcknowledgeNoSequence() throws SequenceFault, RMException {
+ public void testAcknowledgeNoSequence() throws SequenceFault, RMException, IOException {
Message message = setupMessage();
RMProperties rmps = control.createMock(RMProperties.class);
EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).andReturn(rmps);
@@ -106,7 +108,7 @@ public class DestinationTest extends Assert {
}
@Test
- public void testAcknowledgeUnknownSequence() throws RMException {
+ public void testAcknowledgeUnknownSequence() throws RMException, IOException {
Message message = setupMessage();
RMProperties rmps = control.createMock(RMProperties.class);
EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).andReturn(rmps);
@@ -129,7 +131,7 @@ public class DestinationTest extends Assert {
@Test
public void testAcknowledgeAlreadyAcknowledgedMessage() throws SequenceFault, RMException,
- NoSuchMethodException {
+ NoSuchMethodException, IOException {
Method m1 = Destination.class.getDeclaredMethod("getSequence", new Class[] {Identifier.class});
destination = EasyMock.createMockBuilder(Destination.class)
@@ -145,17 +147,12 @@ public class DestinationTest extends Assert {
EasyMock.expect(destination.getSequence(id)).andReturn(ds);
long nr = 10;
EasyMock.expect(st.getMessageNumber()).andReturn(nr);
- RMException ex = new RMException(new RuntimeException("already acknowledged"));
ds.applyDeliveryAssurance(nr, message);
- EasyMock.expectLastCall().andThrow(ex);
+ EasyMock.expectLastCall().andReturn(false);
+ InterceptorChain ic = control.createMock(InterceptorChain.class);
+ EasyMock.expect(message.getInterceptorChain()).andReturn(ic);
control.replay();
- try {
- destination.acknowledge(message);
- fail("Expected RMEcception not thrown.");
- } catch (RMException e) {
- assertSame(ex, e);
- }
-
+ destination.acknowledge(message);
}
/* @Test
@@ -218,12 +215,15 @@ public class DestinationTest extends Assert {
destination.acknowledge(message);
} */
- private Message setupMessage() {
+ private Message setupMessage() throws IOException {
Message message = control.createMock(Message.class);
Exchange exchange = control.createMock(Exchange.class);
- EasyMock.expect(message.getExchange()).andReturn(exchange);
- EasyMock.expect(exchange.getOutMessage()).andReturn(null);
- EasyMock.expect(exchange.getOutFaultMessage()).andReturn(null);
+ org.apache.cxf.transport.Destination tdest = control.createMock(org.apache.cxf.transport.Destination.class);
+ EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
+ EasyMock.expect(exchange.getOutMessage()).andReturn(null).anyTimes();
+ EasyMock.expect(exchange.getOutFaultMessage()).andReturn(null).anyTimes();
+ EasyMock.expect(exchange.getDestination()).andReturn(tdest).anyTimes();
+ EasyMock.expect(tdest.getBackChannel(message)).andReturn(null).anyTimes();
return message;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
index 8642542..4946657 100644
--- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
@@ -439,10 +439,13 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
assertEquals(0, inRecorder.getInboundMessages().size());
// allow resends to kick in
- // await multiple of 3 resends to avoid shutting down server
- // in the course of retransmission - this is harmless but pollutes test output
+ // first duplicate received will trigger acknowledgement
+ awaitMessages(1, 1, 3000);
- awaitMessages(3, 0, 7500);
+ mf.reset(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+ mf.verifyMessages(1, true);
+ mf.verifyMessages(1, false);
+ mf.verifyAcknowledgements(new boolean[] {true}, false);
}
@@ -764,17 +767,16 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
greeter.greetMe("one");
try {
greeter.greetMe("two");
- fail("Expected fault.");
+ fail("Expected timeout.");
} catch (WebServiceException ex) {
- SoapFault sf = (SoapFault)ex.getCause();
- assertEquals("Unexpected fault code.", Soap11.getInstance().getReceiver(), sf.getFaultCode());
- assertNull("Unexpected sub code.", sf.getSubCode());
- assertTrue("Unexpected reason.", sf.getReason().endsWith("has already been delivered."));
+ assertTrue("Unexpected exception cause", ex.getCause() instanceof IOException);
+ IOException ie = (IOException)ex.getCause();
+ assertTrue("Unexpected IOException message", ie.getMessage().startsWith("Timed out"));
}
// wait for resend to occur
- awaitMessages(3, 3, 5000);
+ awaitMessages(4, 3, 5000);
MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(),
inRecorder.getInboundMessages(), Names200408.WSA_NAMESPACE_NAME, RM10Constants.NAMESPACE_URI);
@@ -782,30 +784,30 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
// Expected outbound:
// CreateSequence
// + two requests
+ // + acknowledgement
- String[] expectedActions = new String[3];
+ String[] expectedActions = new String[4];
expectedActions[0] = RM10Constants.CREATE_SEQUENCE_ACTION;
- for (int i = 1; i < expectedActions.length; i++) {
- expectedActions[i] = GREETME_ACTION;
- }
+ expectedActions[1] = GREETME_ACTION;
+ expectedActions[2] = GREETME_ACTION;
+ expectedActions[3] = RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION;
mf.verifyActions(expectedActions, true);
- mf.verifyMessageNumbers(new String[] {null, "1", "1"}, true);
- mf.verifyLastMessage(new boolean[3], true);
- mf.verifyAcknowledgements(new boolean[3], true);
+ mf.verifyMessageNumbers(new String[] {null, "1", "1", null}, true);
+ mf.verifyLastMessage(new boolean[expectedActions.length], true);
+ mf.verifyAcknowledgements(new boolean[] {false, false, false, true}, true);
// Expected inbound:
// createSequenceResponse
// + 1 response without acknowledgement
- // + 1 fault
+ // + 1 acknowledgement/last message
mf.verifyMessages(3, false);
expectedActions = new String[] {RM10Constants.CREATE_SEQUENCE_RESPONSE_ACTION,
GREETME_RESPONSE_ACTION,
- RM10_GENERIC_FAULT_ACTION};
+ RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION};
mf.verifyActions(expectedActions, false);
mf.verifyMessageNumbers(new String[] {null, "1", null}, false);
- mf.verifyAcknowledgements(new boolean[3] , false);
-
+ mf.verifyAcknowledgements(new boolean[] {false, false, true}, false);
}
@Test
http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
index f78e2e0..342c0b9 100644
--- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
@@ -42,7 +42,9 @@
<ref bean="rmLogicalIn"/>
<ref bean="rmSoapIn"/>
<ref bean="rmDelivery"/>
- <bean class="org.apache.cxf.interceptor.LoggingInInterceptor"/>
+ <bean class="org.apache.cxf.interceptor.LoggingInInterceptor">
+ <property name="prettyLogging" value="true"/>
+ </bean>
</cxf:inInterceptors>
<cxf:inFaultInterceptors>
<ref bean="mapAggregator"/>
@@ -58,7 +60,9 @@
<ref bean="rmLogicalOut"/>
<ref bean="rmSoapOut"/>
<ref bean="rmCaptureOut"/>
- <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor"/>
+ <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor">
+ <property name="prettyLogging" value="true"/>
+ </bean>
</cxf:outInterceptors>
<cxf:outFaultInterceptors>
<ref bean="mapAggregator"/>
http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java
----------------------------------------------------------------------
diff --git a/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java b/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java
index e1b9191..f86b2f9 100644
--- a/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java
+++ b/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java
@@ -69,7 +69,7 @@ public class RM12PolicyWsdlTest extends RMPolicyWsdlTestBase {
@BeforeClass
public static void startServers() throws Exception {
TestUtil.getNewPortNumber("decoupled");
- assertTrue("server did not launch correctly", launchServer(Server.class, true));
+ assertTrue("server did not launch correctly", launchServer(Server.class, false));
}
@Test