You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2007/08/20 05:37:50 UTC
svn commit: r567536 [2/4] - in /incubator/servicemix/trunk/deployables:
bindingcomponents/servicemix-cxf-bc/
bindingcomponents/servicemix-cxf-bc/src/main/java/org/apache/servicemix/cxfbc/
bindingcomponents/servicemix-cxf-bc/src/main/java/org/apache/ser...
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMSequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMSequenceTest.java?rev=567536&view=auto
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMSequenceTest.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMSequenceTest.java Sun Aug 19 20:37:48 2007
@@ -0,0 +1,1426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.cxfbc.ws.rm;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+import javax.xml.namespace.QName;
+import javax.xml.ws.WebServiceException;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.binding.soap.Soap11;
+import org.apache.cxf.binding.soap.SoapFault;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.frontend.ClientProxy;
+import org.apache.cxf.greeter_control.Control;
+import org.apache.cxf.greeter_control.ControlService;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.interceptor.Interceptor;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.ws.rm.RMConstants;
+import org.apache.cxf.ws.rm.RMContextUtils;
+import org.apache.cxf.ws.rm.RMInInterceptor;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RMOutInterceptor;
+import org.apache.cxf.ws.rm.RMProperties;
+import org.apache.cxf.ws.rm.soap.RMSoapInterceptor;
+import org.apache.servicemix.cxfbc.ws.policy.ConnectionHelper;
+import org.apache.servicemix.jbi.container.SpringJBIContainer;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.tck.SpringTestSupport;
+import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
+import org.apache.xbean.spring.context.SpringXmlPreprocessor;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.core.io.ClassPathResource;
+
+public class CxfBcRMSequenceTest extends SpringTestSupport {
+ private static final Logger LOG = Logger.getLogger(CxfBcRMSequenceTest.class.getName()); // logger named after this test class
+ private static final String GREETMEONEWAY_ACTION = null; // value expected by verifyActions() for greetMeOneWay requests; null here
+ private static final String GREETME_ACTION = null; // value expected by verifyActions() for greetMe requests; null here
+ private static final String GREETME_RESPONSE_ACTION = null; // value expected by verifyActions() for greetMe responses; null here
+ private static final QName CONTROL_SERVICE = new QName("http://cxf.apache.org/greeter_control", "ControlService"); // service name handed to the ControlService constructor
+ private static final QName GREETER_SERVICE = new QName("http://cxf.apache.org/greeter_control", "GreeterService"); // service name handed to the GreeterService constructor
+ // Decoupled endpoint settings. Not referenced by the test methods shown here;
+ // presumably consumed by the init(...) helpers further down - TODO confirm.
+ private static int decoupledEndpointPort = 10000;
+ private static String decoupledEndpoint;
+
+
+ // Client-side state of the running test: the two buses, the service proxies
+ // created from them and the recorders added to the greeter bus interceptor chains.
+ private Bus controlBus;
+ private Control control;
+ private Bus greeterBus;
+ private Greeter greeter;
+ private OutMessageRecorder outRecorder;
+ private InMessageRecorder inRecorder;
+ // Switches: a test method returns early when its flag is false; every flag is initialised from testAll.
+ private boolean testAll = true;
+ private boolean doTestOnewayAnonymousAcks = testAll;
+ private boolean doTestOnewayDeferredAnonymousAcks = testAll;
+ private boolean doTestOnewayDeferredNonAnonymousAcks = testAll;
+ private boolean doTestOnewayAnonymousAcksSequenceLength1 = testAll;
+ private boolean doTestOnewayAnonymousAcksSuppressed = testAll;
+ private boolean doTestOnewayAnonymousAcksSuppressedAsyncExecutor = testAll;
+ private boolean doTestTwowayNonAnonymous = testAll;
+ private boolean doTestTwowayNonAnonymousEndpointSpecific = testAll;
+ private boolean doTestTwowayNonAnonymousDeferred = testAll;
+ private boolean doTestTwowayNonAnonymousMaximumSequenceLength2 = testAll;
+ private boolean doTestTwowayAtMostOnce = testAll;
+ private boolean doTestUnknownSequence = testAll;
+ private boolean doTestInactivityTimeout = testAll;
+ private boolean doTestOnewayMessageLoss = testAll;
+ private boolean doTestOnewayMessageLossAsyncExecutor = testAll;
+ private boolean doTestTwowayMessageLoss = testAll;
+ private boolean doTestTwowayMessageLossAsyncExecutor = testAll;
+ private boolean doTestTwowayNonAnonymousNoOffer = testAll;
+ private boolean doTestConcurrency = testAll;
+ private boolean doTestMultiClientOneway = testAll;
+ private boolean doTestMultiClientTwoway = testAll;
+ private boolean doTestServerSideMessageLoss = testAll;
+ private boolean doTestTerminateOnShutdown = testAll;
+
+ /**
+  * Replaces the inherited set-up. Nothing is initialised here; the tests
+  * call {@link #setUpJBI(String)} themselves with the configuration they need.
+  *
+  * @throws Exception declared for compatibility with the overridden method
+  */
+ @Override
+ public void setUp() throws Exception {
+     // deliberately does not delegate to super.setUp()
+     LOG.info("setUp is invoked");
+ }
+
+ /**
+  * Builds the Spring context for a test and fetches the "jbi" bean from it.
+  * An already existing context is refreshed before it is replaced.
+  *
+  * @param beanFile resource passed to createBeanFactory(String), or
+  *        {@code null} to fall back to the no-argument createBeanFactory()
+  * @throws Exception if the context cannot be created
+  */
+ public void setUpJBI(String beanFile) throws Exception {
+     if (null != context) {
+         context.refresh();
+     }
+     transformer = new SourceTransformer();
+     context = (beanFile != null) ? createBeanFactory(beanFile) : createBeanFactory();
+
+     jbi = (SpringJBIContainer) context.getBean("jbi");
+     assertNotNull("JBI Container not found in spring!", jbi);
+ }
+
+ /**
+  * Stops the greeter and control clients, then destroys the Spring context
+  * and shuts down the JBI container if they were created.
+  * <p>
+  * The context and container are released in a finally block so that a
+  * failure while stopping the clients does not leak them into the next test.
+  *
+  * @throws Exception if stopping the clients or releasing the container fails
+  */
+ @Override
+ public void tearDown() throws Exception {
+     try {
+         stopGreeter();
+         stopControl();
+     } finally {
+         if (context != null) {
+             context.destroy();
+             context = null;
+         }
+         if (jbi != null) {
+             jbi.shutDown();
+             jbi.destroy();
+             jbi = null;
+         }
+     }
+ }
+
+ /**
+ * Server is configured with RM interceptors, client without;
+ * Addressing interceptors are installed on either side.
+ * The (oneway) application request should be dispatched straight to the
+ * implementor.
+ */
+
+ public void testRMServerPlainClient() throws Exception {
+ setUpJBI(null);
+ SpringBusFactory bf = new SpringBusFactory();
+
+ controlBus = bf.createBus();
+ BusFactory.setDefaultBus(controlBus);
+ URL wsdl = new ClassPathResource("/wsdl/greeter_control.wsdl").getURL();
+ ControlService cs = new ControlService(wsdl, CONTROL_SERVICE);
+ control = cs.getControlPort();
+
+ assertTrue("Failed to start greeter",
+ control.startGreeter("org/apache/cxf/systest/ws/rm/rminterceptors.xml"));
+
+ greeterBus = bf.createBus("org/apache/cxf/systest/ws/rm/rminterceptors.xml");
+ BusFactory.setDefaultBus(greeterBus);
+ removeRMInterceptors(greeterBus.getOutInterceptors());
+ removeRMInterceptors(greeterBus.getOutFaultInterceptors());
+ removeRMInterceptors(greeterBus.getInInterceptors());
+ removeRMInterceptors(greeterBus.getInFaultInterceptors());
+ LOG.fine("Initialised greeter bus with addressing but without RM interceptors");
+
+ outRecorder = new OutMessageRecorder();
+ greeterBus.getOutInterceptors().add(outRecorder);
+ inRecorder = new InMessageRecorder();
+ greeterBus.getInInterceptors().add(inRecorder);
+
+ GreeterService gs = new GreeterService(wsdl, GREETER_SERVICE);
+ greeter = gs.getGreeterPort();
+ LOG.fine("Created greeter client.");
+
+ ConnectionHelper.setKeepAliveConnection(greeter, true);
+
+ greeter.greetMeOneWay("once");
+
+ }
+
+
+ /**
+  * Sends three oneway requests using the rminterceptors.xml configuration
+  * and verifies the recorded message flow: CreateSequence plus three
+  * numbered application messages outbound, CreateSequenceResponse plus
+  * three acknowledgements inbound.
+  */
+ public void testOnewayAnonymousAcks() throws Exception {
+     // check the switch first so that a disabled test does not start a container
+     if (!doTestOnewayAnonymousAcks) {
+         return;
+     }
+     setUpJBI("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+     init("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+
+     greeter.greetMeOneWay("once");
+     greeter.greetMeOneWay("twice");
+     greeter.greetMeOneWay("thrice");
+
+     // three application messages plus createSequence
+
+     awaitMessages(4, 4);
+
+     MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     mf.verifyMessages(4, true);
+     String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(), GREETMEONEWAY_ACTION,
+                                              GREETMEONEWAY_ACTION, GREETMEONEWAY_ACTION};
+     mf.verifyActions(expectedActions, true);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
+
+     // createSequenceResponse plus 3 partial responses
+
+     mf.verifyMessages(4, false);
+     expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+                                     RMConstants.getSequenceAcknowledgmentAction(),
+                                     RMConstants.getSequenceAcknowledgmentAction(),
+                                     RMConstants.getSequenceAcknowledgmentAction()};
+     mf.verifyActions(expectedActions, false);
+     mf.verifyMessageNumbers(new String[] {null, null, null, null}, false);
+     mf.verifyAcknowledgements(new boolean[] {false, true, true, true}, false);
+ }
+
+
+ /**
+  * Sends two oneway requests, pauses for three seconds and sends a third
+  * one, using the deferred.xml configuration. Only the last inbound message
+  * is expected to carry a sequence acknowledgement.
+  */
+ public void testOnewayDeferredAnonymousAcks() throws Exception {
+     // check the switch first so that a disabled test does not start a container
+     if (!doTestOnewayDeferredAnonymousAcks) {
+         return;
+     }
+     setUpJBI("org/apache/servicemix/cxfbc/ws/rm/deferred.xml");
+     init("org/apache/servicemix/cxfbc/ws/rm/deferred.xml");
+
+     greeter.greetMeOneWay("once");
+     greeter.greetMeOneWay("twice");
+
+     try {
+         Thread.sleep(3 * 1000);
+     } catch (InterruptedException ex) {
+         // carry on with the test, but keep the interrupt visible to the caller
+         Thread.currentThread().interrupt();
+     }
+
+     greeter.greetMeOneWay("thrice");
+
+     awaitMessages(4, 4);
+     MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     // three application messages plus createSequence
+     mf.verifyMessages(4, true);
+     String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(), GREETMEONEWAY_ACTION,
+                                              GREETMEONEWAY_ACTION, GREETMEONEWAY_ACTION};
+     mf.verifyActions(expectedActions, true);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
+
+     // createSequenceResponse message plus 3 partial responses, only the
+     // last one should include a sequence acknowledgment
+
+     mf.verifyMessages(4, false);
+     expectedActions =
+         new String[] {RMConstants.getCreateSequenceResponseAction(), null, null,
+                       RMConstants.getSequenceAcknowledgmentAction()};
+     mf.verifyActions(expectedActions, false);
+     mf.verifyMessageNumbers(new String[] {null, null, null, null}, false);
+     mf.verifyAcknowledgements(new boolean[] {false, false, false, true}, false);
+ }
+
+
+ /**
+  * Sends two oneway requests using the deferred.xml configuration with the
+  * second init(...) argument set to true. The first batch of inbound
+  * messages must not contain acknowledgements; after a three second pause
+  * a single inbound message carrying an acknowledgement is expected.
+  */
+ public void testOnewayDeferredNonAnonymousAcks() throws Exception {
+     // check the switch first so that a disabled test does not start a container
+     if (!doTestOnewayDeferredNonAnonymousAcks) {
+         return;
+     }
+     setUpJBI("org/apache/servicemix/cxfbc/ws/rm/deferred.xml");
+     init("org/apache/servicemix/cxfbc/ws/rm/deferred.xml", true);
+
+     greeter.greetMeOneWay("once");
+     greeter.greetMeOneWay("twice");
+
+     // CreateSequence plus two greetMeOneWay requests
+
+     awaitMessages(3, 4);
+     MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     mf.verifyMessages(3, true);
+     String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
+                                              GREETMEONEWAY_ACTION,
+                                              GREETMEONEWAY_ACTION};
+     mf.verifyActions(expectedActions, true);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2"}, true);
+
+     // CreateSequenceResponse plus three partial responses, no
+     // acknowledgments included
+
+     mf.verifyMessages(4, false);
+     mf.verifyMessageNumbers(new String[4], false);
+     mf.verifyAcknowledgements(new boolean[4], false);
+
+     mf.verifyPartialResponses(3);
+     mf.purgePartialResponses();
+
+     expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction()};
+     mf.verifyActionsIgnoringPartialResponses(expectedActions);
+     mf.purge();
+
+     try {
+         Thread.sleep(3 * 1000);
+     } catch (InterruptedException ex) {
+         // carry on with the test, but keep the interrupt visible to the caller
+         Thread.currentThread().interrupt();
+     }
+
+     // a standalone acknowledgement should have been sent from the server
+     // side by now
+
+     awaitMessages(0, 1);
+     mf.reset(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     mf.verifyMessages(0, true);
+     mf.verifyMessages(1, false);
+     mf.verifyAcknowledgements(new boolean[] {true}, false);
+ }
+
+
+ /**
+  * Sends two oneway requests using the seqlength1.xml configuration and
+  * expects each of them to be wrapped in its own CreateSequence /
+  * TerminateSequence exchange, with the application message flagged as
+  * the last message of its sequence.
+  */
+ public void testOnewayAnonymousAcksSequenceLength1() throws Exception {
+     // check the switch first so that a disabled test does not start a container
+     if (!doTestOnewayAnonymousAcksSequenceLength1) {
+         return;
+     }
+     setUpJBI("org/apache/servicemix/cxfbc/ws/rm/seqlength1.xml");
+     init("org/apache/servicemix/cxfbc/ws/rm/seqlength1.xml");
+
+     greeter.greetMeOneWay("once");
+     greeter.greetMeOneWay("twice");
+
+     // two application messages plus two createSequence plus two
+     // terminateSequence
+
+     awaitMessages(6, 6);
+
+     MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     mf.verifyMessages(6, true);
+     String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
+                                              GREETMEONEWAY_ACTION,
+                                              RMConstants.getTerminateSequenceAction(),
+                                              RMConstants.getCreateSequenceAction(),
+                                              GREETMEONEWAY_ACTION,
+                                              RMConstants.getTerminateSequenceAction()};
+     mf.verifyActions(expectedActions, true);
+     mf.verifyMessageNumbers(new String[] {null, "1", null, null, "1", null}, true);
+     mf.verifyLastMessage(new boolean[] {false, true, false, false, true, false}, true);
+
+     // createSequenceResponse message plus partial responses to
+     // greetMeOneWay and terminateSequence, twice over
+
+     mf.verifyMessages(6, false);
+
+     expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+                                     RMConstants.getSequenceAcknowledgmentAction(), null,
+                                     RMConstants.getCreateSequenceResponseAction(),
+                                     RMConstants.getSequenceAcknowledgmentAction(), null};
+     mf.verifyActions(expectedActions, false);
+     mf.verifyMessageNumbers(new String[] {null, null, null, null, null, null}, false);
+     mf.verifyLastMessage(new boolean[] {false, false, false, false, false, false}, false);
+     mf.verifyAcknowledgements(new boolean[] {false, true, false, false, true, false}, false);
+ }
+
+
+ /**
+  * Runs the suppressed-acknowledgement scenario with a null executor,
+  * unless the test has been switched off.
+  */
+ public void testOnewayAnonymousAcksSuppressed() throws Exception {
+     if (doTestOnewayAnonymousAcksSuppressed) {
+         testOnewayAnonymousAcksSuppressed(null);
+     }
+ }
+
+
+ public void testOnewayAnonymousAcksSuppressedAsyncExecutor() throws Exception {
+ if (!doTestOnewayAnonymousAcksSuppressedAsyncExecutor) {
+ return;
+ }
+ testOnewayAnonymousAcksSuppressed(Executors.newSingleThreadExecutor());
+ }
+
+ private void testOnewayAnonymousAcksSuppressed(Executor executor) throws Exception {
+ setUpJBI("org/apache/servicemix/cxfbc/ws/rm/suppressed.xml");
+ init("org/apache/servicemix/cxfbc/ws/rm/suppressed.xml", false, executor);
+
+ greeter.greetMeOneWay("once");
+ greeter.greetMeOneWay("twice");
+ greeter.greetMeOneWay("thrice");
+
+ // three application messages plus createSequence
+
+ awaitMessages(4, 4, 2000);
+
+ MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+ mf.verifyMessages(4, true);
+ String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
+ GREETMEONEWAY_ACTION,
+ GREETMEONEWAY_ACTION,
+ GREETMEONEWAY_ACTION};
+ mf.verifyActions(expectedActions, true);
+ mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
+
+ // createSequenceResponse plus 3 partial responses, none of which
+ // contain an acknowledgment
+
+ mf.verifyMessages(4, false);
+ mf.verifyPartialResponses(3, new boolean[3]);
+ mf.purgePartialResponses();
+
+ expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction()};
+ mf.verifyActions(expectedActions, false);
+
+ mf.purge();
+ assertEquals(0, outRecorder.getOutboundMessages().size());
+ assertEquals(0, inRecorder.getInboundMessages().size());
+
+ // allow resends to kick in
+ // await multiple of 3 resends to avoid shutting down server
+ // in the course of retransmission - this is harmless but pollutes test output
+
+ awaitMessages(3, 0, 7500);
+
+ }
+
+
+ /**
+  * Sends three twoway requests using the rminterceptors.xml configuration
+  * with the second init(...) argument set to true, and verifies the
+  * recorded requests, partial responses and full responses.
+  */
+ public void testTwowayNonAnonymous() throws Exception {
+     // check the switch first so that a disabled test does not start a container
+     if (!doTestTwowayNonAnonymous) {
+         return;
+     }
+     setUpJBI("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+     init("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml", true);
+
+     greeter.greetMe("one");
+     greeter.greetMe("two");
+     greeter.greetMe("three");
+
+     // CreateSequence and three greetMe messages
+     // TODO there should be partial responses to the decoupled responses!
+
+     awaitMessages(4, 8);
+
+     MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     mf.verifyMessages(4, true);
+     String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
+                                              GREETME_ACTION,
+                                              GREETME_ACTION,
+                                              GREETME_ACTION};
+     mf.verifyActions(expectedActions, true);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
+     mf.verifyLastMessage(new boolean[] {false, false, false, false}, true);
+     mf.verifyAcknowledgements(new boolean[] {false, false, true, true}, true);
+
+     // createSequenceResponse plus 3 greetMeResponse messages plus
+     // one partial response for each of the four messages.
+     // NOTE(review): an all-false array is passed for the partial responses,
+     // i.e. none of them is expected to carry an acknowledgement here; an
+     // earlier comment claimed three of them should - confirm the intent.
+
+     mf.verifyMessages(8, false);
+     mf.verifyPartialResponses(4, new boolean[4]);
+
+     mf.purgePartialResponses();
+
+     expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+                                     GREETME_RESPONSE_ACTION,
+                                     GREETME_RESPONSE_ACTION,
+                                     GREETME_RESPONSE_ACTION};
+     mf.verifyActions(expectedActions, false);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, false);
+     mf.verifyLastMessage(new boolean[4], false);
+     mf.verifyAcknowledgements(new boolean[] {false, true, true, true}, false);
+ }
+
+ /**
+  * Same scenario and expectations as {@link #testTwowayNonAnonymous()},
+  * but using the endpoint specific interceptor configuration from
+  * twoway-endpoint-specific.xml.
+  */
+ public void testTwowayNonAnonymousEndpointSpecific() throws Exception {
+     // check the switch first so that a disabled test does not start a container
+     if (!doTestTwowayNonAnonymousEndpointSpecific) {
+         return;
+     }
+     setUpJBI("org/apache/servicemix/cxfbc/ws/rm/twoway-endpoint-specific.xml");
+     init("org/apache/servicemix/cxfbc/ws/rm/twoway-endpoint-specific.xml", true);
+
+     greeter.greetMe("one");
+     greeter.greetMe("two");
+     greeter.greetMe("three");
+
+     // CreateSequence and three greetMe messages
+     // TODO there should be partial responses to the decoupled responses!
+
+     awaitMessages(4, 8);
+
+     MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     mf.verifyMessages(4, true);
+     String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
+                                              GREETME_ACTION,
+                                              GREETME_ACTION,
+                                              GREETME_ACTION};
+     mf.verifyActions(expectedActions, true);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
+     mf.verifyLastMessage(new boolean[] {false, false, false, false}, true);
+     mf.verifyAcknowledgements(new boolean[] {false, false, true, true}, true);
+
+     // createSequenceResponse plus 3 greetMeResponse messages plus
+     // one partial response for each of the four messages.
+     // NOTE(review): an all-false array is passed for the partial responses,
+     // i.e. none of them is expected to carry an acknowledgement here; an
+     // earlier comment claimed three of them should - confirm the intent.
+
+     mf.verifyMessages(8, false);
+     mf.verifyPartialResponses(4, new boolean[4]);
+
+     mf.purgePartialResponses();
+
+     expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+                                     GREETME_RESPONSE_ACTION,
+                                     GREETME_RESPONSE_ACTION,
+                                     GREETME_RESPONSE_ACTION};
+     mf.verifyActions(expectedActions, false);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, false);
+     mf.verifyLastMessage(new boolean[4], false);
+     mf.verifyAcknowledgements(new boolean[] {false, true, true, true}, false);
+ }
+
+
+ /**
+  * Sends two twoway requests using the deferred.xml configuration with the
+  * second init(...) argument set to true. Neither the requests nor the
+  * first batch of inbound messages may carry acknowledgements; afterwards
+  * one further outbound message carrying an acknowledgement is expected.
+  */
+ public void testTwowayNonAnonymousDeferred() throws Exception {
+     // check the switch first so that a disabled test does not start a container
+     if (!doTestTwowayNonAnonymousDeferred) {
+         return;
+     }
+     setUpJBI("org/apache/servicemix/cxfbc/ws/rm/deferred.xml");
+     init("org/apache/servicemix/cxfbc/ws/rm/deferred.xml", true);
+
+     greeter.greetMe("one");
+     greeter.greetMe("two");
+
+     // CreateSequence and two greetMe messages, no acknowledgments
+     // included
+
+     awaitMessages(3, 6);
+     MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     mf.verifyMessages(3, true);
+     String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
+                                              GREETME_ACTION,
+                                              GREETME_ACTION};
+     mf.verifyActions(expectedActions, true);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2"}, true);
+     mf.verifyLastMessage(new boolean[3], true);
+     mf.verifyAcknowledgements(new boolean[3], true);
+
+     // CreateSequenceResponse plus 2 greetMeResponse messages plus
+     // one partial response for each of the three messages, no acknowledgments
+     // included
+
+     mf.verifyMessages(6, false);
+     mf.verifyLastMessage(new boolean[6], false);
+     mf.verifyAcknowledgements(new boolean[6], false);
+
+     mf.verifyPartialResponses(3);
+     mf.purgePartialResponses();
+     expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+                                     GREETME_RESPONSE_ACTION,
+                                     GREETME_RESPONSE_ACTION};
+     mf.verifyActions(expectedActions, false);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2"}, false);
+     mf.purge();
+
+     // one standalone acknowledgement should have been sent from the client;
+     // only the outbound side is awaited and verified below
+
+     awaitMessages(1, 0);
+     mf.reset(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     mf.verifyMessageNumbers(new String[1], true);
+     mf.verifyLastMessage(new boolean[1], true);
+     mf.verifyAcknowledgements(new boolean[] {true}, true);
+ }
+
+ /**
+  * A maximum sequence length of 2 is configured for the client only (server allows 10).
+  * However, as we use the defaults regarding the inclusion and acceptance
+  * of inbound sequence offers and correlate offered sequences that are
+  * included in a CreateSequence request and accepted with those that are
+  * created on behalf of such a request, the server also tries to terminate its
+  * sequences. Note that as part of the sequence termination exchange a
+  * standalone sequence acknowledgment needs to be sent regardless of whether
+  * or not acknowledgments are delivered steadily with every response.
+  */
+ public void testTwowayNonAnonymousMaximumSequenceLength2() throws Exception {
+     // check the switch first so that a disabled test does not start a container
+     if (!doTestTwowayNonAnonymousMaximumSequenceLength2) {
+         return;
+     }
+     setUpJBI("org/apache/servicemix/cxfbc/ws/rm/seqlength10.xml");
+     init("org/apache/servicemix/cxfbc/ws/rm/seqlength10.xml", true);
+
+     // the configuration sets 10; lower it to 2 on the client side only
+     RMManager manager = greeterBus.getExtension(RMManager.class);
+     assertEquals("Unexpected maximum sequence length.", BigInteger.TEN,
+         manager.getSourcePolicy().getSequenceTerminationPolicy().getMaxLength());
+     manager.getSourcePolicy().getSequenceTerminationPolicy().setMaxLength(
+         new BigInteger("2"));
+
+     greeter.greetMe("one");
+     greeter.greetMe("two");
+     greeter.greetMe("three");
+
+     awaitMessages(7, 13, 5000);
+     MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     mf.verifyMessages(7, true);
+     String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
+                                              GREETME_ACTION,
+                                              GREETME_ACTION,
+                                              RMConstants.getTerminateSequenceAction(),
+                                              RMConstants.getSequenceAckAction(),
+                                              RMConstants.getCreateSequenceAction(),
+                                              GREETME_ACTION};
+     mf.verifyActions(expectedActions, true);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2", null, null, null, "1"}, true);
+     mf.verifyLastMessage(new boolean[] {false, false, true, false, false, false, false}, true);
+     mf.verifyAcknowledgements(new boolean[] {false, false, true, false, true, false, false}, true);
+
+     // 7 partial responses plus 2 full responses to CreateSequence requests
+     // plus 3 full responses to greetMe requests plus server originated
+     // TerminateSequence request
+
+     mf.verifyMessages(13, false);
+
+     mf.verifyPartialResponses(7);
+
+     mf.purgePartialResponses();
+
+     expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+                                     GREETME_RESPONSE_ACTION,
+                                     GREETME_RESPONSE_ACTION,
+                                     RMConstants.getTerminateSequenceAction(),
+                                     RMConstants.getCreateSequenceResponseAction(),
+                                     GREETME_RESPONSE_ACTION};
+     mf.verifyActions(expectedActions, false);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2", null, null, "1"}, false);
+     // the same array is reused: first for the last-message flags (index 2 only),
+     // then extended with indices 1 and 5 for the acknowledgement flags
+     boolean[] expected = new boolean[6];
+     expected[2] = true;
+     mf.verifyLastMessage(expected, false);
+     expected[1] = true;
+     expected[5] = true;
+     mf.verifyAcknowledgements(expected, false);
+ }
+
+
+ /**
+  * Forces every outbound application message to carry message number 1 and
+  * expects the second twoway request to fail with a receiver fault whose
+  * reason ends with "has already been delivered.".
+  */
+ public void testTwowayAtMostOnce() throws Exception {
+     // check the switch first so that a disabled test does not start a container
+     if (!doTestTwowayAtMostOnce) {
+         return;
+     }
+     setUpJBI("org/apache/servicemix/cxfbc/ws/rm/atmostonce.xml");
+
+     init("org/apache/servicemix/cxfbc/ws/rm/atmostonce.xml");
+
+     // overwrites the message number of any outbound message that has a sequence header
+     class MessageNumberInterceptor extends AbstractPhaseInterceptor {
+         public MessageNumberInterceptor() {
+             super(Phase.USER_LOGICAL);
+         }
+
+         public void handleMessage(Message m) {
+             RMProperties rmps = RMContextUtils.retrieveRMProperties(m, true);
+             if (null != rmps && null != rmps.getSequence()) {
+                 rmps.getSequence().setMessageNumber(BigInteger.ONE);
+             }
+         }
+     }
+     greeterBus.getOutInterceptors().add(new MessageNumberInterceptor());
+     RMManager manager = greeterBus.getExtension(RMManager.class);
+     manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+
+     greeter.greetMe("one");
+     try {
+         greeter.greetMe("two");
+         fail("Expected fault.");
+     } catch (WebServiceException ex) {
+         SoapFault sf = (SoapFault)ex.getCause();
+         assertEquals("Unexpected fault code.", Soap11.getInstance().getReceiver(), sf.getFaultCode());
+         assertNull("Unexpected sub code.", sf.getSubCode());
+         assertTrue("Unexpected reason.", sf.getReason().endsWith("has already been delivered."));
+     }
+
+     // wait for resend to occur
+
+     awaitMessages(3, 3, 5000);
+
+     MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     // Expected outbound:
+     // CreateSequence
+     // + two requests
+
+     String[] expectedActions = new String[3];
+     expectedActions[0] = RMConstants.getCreateSequenceAction();
+     for (int i = 1; i < expectedActions.length; i++) {
+         expectedActions[i] = GREETME_ACTION;
+     }
+     mf.verifyActions(expectedActions, true);
+     mf.verifyMessageNumbers(new String[] {null, "1", "1"}, true);
+     mf.verifyLastMessage(new boolean[3], true);
+     mf.verifyAcknowledgements(new boolean[3], true);
+
+     // Expected inbound:
+     // createSequenceResponse
+     // + 1 response without acknowledgement
+     // + 1 fault
+
+     mf.verifyMessages(3, false);
+     expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+                                     null, null};
+     mf.verifyActions(expectedActions, false);
+     mf.verifyMessageNumbers(new String[] {null, "1", null}, false);
+     mf.verifyAcknowledgements(new boolean[3], false);
+ }
+
+ /**
+  * Rewrites the sequence identifier of outbound messages to "UNKNOWN" and
+  * expects the twoway request to fail with a sender fault whose reason ends
+  * with "is not a known Sequence identifier.".
+  */
+ public void testUnknownSequence() throws Exception {
+     // check the switch first so that a disabled test does not start a container
+     if (!doTestUnknownSequence) {
+         return;
+     }
+     setUpJBI("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+
+     init("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+
+     // replaces the identifier of any outbound message that has a sequence header
+     class SequenceIdInterceptor extends AbstractPhaseInterceptor {
+         public SequenceIdInterceptor() {
+             super(Phase.USER_LOGICAL);
+         }
+
+         public void handleMessage(Message m) {
+             RMProperties rmps = RMContextUtils.retrieveRMProperties(m, true);
+             if (null != rmps && null != rmps.getSequence()) {
+                 rmps.getSequence().getIdentifier().setValue("UNKNOWN");
+             }
+         }
+     }
+     greeterBus.getOutInterceptors().add(new SequenceIdInterceptor());
+     RMManager manager = greeterBus.getExtension(RMManager.class);
+     manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+
+     try {
+         greeter.greetMe("one");
+         fail("Expected fault.");
+     } catch (WebServiceException ex) {
+         SoapFault sf = (SoapFault)ex.getCause();
+         assertEquals("Unexpected fault code.", Soap11.getInstance().getSender(), sf.getFaultCode());
+         assertNull("Unexpected sub code.", sf.getSubCode());
+         assertTrue("Unexpected reason.", sf.getReason().endsWith("is not a known Sequence identifier."));
+     }
+
+     // the inbound message at index 1 should carry the SequenceFault header
+     MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+     mf.verifySequenceFault(RMConstants.getUnknownSequenceFaultCode(), false, 1);
+ }
+
+ /**
+  * Sends a twoway request, pauses for 500 ms and sends a second one using
+  * the inactivity-timeout.xml configuration. The second request is
+  * expected to fail with a sender fault whose reason ends with
+  * "is not a known Sequence identifier.".
+  */
+ public void testInactivityTimeout() throws Exception {
+     // check the switch first so that a disabled test does not start a container
+     if (!doTestInactivityTimeout) {
+         return;
+     }
+     setUpJBI("org/apache/servicemix/cxfbc/ws/rm/inactivity-timeout.xml");
+
+     init("org/apache/servicemix/cxfbc/ws/rm/inactivity-timeout.xml");
+
+     greeter.greetMe("one");
+
+     try {
+         Thread.sleep(500);
+     } catch (InterruptedException ex) {
+         // carry on with the test, but keep the interrupt visible to the caller
+         Thread.currentThread().interrupt();
+     }
+
+     try {
+         greeter.greetMe("two");
+         fail("Expected fault.");
+     } catch (WebServiceException ex) {
+         SoapFault sf = (SoapFault)ex.getCause();
+         assertEquals("Unexpected fault code.", Soap11.getInstance().getSender(), sf.getFaultCode());
+         assertNull("Unexpected sub code.", sf.getSubCode());
+         assertTrue("Unexpected reason.", sf.getReason().endsWith("is not a known Sequence identifier."));
+     }
+
+     awaitMessages(3, 3, 5000);
+
+     MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+     // Expected outbound:
+     // CreateSequence
+     // + two requests (second request does not include acknowledgement for first response as
+     // in the meantime the client has terminated the sequence)
+
+     String[] expectedActions = new String[3];
+     expectedActions[0] = RMConstants.getCreateSequenceAction();
+     for (int i = 1; i < expectedActions.length; i++) {
+         expectedActions[i] = GREETME_ACTION;
+     }
+     mf.verifyActions(expectedActions, true);
+     mf.verifyMessageNumbers(new String[] {null, "1", "2"}, true);
+     mf.verifyLastMessage(new boolean[3], true);
+     mf.verifyAcknowledgements(new boolean[] {false, false, false}, true);
+
+     // Expected inbound:
+     // createSequenceResponse
+     // + 1 response with acknowledgement
+     // + 1 fault without acknowledgement
+
+     mf.verifyMessages(3, false);
+     expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+                                     null, null};
+     mf.verifyActions(expectedActions, false);
+     mf.verifyMessageNumbers(new String[] {null, "1", null}, false);
+     mf.verifyAcknowledgements(new boolean[] {false, true, false}, false);
+
+     // the third inbound message has a SequenceFault header
+
+     mf.verifySequenceFault(RMConstants.getUnknownSequenceFaultCode(), false, 2);
+ }
+
+
+ /**
+  * Runs the oneway message-loss scenario with a null executor, unless the
+  * test has been switched off.
+  */
+ public void testOnewayMessageLoss() throws Exception {
+     if (doTestOnewayMessageLoss) {
+         testOnewayMessageLoss(null);
+     }
+ }
+
+
+ public void testOnewayMessageLossAsyncExecutor() throws Exception {
+ if (!doTestOnewayMessageLossAsyncExecutor) {
+ return;
+ }
+ testOnewayMessageLoss(Executors.newSingleThreadExecutor());
+ }
+
+ private void testOnewayMessageLoss(Executor executor) throws Exception {
+ setUpJBI("org/apache/servicemix/cxfbc/ws/rm/message-loss.xml");
+ init("org/apache/servicemix/cxfbc/ws/rm/message-loss.xml", false, executor);
+
+ greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+ RMManager manager = greeterBus.getExtension(RMManager.class);
+ manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+
+ greeter.greetMeOneWay("one");
+ greeter.greetMeOneWay("two");
+ greeter.greetMeOneWay("three");
+ greeter.greetMeOneWay("four");
+
+ awaitMessages(7, 5, 10000);
+
+ MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+ // Expected outbound:
+ // CreateSequence
+ // + 4 greetMe messages
+ // + at least 2 resends (message may be resent multiple times depending
+ // on the timing of the ACKs)
+
+ String[] expectedActions = new String[7];
+ expectedActions[0] = RMConstants.getCreateSequenceAction();
+ for (int i = 1; i < expectedActions.length; i++) {
+ expectedActions[i] = GREETMEONEWAY_ACTION;
+ }
+ mf.verifyActions(expectedActions, true);
+ mf.verifyMessageNumbers(new String[] {null, "1", "2", "3", "4", "2", "4"}, true, false);
+ mf.verifyLastMessage(new boolean[7], true);
+ mf.verifyAcknowledgements(new boolean[7], true);
+
+ // Expected inbound:
+ // createSequenceResponse
+ // + 2 partial responses to successfully transmitted messages
+ // + 2 partial responses to resent messages
+
+ mf.verifyMessages(5, false);
+ expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+ RMConstants.getSequenceAcknowledgmentAction(),
+ RMConstants.getSequenceAcknowledgmentAction(),
+ RMConstants.getSequenceAcknowledgmentAction(),
+ RMConstants.getSequenceAcknowledgmentAction()};
+ mf.verifyActions(expectedActions, false);
+ mf.verifyMessageNumbers(new String[] {null, null, null, null, null}, false);
+ mf.verifyAcknowledgements(new boolean[] {false, true, true, true, true}, false);
+
+ }
+
+    /**
+     * Runs the two-way message loss scenario without a custom executor.
+     * Does nothing when {@code doTestTwowayMessageLoss} is false.
+     */
+    public void testTwowayMessageLoss() throws Exception {
+        if (doTestTwowayMessageLoss) {
+            testTwowayMessageLoss(null);
+        }
+    }
+
+ public void testTwowayMessageLossAsyncExecutor() throws Exception {
+ if (!doTestTwowayMessageLossAsyncExecutor) {
+ return;
+ }
+ testTwowayMessageLoss(Executors.newSingleThreadExecutor());
+ }
+
+ private void testTwowayMessageLoss(Executor executor) throws Exception {
+ setUpJBI("org/apache/servicemix/cxfbc/ws/rm/message-loss.xml");
+ init("org/apache/servicemix/cxfbc/ws/rm/message-loss.xml", true, executor);
+
+ greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+ RMManager manager = greeterBus.getExtension(RMManager.class);
+ manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+
+ greeter.greetMe("one");
+ greeter.greetMe("two");
+ greeter.greetMe("three");
+ greeter.greetMe("four");
+
+ awaitMessages(7, 10, 10000);
+
+ MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+ // Expected outbound:
+ // CreateSequence
+ // + 4 greetMe messages
+ // + 2 resends
+
+ String[] expectedActions = new String[7];
+ expectedActions[0] = RMConstants.getCreateSequenceAction();
+ for (int i = 1; i < expectedActions.length; i++) {
+ expectedActions[i] = GREETME_ACTION;
+ }
+ mf.verifyActions(expectedActions, true);
+ mf.verifyMessageNumbers(new String[] {null, "1", "2", "2", "3", "4", "4"}, true);
+ mf.verifyLastMessage(new boolean[7], true);
+ boolean[] expectedAcks = new boolean[7];
+ for (int i = 2; i < expectedAcks.length; i++) {
+ expectedAcks[i] = true;
+ }
+ mf.verifyAcknowledgements(expectedAcks , true);
+
+ // Expected inbound:
+ // createSequenceResponse
+ // + 4 greetMeResponse actions (to original or resent)
+ // + 5 partial responses (to CSR & each of the initial greetMe messages)
+ // + at least 2 further partial response (for each of the resends)
+
+ mf.verifyPartialResponses(5);
+ mf.purgePartialResponses();
+
+ expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+ GREETME_RESPONSE_ACTION, GREETME_RESPONSE_ACTION,
+ GREETME_RESPONSE_ACTION, GREETME_RESPONSE_ACTION};
+ mf.verifyActions(expectedActions, false);
+ mf.verifyMessageNumbers(new String[] {null, "1", "2", "3", "4"}, false);
+ mf.verifyAcknowledgements(new boolean[] {false, true, true, true, true}, false);
+
+ }
+
+    /**
+     * Sends a single two-way greeting over a decoupled endpoint using the
+     * {@code no-offer.xml} configuration and verifies the recorded flow in
+     * both directions. The JBI container is set up even when the test is
+     * disabled via {@code doTestTwowayNonAnonymousNoOffer}.
+     */
+    public void testTwowayNonAnonymousNoOffer() throws Exception {
+        final String config = "org/apache/servicemix/cxfbc/ws/rm/no-offer.xml";
+        setUpJBI(config);
+        if (!doTestTwowayNonAnonymousNoOffer) {
+            return;
+        }
+        init(config, true);
+
+        greeter.greetMe("one");
+
+        // Outbound: CreateSequence + greetMe + CreateSequenceResponse = 3 messages
+        awaitMessages(3, 6);
+        MessageFlow flow = new MessageFlow(outRecorder.getOutboundMessages(),
+                                           inRecorder.getInboundMessages());
+
+        flow.verifyMessages(3, true);
+        String[] outboundActions = new String[] {
+            RMConstants.getCreateSequenceAction(),
+            GREETME_ACTION,
+            RMConstants.getCreateSequenceResponseAction(),
+        };
+        flow.verifyActions(outboundActions, true);
+        flow.verifyMessageNumbers(new String[] {null, "1", null}, true);
+        flow.verifyLastMessage(new boolean[3], true);
+        flow.verifyAcknowledgements(new boolean[3], true);
+
+        flow.verifyPartialResponses(3, new boolean[3]);
+        flow.purgePartialResponses();
+
+        // Inbound once partial responses are purged:
+        // CreateSequenceResponse, CreateSequence, greetMeResponse
+        String[] inboundActions = new String[] {
+            RMConstants.getCreateSequenceResponseAction(),
+            RMConstants.getCreateSequenceAction(),
+            GREETME_RESPONSE_ACTION,
+        };
+        flow.verifyActions(inboundActions, false);
+        flow.verifyMessageNumbers(new String[] {null, null, "1"}, false);
+        flow.verifyAcknowledgements(new boolean[3], false);
+    }
+
+    /**
+     * Fires five asynchronous greetMe calls and verifies that the outbound
+     * flow consists of one CreateSequence followed by five greetMe messages.
+     * The JBI container is set up even when the test is disabled via
+     * {@code doTestConcurrency}.
+     */
+    public void testConcurrency() throws Exception {
+        final String config = "org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml";
+        setUpJBI(config);
+        if (!doTestConcurrency) {
+            return;
+        }
+        init(config, true);
+
+        int call = 0;
+        while (call < 5) {
+            greeter.greetMeAsync(String.valueOf(call));
+            call++;
+        }
+
+        // CreateSequence and five greetMe messages,
+        // full and partial responses to each
+        awaitMessages(6, 12, 7500);
+        MessageFlow flow = new MessageFlow(outRecorder.getOutboundMessages(),
+                                           inRecorder.getInboundMessages());
+
+        flow.verifyMessages(6, true);
+        String[] outboundActions = new String[] {
+            RMConstants.getCreateSequenceAction(),
+            GREETME_ACTION, GREETME_ACTION, GREETME_ACTION,
+            GREETME_ACTION, GREETME_ACTION,
+        };
+        flow.verifyActions(outboundActions, true);
+    }
+
+    /**
+     * Runs two greeter clients in parallel threads, each sending three
+     * one-way greetings on its own bus, and verifies the message flow
+     * recorded for each client separately. The JBI container is set up even
+     * when the test is disabled via {@code doTestMultiClientOneway}.
+     */
+    public void testMultiClientOneway() throws Exception {
+        setUpJBI("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+        if (!doTestMultiClientOneway) {
+            return;
+        }
+
+        SpringBusFactory bf = new SpringBusFactory();
+        String cfgResource = "org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml";
+        initControl(bf, cfgResource);
+
+        class ClientThread extends Thread {
+
+            Greeter greeter;
+            Bus greeterBus;
+            InMessageRecorder inRecorder;
+            OutMessageRecorder outRecorder;
+            String id;
+
+            ClientThread(SpringBusFactory bf, String cfgResource, int n) {
+                // initGreeter populates the enclosing test's fields; take a
+                // private snapshot before the next client overwrites them
+                CxfBcRMSequenceTest.this.initGreeter(bf, cfgResource, false, null);
+                greeter = CxfBcRMSequenceTest.this.greeter;
+                greeterBus = CxfBcRMSequenceTest.this.greeterBus;
+                inRecorder = CxfBcRMSequenceTest.this.inRecorder;
+                outRecorder = CxfBcRMSequenceTest.this.outRecorder;
+                id = "client " + n;
+            }
+
+            public void run() {
+                greeter.greetMeOneWay(id + ": once");
+                greeter.greetMeOneWay(id + ": twice");
+                greeter.greetMeOneWay(id + ": thrice");
+
+                // three application messages plus createSequence.
+                // Wait on THIS client's recorders: the enclosing test's
+                // awaitMessages() reads the shared fields, which refer to the
+                // most recently created client only.
+                new MessageRecorder(outRecorder, inRecorder).awaitMessages(4, 4, 10000);
+            }
+        }
+
+        ClientThread[] clients = new ClientThread[2];
+
+        try {
+            for (int i = 0; i < clients.length; i++) {
+                clients[i] = new ClientThread(bf, cfgResource, i);
+            }
+
+            for (int i = 0; i < clients.length; i++) {
+                clients[i].start();
+            }
+
+            for (int i = 0; i < clients.length; i++) {
+                clients[i].join();
+                MessageFlow mf = new MessageFlow(clients[i].outRecorder.getOutboundMessages(),
+                                                 clients[i].inRecorder.getInboundMessages());
+
+                mf.verifyMessages(4, true);
+                String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
+                                                         GREETMEONEWAY_ACTION, GREETMEONEWAY_ACTION,
+                                                         GREETMEONEWAY_ACTION};
+                mf.verifyActions(expectedActions, true);
+                mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
+
+                // createSequenceResponse plus 3 partial responses
+
+                mf.verifyMessages(4, false);
+                expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+                                                RMConstants.getSequenceAcknowledgmentAction(),
+                                                RMConstants.getSequenceAcknowledgmentAction(),
+                                                RMConstants.getSequenceAcknowledgmentAction()};
+                mf.verifyActions(expectedActions, false);
+                mf.verifyMessageNumbers(new String[] {null, null, null, null}, false);
+                mf.verifyAcknowledgements(new boolean[] {false, true, true, true}, false);
+
+            }
+        } finally {
+            for (int i = 0; i < clients.length; i++) {
+                if (clients[i] == null) {
+                    // this client was never constructed; dereferencing it
+                    // would throw an NPE that hides the original failure
+                    continue;
+                }
+                greeter = clients[i].greeter;
+                greeterBus = clients[i].greeterBus;
+                stopGreeter();
+            }
+            greeter = null;
+        }
+    }
+
+    /**
+     * Runs two greeter clients in parallel threads, each sending three
+     * two-way greetings over its own decoupled endpoint, and verifies the
+     * message flow recorded for each client separately. The JBI container is
+     * set up even when the test is disabled via
+     * {@code doTestMultiClientTwoway}.
+     */
+    public void testMultiClientTwoway() throws Exception {
+        setUpJBI("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+        if (!doTestMultiClientTwoway) {
+            return;
+        }
+
+        SpringBusFactory bf = new SpringBusFactory();
+        String cfgResource = "org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml";
+        initControl(bf, cfgResource);
+
+        class ClientThread extends Thread {
+
+            Greeter greeter;
+            Bus greeterBus;
+            InMessageRecorder inRecorder;
+            OutMessageRecorder outRecorder;
+            String id;
+
+            ClientThread(SpringBusFactory bf, String cfgResource, int n) {
+                // initGreeter populates the enclosing test's fields; take a
+                // private snapshot before the next client overwrites them
+                CxfBcRMSequenceTest.this.initGreeter(bf, cfgResource, true, null);
+                greeter = CxfBcRMSequenceTest.this.greeter;
+                greeterBus = CxfBcRMSequenceTest.this.greeterBus;
+                inRecorder = CxfBcRMSequenceTest.this.inRecorder;
+                outRecorder = CxfBcRMSequenceTest.this.outRecorder;
+                id = "client " + n;
+            }
+
+            public void run() {
+                greeter.greetMe(id + ": a");
+                greeter.greetMe(id + ": b");
+                greeter.greetMe(id + ": c");
+
+                // three application messages plus createSequence.
+                // Wait on THIS client's recorders: the enclosing test's
+                // awaitMessages() reads the shared fields, which refer to the
+                // most recently created client only.
+                new MessageRecorder(outRecorder, inRecorder).awaitMessages(4, 8, 10000);
+            }
+        }
+
+        ClientThread[] clients = new ClientThread[2];
+
+        try {
+            for (int i = 0; i < clients.length; i++) {
+                clients[i] = new ClientThread(bf, cfgResource, i);
+            }
+
+            for (int i = 0; i < clients.length; i++) {
+                clients[i].start();
+            }
+
+            for (int i = 0; i < clients.length; i++) {
+                clients[i].join();
+                MessageFlow mf = new MessageFlow(clients[i].outRecorder.getOutboundMessages(),
+                                                 clients[i].inRecorder.getInboundMessages());
+
+                mf.verifyMessages(4, true);
+                String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
+                                                         GREETME_ACTION,
+                                                         GREETME_ACTION,
+                                                         GREETME_ACTION};
+                mf.verifyActions(expectedActions, true);
+                mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
+                mf.verifyLastMessage(new boolean[] {false, false, false, false}, true);
+                mf.verifyAcknowledgements(new boolean[] {false, false, true, true}, true);
+
+                // createSequenceResponse plus 3 greetMeResponse messages plus
+                // one partial response for each of the four outbound messages.
+                // NOTE(review): an all-false array is passed for the partial
+                // responses below; an earlier comment claimed three of them
+                // should carry an acknowledgement - confirm which is intended.
+
+                mf.verifyMessages(8, false);
+                mf.verifyPartialResponses(4, new boolean[4]);
+
+                mf.purgePartialResponses();
+
+                expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+                                                GREETME_RESPONSE_ACTION,
+                                                GREETME_RESPONSE_ACTION,
+                                                GREETME_RESPONSE_ACTION};
+                mf.verifyActions(expectedActions, false);
+                mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, false);
+                mf.verifyLastMessage(new boolean[4], false);
+                mf.verifyAcknowledgements(new boolean[] {false, true, true, true}, false);
+
+            }
+        } finally {
+            for (int i = 0; i < clients.length; i++) {
+                if (clients[i] == null) {
+                    // this client was never constructed; dereferencing it
+                    // would throw an NPE that hides the original failure
+                    continue;
+                }
+                greeter = clients[i].greeter;
+                greeterBus = clients[i].greeterBus;
+                stopGreeter();
+            }
+            greeter = null;
+        }
+    }
+
+    /**
+     * Sends two two-way greetings using the {@code message-loss-server.xml}
+     * configuration, after removing any {@code MessageLossSimulator} from the
+     * client's outbound chain and raising the client's base retransmission
+     * interval to 60000 ms, then verifies the recorded message flow. The JBI
+     * container is set up even when the test is disabled via
+     * {@code doTestServerSideMessageLoss}.
+     */
+    public void testServerSideMessageLoss() throws Exception {
+        final String config = "org/apache/servicemix/cxfbc/ws/rm/message-loss-server.xml";
+        setUpJBI(config);
+        if (!doTestServerSideMessageLoss) {
+            return;
+        }
+        init(config, true);
+
+        // avoid client side message loss: drop the first simulator found
+        List<Interceptor> clientOut = greeterBus.getOutInterceptors();
+        for (Interceptor candidate : clientOut) {
+            if (candidate.getClass().equals(MessageLossSimulator.class)) {
+                clientOut.remove(candidate);
+                break;
+            }
+        }
+        // avoid client side resends
+        RMManager rmManager = greeterBus.getExtension(RMManager.class);
+        rmManager.getRMAssertion().getBaseRetransmissionInterval()
+            .setMilliseconds(new BigInteger("60000"));
+
+        greeter.greetMe("one");
+        greeter.greetMe("two");
+
+        // outbound: CreateSequence and two greetMe messages
+        awaitMessages(3, 6);
+
+        MessageFlow flow = new MessageFlow(outRecorder.getOutboundMessages(),
+                                           inRecorder.getInboundMessages());
+
+        flow.verifyMessages(3, true);
+        String[] outboundActions = new String[] {
+            RMConstants.getCreateSequenceAction(),
+            GREETME_ACTION,
+            GREETME_ACTION,
+        };
+        flow.verifyActions(outboundActions, true);
+        flow.verifyMessageNumbers(new String[] {null, "1", "2"}, true);
+        flow.verifyLastMessage(new boolean[3], true);
+        flow.verifyAcknowledgements(new boolean[] {false, false, true}, true);
+
+        // inbound: createSequenceResponse plus 2 greetMeResponse messages plus
+        // one partial response for each of the three outbound messages
+        flow.verifyMessages(6, false);
+        flow.verifyPartialResponses(3, new boolean[3]);
+
+        flow.purgePartialResponses();
+
+        String[] inboundActions = new String[] {
+            RMConstants.getCreateSequenceResponseAction(),
+            GREETME_RESPONSE_ACTION,
+            GREETME_RESPONSE_ACTION,
+        };
+        flow.verifyActions(inboundActions, false);
+        flow.verifyMessageNumbers(new String[] {null, "1", "2"}, false);
+        flow.verifyLastMessage(new boolean[3], false);
+        flow.verifyAcknowledgements(new boolean[] {false, true, true}, false);
+    }
+
+    /**
+     * Sends three one-way greetings, shuts the greeter bus down and verifies
+     * that the outbound flow ends with a LastMessage and a TerminateSequence
+     * action. The JBI container is set up even when the test is disabled via
+     * {@code doTestTerminateOnShutdown}.
+     */
+    public void testTerminateOnShutdown() throws Exception {
+        final String config = "org/apache/servicemix/cxfbc/ws/rm/terminate-on-shutdown.xml";
+        setUpJBI(config);
+        if (!doTestTerminateOnShutdown) {
+            return;
+        }
+        init(config, true);
+
+        for (String cell : new String[] {"neutrophil", "basophil", "eosinophil"}) {
+            greeter.greetMeOneWay(cell);
+        }
+        stopGreeter();
+
+        awaitMessages(6, 8);
+        MessageFlow flow = new MessageFlow(outRecorder.getOutboundMessages(),
+                                           inRecorder.getInboundMessages());
+
+        flow.verifyMessages(6, true);
+        String[] outboundActions = new String[] {
+            RMConstants.getCreateSequenceAction(),
+            GREETMEONEWAY_ACTION,
+            GREETMEONEWAY_ACTION,
+            GREETMEONEWAY_ACTION,
+            RMConstants.getLastMessageAction(),
+            RMConstants.getTerminateSequenceAction(),
+        };
+        flow.verifyActions(outboundActions, true);
+        flow.verifyMessageNumbers(new String[] {null, "1", "2", "3", "4", null}, true);
+
+        // inbound: CreateSequenceResponse, out-of-band SequenceAcknowledgement
+        // plus 6 partial responses
+        flow.verifyMessages(8, false);
+        flow.verifyMessageNumbers(new String[8], false);
+
+        flow.verifyPartialResponses(6);
+        flow.purgePartialResponses();
+
+        String[] inboundActions = new String[] {
+            RMConstants.getCreateSequenceResponseAction(),
+            RMConstants.getSequenceAckAction(),
+        };
+        flow.verifyActions(inboundActions, false);
+        flow.verifyAcknowledgements(new boolean[] {false, true}, false);
+    }
+
+ // --- test utilities ---
+
+ private void init(String cfgResource) {
+ init(cfgResource, false);
+ }
+
+ private void init(String cfgResource, boolean useDecoupledEndpoint) {
+ init(cfgResource, useDecoupledEndpoint, null);
+ }
+
+    /**
+     * Creates a bus factory and uses it to initialise first the control
+     * client and then the greeter client.
+     *
+     * @param cfgResource bus configuration resource
+     * @param useDecoupledEndpoint whether the greeter client gets a decoupled endpoint
+     * @param executor executor for the greeter service; may be null
+     */
+    private void init(String cfgResource, boolean useDecoupledEndpoint, Executor executor) {
+        SpringBusFactory busFactory = new SpringBusFactory();
+        initControl(busFactory, cfgResource);
+        initGreeter(busFactory, cfgResource, useDecoupledEndpoint, executor);
+    }
+
+    /**
+     * Creates the control bus, makes it the default bus, obtains the control
+     * port and asks it to start the greeter with the given configuration.
+     *
+     * @param bf factory used to create the control bus
+     * @param cfgResource configuration resource passed to {@code startGreeter}
+     * @throws IllegalStateException if the control WSDL cannot be resolved
+     */
+    private void initControl(SpringBusFactory bf, String cfgResource) {
+        controlBus = bf.createBus();
+        BusFactory.setDefaultBus(controlBus);
+        URL wsdl;
+        try {
+            wsdl = new ClassPathResource("/wsdl/greeter_control.wsdl").getURL();
+        } catch (IOException e) {
+            // fail here with the real cause instead of carrying a null WSDL
+            // URL into the service constructor
+            throw new IllegalStateException(
+                "Unable to resolve /wsdl/greeter_control.wsdl on the classpath", e);
+        }
+        ControlService cs = new ControlService(wsdl, CONTROL_SERVICE);
+        control = cs.getControlPort();
+
+        assertTrue("Failed to start greeter", control.startGreeter(cfgResource));
+    }
+
+    /**
+     * Creates the greeter bus from the given configuration, makes it the
+     * default bus, attaches fresh in/out message recorders to it and creates
+     * the greeter client. Optionally assigns the client a decoupled endpoint
+     * on a port derived from the {@code decoupledEndpointPort} counter.
+     *
+     * @param bf factory used to create the greeter bus
+     * @param cfgResource bus configuration resource
+     * @param useDecoupledEndpoint whether to configure a decoupled endpoint
+     * @param executor executor set on the greeter service when non-null
+     * @throws IllegalStateException if the WSDL cannot be resolved
+     */
+    private void initGreeter(SpringBusFactory bf, String cfgResource,
+                             boolean useDecoupledEndpoint, Executor executor) {
+        greeterBus = bf.createBus(cfgResource);
+        BusFactory.setDefaultBus(greeterBus);
+        LOG.fine("Initialised greeter bus with configuration: " + cfgResource);
+
+        outRecorder = new OutMessageRecorder();
+        greeterBus.getOutInterceptors().add(outRecorder);
+        inRecorder = new InMessageRecorder();
+        greeterBus.getInInterceptors().add(inRecorder);
+        URL wsdl;
+        try {
+            wsdl = new ClassPathResource("/wsdl/greeter_control.wsdl").getURL();
+        } catch (IOException e) {
+            // fail here with the real cause instead of carrying a null WSDL
+            // URL into the service constructor
+            throw new IllegalStateException(
+                "Unable to resolve /wsdl/greeter_control.wsdl on the classpath", e);
+        }
+        GreeterService gs = new GreeterService(wsdl, GREETER_SERVICE);
+
+        if (null != executor) {
+            gs.setExecutor(executor);
+        }
+
+        greeter = gs.getGreeterPort();
+        LOG.fine("Created greeter client.");
+
+        ConnectionHelper.setKeepAliveConnection(greeter, true);
+
+        if (!useDecoupledEndpoint) {
+            return;
+        }
+
+        // programmatically configure a decoupled endpoint; the port counter
+        // is decremented on every call so each client gets a different port
+
+        decoupledEndpointPort--;
+        decoupledEndpoint = "http://localhost:" + decoupledEndpointPort + "/decoupled_endpoint";
+
+        Client c = ClientProxy.getClient(greeter);
+        HTTPConduit hc = (HTTPConduit)(c.getConduit());
+        HTTPClientPolicy cp = hc.getClient();
+        cp.setDecoupledEndpoint(decoupledEndpoint);
+
+        LOG.fine("Using decoupled endpoint: " + cp.getDecoupledEndpoint());
+    }
+
+    /**
+     * Shuts down the greeter bus, if there is one, and clears the greeter
+     * and greeter bus fields.
+     */
+    private void stopGreeter() {
+        if (greeterBus == null) {
+            return;
+        }
+        greeterBus.shutdown(true);
+        greeter = null;
+        greeterBus = null;
+    }
+
+    /**
+     * Asks the control port to stop the greeter and shuts down the control
+     * bus. Does nothing when no control port was created. The bus is shut
+     * down even when stopping the greeter fails, so that a failed assertion
+     * does not leave the bus running.
+     */
+    private void stopControl() {
+        if (null != control) {
+            try {
+                assertTrue("Failed to stop greeter", control.stopGreeter(null));
+            } finally {
+                controlBus.shutdown(true);
+            }
+        }
+    }
+
+ private void awaitMessages(int nExpectedOut, int nExpectedIn) {
+ awaitMessages(nExpectedOut, nExpectedIn, 10000);
+ }
+
+    /**
+     * Delegates to a {@code MessageRecorder} built over the test's current
+     * outbound and inbound recorders.
+     *
+     * @param nExpectedOut expected number of outbound messages
+     * @param nExpectedIn expected number of inbound messages
+     * @param timeout timeout value handed to the recorder
+     */
+    private void awaitMessages(int nExpectedOut, int nExpectedIn, int timeout) {
+        new MessageRecorder(outRecorder, inRecorder)
+            .awaitMessages(nExpectedOut, nExpectedIn, timeout);
+    }
+
+    /**
+     * Removes every RM soap, RM out and RM in interceptor from the given
+     * list, using the list iterator's {@code remove}.
+     *
+     * @param interceptors interceptor list to strip in place
+     */
+    private void removeRMInterceptors(List<Interceptor> interceptors) {
+        Iterator<Interceptor> cursor = interceptors.iterator();
+        while (cursor.hasNext()) {
+            Interceptor candidate = cursor.next();
+            boolean isRmInterceptor = candidate instanceof RMSoapInterceptor
+                || candidate instanceof RMOutInterceptor
+                || candidate instanceof RMInInterceptor;
+            if (isRmInterceptor) {
+                cursor.remove();
+            }
+        }
+    }
+
+
+
+    /**
+     * Builds the application context from {@code sequence.xml}, registering a
+     * {@code BusCfgSetXmlPreprocessor} created for the given bus
+     * configuration file.
+     *
+     * @param busCfgFile bus configuration file handed to the preprocessor
+     * @return the application context
+     */
+    protected AbstractXmlApplicationContext createBeanFactory(String busCfgFile) {
+        // load cxf se and bc from the spring config file
+        String[] configLocations = new String[] {"org/apache/servicemix/cxfbc/ws/rm/sequence.xml"};
+        List<SpringXmlPreprocessor> preprocessors = new ArrayList<SpringXmlPreprocessor>();
+        SpringXmlPreprocessor busCfgSetter = new BusCfgSetXmlPreprocessor(busCfgFile);
+        preprocessors.add(busCfgSetter);
+        return new ClassPathXmlApplicationContext(configLocations, true, null, preprocessors);
+    }
+
+ @Override
+ protected AbstractXmlApplicationContext createBeanFactory() {
+ //load cxf se and bc from default spring config file
+ return new ClassPathXmlApplicationContext(
+ "org/apache/servicemix/cxfbc/ws/rm/sequence.xml");
+ }
+
+
+
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMSequenceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMSequenceTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/GreeterImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/GreeterImpl.java?rev=567536&view=auto
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/GreeterImpl.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/GreeterImpl.java Sun Aug 19 20:37:48 2007
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.cxfbc.ws.rm;
+
+import java.util.concurrent.Future;
+import java.util.logging.Logger;
+
+import javax.jws.WebService;
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Response;
+
+import org.apache.cxf.greeter_control.PingMeFault;
+import org.apache.cxf.greeter_control.types.FaultDetail;
+import org.apache.cxf.greeter_control.types.GreetMeResponse;
+import org.apache.cxf.greeter_control.types.PingMeResponse;
+import org.apache.cxf.greeter_control.types.SayHiResponse;
+
+
+@WebService(serviceName = "GreeterService",
+ portName = "GreeterPort",
+ endpointInterface = "org.apache.cxf.greeter_control.Greeter",
+ targetNamespace = "http://cxf.apache.org/greeter_control")
+public class GreeterImpl {
+ private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
+ private long delay;
+ private String lastOnewayArg;
+ private boolean throwAlways;
+ private boolean useLastOnewayArg;
+ private int pingMeCount;
+
+ public long getDelay() {
+ return delay;
+ }
+
+ public void setDelay(long d) {
+ delay = d;
+ }
+
+ public void resetLastOnewayArg() {
+ lastOnewayArg = null;
+ }
+
+ public void useLastOnewayArg(Boolean use) {
+ useLastOnewayArg = use;
+ }
+
+ public void setThrowAlways(boolean t) {
+ throwAlways = t;
+ }
+
+ public String greetMe(String arg0) {
+ LOG.fine("Executing operation greetMe with parameter: " + arg0);
+ if ("twoway".equals(arg0)) {
+ useLastOnewayArg(true);
+ setDelay(5000);
+ }
+ if (delay > 0) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+ }
+ String result = null;
+ synchronized (this) {
+ result = useLastOnewayArg ? lastOnewayArg : arg0.toUpperCase();
+ }
+ LOG.fine("returning: " + result);
+ return result;
+ }
+
+ public Future<?> greetMeAsync(String arg0, AsyncHandler<GreetMeResponse> arg1) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Response<GreetMeResponse> greetMeAsync(String arg0) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void greetMeOneWay(String arg0) {
+ synchronized (this) {
+ lastOnewayArg = arg0;
+ }
+ LOG.info("Executing operation greetMeOneWay with parameter: " + arg0);
+ }
+
+ public void pingMe() throws PingMeFault {
+ pingMeCount++;
+ if ((pingMeCount % 2) == 0 || throwAlways) {
+ LOG.fine("Throwing PingMeFault while executiong operation pingMe");
+ FaultDetail fd = new FaultDetail();
+ fd.setMajor((short)2);
+ fd.setMinor((short)1);
+ throw new PingMeFault("Pings succeed only every other time.", fd);
+ } else {
+ LOG.fine("Executing operation pingMe");
+ }
+ }
+
+ public Response<PingMeResponse> pingMeAsync() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Future<?> pingMeAsync(AsyncHandler<PingMeResponse> arg0) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public String sayHi() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Response<SayHiResponse> sayHiAsync() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Future<?> sayHiAsync(AsyncHandler<SayHiResponse> arg0) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
+
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/GreeterImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/GreeterImpl.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/InMessageRecorder.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/InMessageRecorder.java?rev=567536&view=auto
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/InMessageRecorder.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/InMessageRecorder.java Sun Aug 19 20:37:48 2007
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.cxfbc.ws.rm;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+
+/**
+ * Interceptor for the RECEIVE phase that keeps a copy of the raw bytes of
+ * every inbound message that has an {@code InputStream} content, and then
+ * replaces that content with a fresh stream over the same bytes.
+ */
+public class InMessageRecorder extends AbstractPhaseInterceptor<Message> {
+
+    private static final Logger LOG = Logger.getLogger(InMessageRecorder.class.getName());
+    private final List<byte[]> inbound;
+
+    public InMessageRecorder() {
+        super(Phase.RECEIVE);
+        inbound = new ArrayList<byte[]>();
+    }
+
+    /**
+     * Drains the message's input stream, stores the bytes and installs a
+     * replacement stream on the message. Messages without an input stream
+     * are ignored. Failures are logged and not propagated.
+     *
+     * @param message the inbound message
+     */
+    public void handleMessage(Message message) throws Fault {
+        InputStream is = message.getContent(InputStream.class);
+
+        if (is == null) {
+            return;
+        }
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            try {
+                IOUtils.copy(is, bos);
+            } finally {
+                // close the source stream even when the copy fails
+                is.close();
+            }
+            inbound.add(bos.toByteArray());
+            String text = bos.toString();
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("inbound: " + text);
+            }
+            LOG.info("***inbound: " + text);
+            ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+            message.setContent(InputStream.class, bis);
+        } catch (Exception ex) {
+            // recording is best effort; report through the logger rather
+            // than printing a stack trace to stderr
+            LOG.log(Level.WARNING, "Failed to record inbound message", ex);
+        }
+    }
+
+    /** @return the live list of recorded inbound message bodies */
+    public List<byte[]> getInboundMessages() {
+        return inbound;
+    }
+}
+
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/InMessageRecorder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-cxf-bc/src/test/java/org/apache/servicemix/cxfbc/ws/rm/InMessageRecorder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date