You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2006/11/10 13:01:09 UTC
svn commit: r473319 - in /incubator/cxf/trunk:
rt/core/src/main/java/org/apache/cxf/endpoint/
rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ systests/src/...
Author: andreasmyth
Date: Fri Nov 10 04:01:06 2006
New Revision: 473319
URL: http://svn.apache.org/viewvc?view=rev&rev=473319
Log:
[JIRA CXF-130, CXF-140] Support for stand-alone SequenceAcknowledgment messages.
Added:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml (with props)
Modified:
incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/AddressingPropertiesImpl.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java Fri Nov 10 04:01:06 2006
@@ -30,6 +30,7 @@
import org.apache.cxf.Bus;
import org.apache.cxf.BusException;
import org.apache.cxf.binding.Binding;
+import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.interceptor.AbstractBasicInterceptorProvider;
import org.apache.cxf.interceptor.ClientOutFaultObserver;
import org.apache.cxf.interceptor.Fault;
@@ -56,7 +57,7 @@
public class ClientImpl extends AbstractBasicInterceptorProvider implements Client, MessageObserver {
- private static final Logger LOG = Logger.getLogger(ClientImpl.class.getName());
+ private static final Logger LOG = LogUtils.getL7dLogger(ClientImpl.class);
private static final String FINISHED = "exchange.finished";
@@ -64,12 +65,19 @@
private Endpoint endpoint;
private Conduit initedConduit;
private ClientOutFaultObserver outFaultObserver;
- private int synchronousTimeout = 100000; // default 10 second timeout
+ private int synchronousTimeout = 10000; // default 10 second timeout
public ClientImpl(Bus b, Endpoint e) {
+ this(b, e, null);
+ }
+
+ public ClientImpl(Bus b, Endpoint e, Conduit c) {
bus = b;
endpoint = e;
outFaultObserver = new ClientOutFaultObserver(bus);
+ if (null != c) {
+ initedConduit = c;
+ }
}
public Endpoint getEndpoint() {
@@ -184,7 +192,7 @@
remaining -= (int)(end - start);
}
if (!Boolean.TRUE.equals(exchange.get(FINISHED))) {
- LOG.info("RESPONSE_TIMEOUT");
+ LogUtils.log(LOG, Level.WARNING, "RESPONSE_TIMEOUT");
}
}
Modified: incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/AddressingPropertiesImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/AddressingPropertiesImpl.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/AddressingPropertiesImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/AddressingPropertiesImpl.java Fri Nov 10 04:01:06 2006
@@ -161,4 +161,53 @@
public void exposeAs(String uri) {
namespaceURI = uri;
}
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("[");
+ if (null != messageID) {
+ if (buf.length() > 1) {
+ buf.append(", ");
+ }
+ buf.append("MessageId: ");
+ buf.append(messageID.getValue());
+ }
+ if (null != action) {
+ if (buf.length() > 1) {
+ buf.append(", ");
+ }
+ buf.append("Action: ");
+ buf.append(action.getValue());
+ }
+ if (null != to) {
+ if (buf.length() > 1) {
+ buf.append(", ");
+ }
+ buf.append("To: ");
+ buf.append(to.getValue());
+ }
+ if (null != replyTo) {
+ AttributedURIType address = replyTo.getAddress();
+ if (null != address) {
+ if (buf.length() > 1) {
+ buf.append(", ");
+ }
+ buf.append("ReplyTo: ");
+ buf.append(address.getValue());
+ }
+ }
+ if (null != faultTo) {
+ AttributedURIType address = faultTo.getAddress();
+ if (null != address) {
+ if (buf.length() > 1) {
+ buf.append(", ");
+ }
+ buf.append("FaultTo: ");
+ buf.append(address.getValue());
+ }
+ }
+ buf.append("]");
+ return buf.toString();
+
+ }
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java Fri Nov 10 04:01:06 2006
@@ -119,184 +119,4 @@
}
return null;
}
-
-
-
-
-
-
-
-
- /*
- * private static final Logger LOG = LogUtils.getL7dLogger(RMHandler.class);
- * private static Map<BindingBase, RMHandler> handlers; private RMSource
- * source; private RMDestination destination; private RMProxy proxy; private
- * RMServant servant; private ConfigurationHelper configurationHelper;
- * private PersistenceManager persistenceManager; private Timer timer;
- * private boolean busLifeCycleListenesRegistered; @Resource(name =
- * JAXWSConstants.BUS_PROPERTY) private Bus bus; @Resource(name =
- * JAXWSConstants.CLIENT_BINDING_PROPERTY) private ClientBinding
- * clientBinding; @Resource(name = JAXWSConstants.SERVER_BINDING_PROPERTY)
- * private ServerBinding serverBinding; @Resource(name =
- * JAXWSConstants.CLIENT_TRANSPORT_PROPERTY) private ClientTransport
- * clientTransport; @Resource(name =
- * JAXWSConstants.SERVER_TRANSPORT_PROPERTY) private ServerTransport
- * serverTransport; public RMHandler() { proxy = new RMProxy(this); servant =
- * new RMServant(); } @PostConstruct protected synchronized void
- * initialise() { /* getHandlerMap().put(getBinding(), this); if (null ==
- * configurationHelper) { configurationHelper = new
- * ConfigurationHelper(getBinding(), null == clientBinding); } if (null ==
- * getSource()) { source = new RMSource(this); } if (null == destination) {
- * destination = new RMDestination(this); } if (null == timer) { timer = new
- * Timer(); } if (!busLifeCycleListenerRegistered) {
- * getBinding().getBus().getLifeCycleManager()
- * .registerLifeCycleListener(new RMBusLifeCycleListener(getSource()));
- * busLifeCycleListenerRegistered = true; } } public static Map<BindingBase,
- * RMHandler> getHandlerMap() { if (null == handlers) { handlers = new
- * HashMap<BindingBase, RMHandler>(); } return handlers; } public void
- * close(MessageContext context) { // TODO commit transaction } public
- * boolean handleFault(LogicalMessageContext context) { return
- * handle(context); } public boolean handleMessage(LogicalMessageContext
- * context) { return handle(context); } public PersistenceManager
- * getPersistenceManager() { return persistenceManager; } public void
- * setPersistenceManager(PersistenceManager pm) { persistenceManager = pm; }
- * public RMStore getStore() { if (null != persistenceManager) { return
- * persistenceManager.getStore(); } return null; } public Timer getTimer() {
- * return timer; } public Bus getBus() { return bus; } public Transport
- * getTransport() { return null == clientTransport ? serverTransport :
- * clientTransport; } public ClientTransport getClientTransport() { return
- * clientTransport; } public ServerTransport getServerTransport() { return
- * serverTransport; } public ClientBinding getClientBinding() { return
- * clientBinding; } public ServerBinding getServerBinding() { return
- * serverBinding; } public boolean isServerSide() { return null !=
- * serverBinding; } public AbstractBindingBase getBinding() { if (null !=
- * clientBinding) { return (AbstractBindingBase)clientBinding; } return
- * (AbstractBindingBase)serverBinding; } public RMProxy getProxy() { return
- * proxy; } public RMServant getServant() { return servant; } public
- * RMSource getSource() { return source; } public RMDestination
- * getDestination() { return destination; } protected void
- * open(LogicalMessageContext context) { // TODO begin transaction }
- * protected boolean handle(LogicalMessageContext context) { try { if
- * (ContextUtils.isOutbound(context)) { handleOutbound(context); } else {
- * handleInbound(context); } } catch (SequenceFault sf) {
- * LOG.log(Level.SEVERE, "SequenceFault", sf); } return true; } protected
- * void handleOutbound(LogicalMessageContext context) throws SequenceFault {
- * LOG.entering(getClass().getName(), "handleOutbound");
- * AddressingPropertiesImpl maps = ContextUtils.retrieveMAPs(context, false,
- * true); // ensure the appropriate version of WS-Addressing is used
- * maps.exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME); String
- * action = null; if (maps != null && null != maps.getAction()) { action =
- * maps.getAction().getValue(); } // nothing to do if this is a
- * CreateSequence, TerminateSequence or // SequenceInfo request if
- * (LOG.isLoggable(Level.FINE)) { LOG.fine("Action: " + action); } boolean
- * isApplicationMessage = true; if
- * (RMUtils.getRMConstants().getCreateSequenceAction().equals(action) ||
- * RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action) ||
- * RMUtils.getRMConstants().getTerminateSequenceAction().equals(action) ||
- * RMUtils.getRMConstants().getLastMessageAction().equals(action) ||
- * RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action) ||
- * RMUtils.getRMConstants().getSequenceInfoAction().equals(action)) {
- * isApplicationMessage = false; } RMPropertiesImpl rmpsOut =
- * (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(context, true); if
- * (null == rmpsOut) { rmpsOut = new RMPropertiesImpl();
- * RMContextUtils.storeRMProperties(context, rmpsOut, true); }
- * RMPropertiesImpl rmpsIn = null; Identifier inSeqId = null; BigInteger
- * inMessageNumber = null; if (isApplicationMessage) { rmpsIn =
- * (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(context, false); if
- * (null != rmpsIn && null != rmpsIn.getSequence()) { inSeqId =
- * rmpsIn.getSequence().getIdentifier(); inMessageNumber =
- * rmpsIn.getSequence().getMessageNumber(); } if
- * (LOG.isLoggable(Level.FINE)) { LOG.fine("inbound sequence: " + (null ==
- * inSeqId ? "null" : inSeqId.getValue())); } // not for partial responses
- * to oneway requests if (!(isServerSide() &&
- * BindingContextUtils.isOnewayTransport(context))) { if
- * (!ContextUtils.isRequestor(context)) { assert null != inSeqId; } // get
- * the current sequence, requesting the creation of a new one if necessary
- * SourceSequence seq = getSequence(inSeqId, context, maps); assert null !=
- * seq; // increase message number and store a sequence type object in //
- * context seq.nextMessageNumber(inSeqId, inMessageNumber);
- * rmpsOut.setSequence(seq); // if this was the last message in the
- * sequence, reset the // current sequence so that a new one will be created
- * next // time the handler is invoked if (seq.isLastMessage()) {
- * source.setCurrent(null); } } } // add Acknowledgements (to application
- * messages or explicitly // created Acknowledgement messages only) if
- * (isApplicationMessage ||
- * RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action)) {
- * AttributedURI to = VersionTransformer.convert(maps.getTo()); assert null !=
- * to; addAcknowledgements(rmpsOut, inSeqId, to); } // indicate to the
- * binding that a response is expected from the transport although // the
- * web method is a oneway method if
- * (BindingContextUtils.isOnewayMethod(context) ||
- * RMUtils.getRMConstants().getLastMessageAction().equals(action)) {
- * context.put(OutputStreamMessageContext.ONEWAY_MESSAGE_TF, Boolean.FALSE); } }
- * protected void handleInbound(LogicalMessageContext context) throws
- * SequenceFault { LOG.entering(getClass().getName(), "handleInbound");
- * RMProperties rmps = RMContextUtils.retrieveRMProperties(context, false);
- * final AddressingPropertiesImpl maps = ContextUtils.retrieveMAPs(context,
- * false, false); assert null != maps; String action = null; if (null !=
- * maps.getAction()) { action = maps.getAction().getValue(); } if
- * (LOG.isLoggable(Level.FINE)) { LOG.fine("Action: " + action); } if
- * (RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action)) {
- * Object[] parameters =
- * (Object[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
- * CreateSequenceResponseType csr =
- * (CreateSequenceResponseType)parameters[0];
- * getServant().createSequenceResponse(getSource(), csr,
- * getProxy().getOfferedIdentifier()); return; } else if
- * (RMUtils.getRMConstants().getCreateSequenceAction().equals(action)) {
- * Object[] parameters =
- * (Object[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
- * CreateSequenceType cs = (CreateSequenceType)parameters[0]; final
- * CreateSequenceResponseType csr =
- * getServant().createSequence(getDestination(), cs, maps); Runnable
- * response = new Runnable() { public void run() { try {
- * getProxy().createSequenceResponse(maps, csr); } catch (IOException ex) {
- * ex.printStackTrace(); } catch (SequenceFault sf) { sf.printStackTrace(); } } };
- * getBinding().getBus().getWorkQueueManager().getAutomaticWorkQueue().execute(response);
- * return; } else if
- * (RMUtils.getRMConstants().getTerminateSequenceAction().equals(action)) {
- * Object[] parameters =
- * (Object[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
- * TerminateSequenceType cs = (TerminateSequenceType)parameters[0];
- * getServant().terminateSequence(getDestination(), cs.getIdentifier()); } //
- * for application AND out of band messages if (null != rmps) {
- * processAcknowledgments(rmps); processAcknowledgmentRequests(rmps);
- * processSequence(rmps, maps); processDeliveryAssurance(rmps); } } void
- * processAcknowledgments(RMProperties rmps) { Collection<SequenceAcknowledgement>
- * acks = rmps.getAcks(); if (null != acks) { for (SequenceAcknowledgement
- * ack : acks) { getSource().setAcknowledged(ack); } } } void
- * processSequence(RMProperties rmps, AddressingProperties maps) throws
- * SequenceFault { SequenceType s = rmps.getSequence(); if (null == s) {
- * return; } getDestination().acknowledge(s, null == maps.getReplyTo() ?
- * null : maps.getReplyTo().getAddress().getValue()); } void
- * processAcknowledgmentRequests(RMProperties rmps) { Collection<AckRequestedType>
- * requested = rmps.getAcksRequested(); if (null != requested) { for
- * (AckRequestedType ar : requested) { DestinationSequence seq =
- * getDestination().getSequence(ar.getIdentifier()); if (null != seq) {
- * seq.scheduleImmediateAcknowledgement(); } else { LOG.severe("No such
- * sequence."); } } } } boolean processDeliveryAssurance(RMProperties rmps) {
- * SequenceType s = rmps.getSequence(); if (null == s) { return true; }
- * DestinationSequence ds = destination.getSequence(s.getIdentifier());
- * return ds.applyDeliveryAssurance(s.getMessageNumber()); }
- */
-
- /*
-
- protected void setInitialised(ConfigurationHelper ch,
- RMSource s,
- RMDestination d,
- Timer t,
- boolean registered
- ) {
- configurationHelper = ch;
- source = s;
- destination = d;
- timer = t;
- busLifeCycleListenerRegistered = registered;
- initialise();
- }
- */
-
-
-
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Fri Nov 10 04:01:06 2006
@@ -180,7 +180,7 @@
boolean canPiggybackAckOnPartialResponse() {
// TODO: should also check if we allow breaking the WI Profile rule by which no headers
// can be included in a HTTP response
- return getAcksTo().getAddress().getValue().equals(RMConstants.getAnonympusAddress());
+ return getAcksTo().getAddress().getValue().equals(RMConstants.getAnonymousAddress());
}
/**
@@ -277,6 +277,7 @@
}
synchronized void scheduleDeferredAcknowledgement(int delay) {
+
if (null == deferredAcknowledgments) {
deferredAcknowledgments = new ArrayList<DeferredAcknowledgment>();
}
@@ -290,11 +291,13 @@
DeferredAcknowledgment da = new DeferredAcknowledgment();
deferredAcknowledgments.add(da);
destination.getManager().getTimer().schedule(da, delay);
+ LOG.fine("Scheduled acknowledgment to be sent in " + delay + " ms");
}
final class DeferredAcknowledgment extends TimerTask {
public void run() {
+ LOG.fine("timer task: send acknowledgment.");
DestinationSequence.this.scheduleImmediateAcknowledgement();
try {
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Fri Nov 10 04:01:06 2006
@@ -20,20 +20,29 @@
package org.apache.cxf.ws.rm;
import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.datatype.Duration;
import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Client;
import org.apache.cxf.endpoint.ClientImpl;
import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.phase.PhaseInterceptorChain;
import org.apache.cxf.service.model.BindingInfo;
import org.apache.cxf.service.model.BindingOperationInfo;
import org.apache.cxf.service.model.OperationInfo;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
+import org.apache.cxf.ws.addressing.JAXWSAConstants;
import org.apache.cxf.ws.addressing.RelatesToType;
+import org.apache.cxf.ws.addressing.VersionTransformer;
import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
import org.apache.cxf.ws.rm.manager.SourcePolicyType;
@@ -43,7 +52,7 @@
*/
public class Proxy {
- private static final Logger LOG = Logger.getLogger(Proxy.class.getName());
+ private static final Logger LOG = LogUtils.getL7dLogger(Proxy.class);
private RMEndpoint reliableEndpoint;
@@ -58,7 +67,17 @@
}
void acknowledge(DestinationSequence ds) throws IOException {
-
+ OperationInfo oi = reliableEndpoint.getService().getServiceInfo().getInterface()
+ .getOperation(RMConstants.getSequenceAckOperationName());
+ Map<String, Object> requestContext = new HashMap<String, Object>();
+ AddressingPropertiesImpl maps = new AddressingPropertiesImpl();
+ maps.setTo(VersionTransformer.convert(ds.getAcksTo()).getAddress());
+ maps.setReplyTo(reliableEndpoint.getTransportDestination().getAddress());
+ requestContext.put(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES, maps);
+ Map<String, Object> context = CastUtils.cast(
+ Collections.singletonMap(Client.REQUEST_CONTEXT, requestContext),
+ String.class, Object.class);
+ invoke(oi, new Object[] {}, context);
}
public CreateSequenceResponseType createSequence(org.apache.cxf.ws.addressing.EndpointReferenceType to,
@@ -98,20 +117,16 @@
OperationInfo oi = reliableEndpoint.getService().getServiceInfo().getInterface()
.getOperation(RMConstants.getCreateSequenceOperationName());
- Object result = invoke(oi, new Object[] {create});
- LOG.info("result: " + result);
- return (CreateSequenceResponseType)result;
-
+ return (CreateSequenceResponseType)invoke(oi, new Object[] {create}, null);
}
void lastMessage(SourceSequence s) throws IOException {
// TODO
}
- Object invoke(OperationInfo oi, Object[] params) {
+ Object invoke(OperationInfo oi, Object[] params, Map<String, Object> context) {
LOG.log(Level.INFO, "Invoking out-of-band RM protocol message {0}.",
oi == null ? null : oi.getName());
- LOG.log(Level.INFO, "params: " + params);
// assuming we are on the client side
@@ -120,11 +135,15 @@
Endpoint endpoint = reliableEndpoint.getEndpoint();
BindingInfo bi = reliableEndpoint.getBindingInfo();
-
- Client client = new RMClient(bus, endpoint);
+ if (null == reliableEndpoint.getConduit()) {
+ LOG.severe("No conduit to available.");
+ return null;
+ }
+ Client client = new RMClient(bus, endpoint, reliableEndpoint.getConduit());
+
BindingOperationInfo boi = bi.getOperation(oi);
try {
- Object[] result = client.invoke(boi, params, null);
+ Object[] result = client.invoke(boi, params, context);
if (result != null && result.length > 0) {
return result[0];
}
@@ -136,8 +155,8 @@
}
class RMClient extends ClientImpl {
- RMClient(Bus bus, Endpoint endpoint) {
- super(bus, endpoint);
+ RMClient(Bus bus, Endpoint endpoint, Conduit conduit) {
+ super(bus, endpoint, conduit);
}
@Override
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java Fri Nov 10 04:01:06 2006
@@ -93,6 +93,9 @@
private static final String WSRM_TERMINATE_SEQUENCE_ACTION =
WSRM_NAMESPACE_NAME + "/TerminateSequence";
+ private static final String WSRM_SEQUENCE_ACK_ACTION =
+ WSRM_NAMESPACE_NAME + "/SequenceAcknowledgement";
+
private static final String WSRM_LAST_MESSAGE_ACTION =
WSRM_NAMESPACE_NAME + "/LastMessage";
@@ -179,7 +182,7 @@
return WSRM_ACK_REQUESTED_QNAME;
}
- public static String getAnonympusAddress() {
+ public static String getAnonymousAddress() {
return WSA_ANONYMOUS_ADDRESS;
}
@@ -199,6 +202,10 @@
return WSRM_TERMINATE_SEQUENCE_QNAME;
}
+ public static QName getSequenceAckOperationName() {
+ return WSRM_SEQUENCE_ACK_QNAME;
+ }
+
public static String getCreateSequenceAction() {
return WSRM_CREATE_SEQUENCE_ACTION;
}
@@ -209,6 +216,10 @@
public static String getTerminateSequenceAction() {
return WSRM_TERMINATE_SEQUENCE_ACTION;
+ }
+
+ public static String getSequenceAckAction() {
+ return WSRM_SEQUENCE_ACK_ACTION;
}
public static String getLastMessageAction() {
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java Fri Nov 10 04:01:06 2006
@@ -73,8 +73,12 @@
* @return true iff message is currently being processed on server side
*/
public static boolean isServerSide(Message message) {
- // TODO
- return false;
+ if (isOutbound(message)) {
+ return message.getExchange().getInMessage() != null;
+ } else {
+ return message.getExchange().getOutMessage() == null
+ && message.getExchange().getFaultMessage() == null;
+ }
}
/**
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Fri Nov 10 04:01:06 2006
@@ -41,6 +41,7 @@
import org.apache.cxf.service.model.OperationInfo;
import org.apache.cxf.service.model.ServiceInfo;
import org.apache.cxf.service.model.UnwrappedOperationInfo;
+import org.apache.cxf.transport.Conduit;
import org.apache.cxf.ws.addressing.wsdl.UsingAddressing;
public class RMEndpoint {
@@ -56,6 +57,8 @@
private final RMManager manager;
private final Endpoint applicationEndpoint;
+ private Conduit conduit;
+ private org.apache.cxf.transport.Destination transportDestination;
private Source source;
private Destination destination;
private Service service;
@@ -156,7 +159,23 @@
this.source = source;
}
- void initialise() {
+ /**
+ * @return Returns the conduit.
+ */
+ public Conduit getConduit() {
+ return conduit;
+ }
+
+ /**
+ * @return Returns the transport destination.
+ */
+ public org.apache.cxf.transport.Destination getTransportDestination() {
+ return transportDestination;
+ }
+
+ void initialise(Conduit c, org.apache.cxf.transport.Destination d) {
+ conduit = c;
+ transportDestination = d;
createService();
createEndpoint();
}
@@ -223,7 +242,6 @@
partInfo.setElementQName(RMConstants.getCreateSequenceOperationName());
partInfo.setElement(true);
partInfo.setTypeClass(CreateSequenceType.class);
-
unwrappedMessageInfo = new MessageInfo(operationInfo, messageInfo.getName());
unwrappedOperationInfo = new UnwrappedOperationInfo(operationInfo);
operationInfo.setUnwrappedOperation(unwrappedOperationInfo);
@@ -239,7 +257,6 @@
partInfo.setElementQName(RMConstants.getCreateSequenceResponseOperationName());
partInfo.setElement(true);
partInfo.setTypeClass(CreateSequenceResponseType.class);
-
unwrappedMessageInfo = new MessageInfo(operationInfo, messageInfo.getName());
unwrappedOperationInfo.setOutput(operationInfo.getOutputName(), unwrappedMessageInfo);
partInfo = unwrappedMessageInfo.addMessagePart("createResponse");
@@ -247,20 +264,6 @@
partInfo.setElement(true);
partInfo.setTypeClass(CreateSequenceResponseType.class);
- /*
- oi = ii.addOperation(RMConstants.getCreateSequenceResponseOperationName());
- mi = oi.createMessage(RMConstants.getCreateSequenceResponseOperationName());
- oi.setInput(mi.getName().getLocalPart(), mi);
- pi = mi.addMessagePart("createResponse");
- pi.setElementQName(RMConstants.getCreateSequenceResponseOperationName());
- pi.setElement(true);
- pi.setTypeClass(CreateSequenceResponseType.class);
- unwrappedInput = new MessageInfo(oi, mi.getName());
- unwrapped = new UnwrappedOperationInfo(oi);
- oi.setUnwrappedOperation(unwrapped);
- unwrapped.setInput(oi.getInputName(), unwrappedInput);
- */
-
operationInfo = ii.addOperation(RMConstants.getTerminateSequenceOperationName());
messageInfo = operationInfo.createMessage(RMConstants.getTerminateSequenceOperationName());
operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
@@ -272,6 +275,14 @@
unwrappedOperationInfo = new UnwrappedOperationInfo(operationInfo);
operationInfo.setUnwrappedOperation(unwrappedOperationInfo);
unwrappedOperationInfo.setInput(operationInfo.getInputName(), unwrappedMessageInfo);
+
+ operationInfo = ii.addOperation(RMConstants.getSequenceAckOperationName());
+ messageInfo = operationInfo.createMessage(RMConstants.getSequenceAckOperationName());
+ operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
+ unwrappedMessageInfo = new MessageInfo(operationInfo, messageInfo.getName());
+ unwrappedOperationInfo = new UnwrappedOperationInfo(operationInfo);
+ operationInfo.setUnwrappedOperation(unwrappedOperationInfo);
+ unwrappedOperationInfo.setInput(operationInfo.getInputName(), unwrappedMessageInfo);
}
void buildBindingInfo(ServiceInfo si) {
@@ -290,19 +301,18 @@
boi.addExtensor(soi);
bi.addOperation(boi);
- /*
- boi = bi.buildOperation(RMConstants.getCreateSequenceResponseOperationName(),
- RMConstants.getCreateSequenceResponseOperationName().getLocalPart(), null);
+ boi = bi.buildOperation(RMConstants.getTerminateSequenceOperationName(),
+ RMConstants.getTerminateSequenceOperationName().getLocalPart(), null);
soi = new SoapOperationInfo();
- soi.setAction(RMConstants.getCreateSequenceResponseAction());
+ soi.setAction(RMConstants.getTerminateSequenceAction());
boi.addExtensor(soi);
bi.addOperation(boi);
- */
- boi = bi.buildOperation(RMConstants.getTerminateSequenceOperationName(),
- RMConstants.getTerminateSequenceOperationName().getLocalPart(), null);
+ boi = bi.buildOperation(RMConstants.getSequenceAckOperationName(),
+ null, null);
+ assert null != boi;
soi = new SoapOperationInfo();
- soi.setAction(RMConstants.getTerminateSequenceAction());
+ soi.setAction(RMConstants.getSequenceAckAction());
boi.addExtensor(soi);
bi.addOperation(boi);
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Fri Nov 10 04:01:06 2006
@@ -109,16 +109,19 @@
return;
} else if (RMConstants.getTerminateSequenceAction().equals(action)) {
// servant.terminateSequence(message);
+ } else if (RMConstants.getSequenceAckAction().equals(action)) {
+ processAcknowledgments(rmps);
+ return;
}
// for application AND out of band messages
-
+
Destination destination = getManager().getDestination(message);
if (null != rmps) {
-
- processAcknowledgments(rmps);
+ processAcknowledgments(rmps);
+
processAcknowledgmentRequests(rmps);
processSequence(destination, rmps, maps);
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Fri Nov 10 04:01:06 2006
@@ -31,6 +31,7 @@
import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Conduit;
import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
import org.apache.cxf.ws.addressing.RelatesToType;
@@ -96,7 +97,20 @@
RMEndpoint rme = reliableEndpoints.get(endpoint);
if (null == rme) {
rme = new RMEndpoint(this, endpoint);
- rme.initialise();
+ Conduit conduit = null;
+ org.apache.cxf.transport.Destination destination = null;
+ if (RMContextUtils.isServerSide(message)) {
+ AddressingPropertiesImpl maps = RMContextUtils.retrieveMAPs(message, false, false);
+ destination = message.getExchange().getDestination();
+ try {
+ conduit = destination.getBackChannel(message, null, maps.getReplyTo());
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ }
+ } else {
+ conduit = message.getExchange().getConduit();
+ }
+ rme.initialise(conduit, destination);
reliableEndpoints.put(endpoint, rme);
}
return rme;
@@ -142,10 +156,16 @@
relatesTo.setValue(inSeq != null ? inSeq.getCorrelationID() : null);
} else {
- acksTo = VersionTransformer.convert(maps.getReplyTo());
- // for oneways
- if (RMConstants.getNoneAddress().equals(acksTo.getAddress().getValue())) {
- acksTo = RMUtils.createAnonymousReference2004();
+ acksTo = VersionTransformer.convert(maps.getReplyTo());
+ if (!RMContextUtils.isServerSide(message)
+ && RMConstants.getNoneAddress().equals(acksTo.getAddress().getValue())) {
+ org.apache.cxf.transport.Destination dest = message.getExchange()
+ .getConduit().getBackChannel();
+ if (null == dest) {
+ acksTo = RMUtils.createAnonymousReference2004();
+ } else {
+ acksTo = VersionTransformer.convert(dest.getAddress());
+ }
}
}
@@ -184,8 +204,12 @@
}
if (!isSetSourcePolicy()) {
SourcePolicyType sp = factory.createSourcePolicyType();
- sp.setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
setSourcePolicy(sp);
+
+ }
+ if (!getSourcePolicy().isSetSequenceTerminationPolicy()) {
+ getSourcePolicy().setSequenceTerminationPolicy(
+ factory.createSequenceTerminationPolicyType());
}
if (!isSetDestinationPolicy()) {
DestinationPolicyType dp = factory.createDestinationPolicyType();
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Fri Nov 10 04:01:06 2006
@@ -130,7 +130,11 @@
AttributedURI to = VersionTransformer.convert(maps.getTo());
assert null != to;
addAcknowledgements(destination, rmpsOut, inSeqId, to);
- }
+ }
+
+ if (RMConstants.getSequenceAckAction().equals(action)) {
+ maps.setReplyTo(RMUtils.createNoneReference());
+ }
}
void addAcknowledgements(Destination destination,
@@ -150,8 +154,8 @@
LOG.fine("no need to add an acknowledgements for sequence "
+ seq.getIdentifier().getValue());
} else {
- LOG.fine("sequences acksTo (" + seq.getAcksTo().getAddress().getValue()
- + ") does not match to (" + to.getValue() + ")");
+ LOG.fine("sequences acksTo address (" + seq.getAcksTo().getAddress().getValue()
+ + ") does not match to address (" + to.getValue() + ")");
}
}
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java Fri Nov 10 04:01:06 2006
@@ -28,7 +28,6 @@
import javax.xml.datatype.Duration;
import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.jaxb.DatatypeFactory;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
@@ -44,7 +43,7 @@
*/
public class Servant implements Invoker {
- private static final Logger LOG = LogUtils.getL7dLogger(AbstractRMInterceptor.class);
+ private static final Logger LOG = LogUtils.getL7dLogger(Servant.class);
private RMEndpoint reliableEndpoint;
// REVISIT assumption there is only a single outstanding unattached Identifier
private Identifier unattachedIdentifier;
@@ -55,15 +54,6 @@
public Object invoke(Exchange exchange, Object o) {
LOG.fine("Invoking on RM Endpoint");
- List<Object> params = CastUtils.cast((List<?>)o);
- Object param = params.get(0);
- LOG.fine("param: " + param);
- if (param instanceof CreateSequenceType) {
- CreateSequenceType create = (CreateSequenceType)param;
- LOG.info("CreateSequenceType: " + create);
- LOG.info(" acksTo: " + create.getAcksTo());
- LOG.info(" offer: " + create.getOffer());
- }
OperationInfo oi = exchange.get(OperationInfo.class);
if (RMConstants.getCreateSequenceOperationName().equals(oi.getName())) {
try {
@@ -99,8 +89,6 @@
supportedDuration = DatatypeFactory.PT0S;
}
Expires ex = create.getExpires();
- LOG.fine("expires: " + ex);
- LOG.fine("acksTo: " + create.getAcksTo());
if (null != ex || supportedDuration.isShorterThan(DatatypeFactory.PT0S)) {
Duration effectiveDuration = supportedDuration;
@@ -113,7 +101,6 @@
}
OfferType offer = create.getOffer();
- LOG.fine("offer: " + offer);
if (null != offer) {
AcceptType accept = RMUtils.getWSRMFactory().createAcceptType();
if (dp.isAcceptOffers()) {
@@ -147,7 +134,6 @@
seq.setCorrelationID(maps.getMessageID().getValue());
destination.addSequence(seq);
- LOG.fine("Returning createResponse: " + createResponse);
return createResponse;
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java Fri Nov 10 04:01:06 2006
@@ -40,10 +40,16 @@
InterfaceInfo intf = si.getInterface();
- assertEquals(2, intf.getOperations().size());
+ assertEquals(3, intf.getOperations().size());
String ns = si.getName().getNamespaceURI();
OperationInfo oi = intf.getOperation(new QName(ns, "CreateSequence"));
+ assertNotNull("No operation info.", oi);
+
+ oi = intf.getOperation(new QName(ns, "TerminateSequence"));
+ assertNotNull("No operation info.", oi);
+
+ oi = intf.getOperation(new QName(ns, "SequenceAcknowledgement"));
assertNotNull("No operation info.", oi);
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Fri Nov 10 04:01:06 2006
@@ -30,6 +30,7 @@
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Conduit;
import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.RelatesToType;
@@ -180,6 +181,14 @@
Method m = RMManager.class.getDeclaredMethod("getSource", new Class[] {Message.class});
RMManager manager = control.createMock(RMManager.class, new Method[] {m});
Message message = control.createMock(Message.class);
+ Exchange exchange = control.createMock(Exchange.class);
+ EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
+ EasyMock.expect(exchange.getOutMessage()).andReturn(message).anyTimes();
+ EasyMock.expect(exchange.getInMessage()).andReturn(null).anyTimes();
+ EasyMock.expect(exchange.getFaultMessage()).andReturn(null).anyTimes();
+ Conduit conduit = control.createMock(Conduit.class);
+ EasyMock.expect(exchange.getConduit()).andReturn(conduit).anyTimes();
+ EasyMock.expect(conduit.getBackChannel()).andReturn(null).anyTimes();
Identifier inSid = control.createMock(Identifier.class);
AddressingProperties maps = control.createMock(AddressingProperties.class);
Source source = control.createMock(Source.class);
Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java Fri Nov 10 04:01:06 2006
@@ -51,6 +51,9 @@
greeterBus = bf.createBus(cfgResource);
bf.setDefaultBus(greeterBus);
LOG.info("Initialised bus with cfg file resource: " + cfgResource);
+ greeterBus.getOutInterceptors().add(new JaxwsInterceptorRemover());
+ greeterBus.getOutInterceptors().add(new OutMessageRecorder());
+ greeterBus.getInInterceptors().add(new InMessageRecorder());
GreeterImpl implementor = new GreeterImpl();
String address = "http://localhost:9020/SoapContext/GreeterPort";
Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java Fri Nov 10 04:01:06 2006
@@ -45,19 +45,36 @@
private List<Document> inboundMessages;
public MessageFlow(List<byte[]> out, List<byte[]> in) throws Exception {
+ inboundMessages = new ArrayList<Document>();
+ outboundMessages = new ArrayList<Document>();
+ reset(out, in);
+ }
+
+ public void clear() throws Exception {
+ inStreams.clear();
+ outStreams.clear();
+ }
+
+ public final void reset(List<byte[]> out, List<byte[]> in) throws Exception {
+ for (int i = 0; i < inboundMessages.size(); i++) {
+ in.remove(0);
+ }
inStreams = in;
+ for (int i = 0; i < outboundMessages.size(); i++) {
+ out.remove(0);
+ }
outStreams = out;
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
DocumentBuilder parser = factory.newDocumentBuilder();
- inboundMessages = new ArrayList<Document>();
+ inboundMessages.clear();
for (int i = 0; i < inStreams.size(); i++) {
byte[] bytes = inStreams.get(i);
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
Document document = parser.parse(is);
inboundMessages.add(document);
}
- outboundMessages = new ArrayList<Document>();
+ outboundMessages.clear();
for (int i = 0; i < outStreams.size(); i++) {
byte[] bytes = outStreams.get(i);
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=473319&r1=473318&r2=473319
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Fri Nov 10 04:01:06 2006
@@ -42,10 +42,11 @@
public class SequenceTest extends ClientServerTestBase {
private static final Logger LOG = Logger.getLogger(SequenceTest.class.getName());
- private static final String APP_NAMESPACE = "http://celtix.objectweb.org/greeter_control";
- private static final String GREETMEONEWAY_ACTION = APP_NAMESPACE + "/types/Greeter/greetMeOneWay";
- //private static final String GREETME_ACTION = APP_NAMESPACE + "/types/Greeter/greetMe";
- //private static final String GREETME_RESPONSE_ACTION = GREETME_ACTION + "Response";
+ // private static final String APP_NAMESPACE = "http://celtix.objectweb.org/greeter_control";
+ // private static final String GREETMEONEWAY_ACTION = APP_NAMESPACE + "/types/Greeter/greetMeOneWay";
+ // private static final String GREETME_ACTION = APP_NAMESPACE + "/types/Greeter/greetMe";
+ // private static final String GREETME_RESPONSE_ACTION = GREETME_ACTION + "Response";
+ private static final String GREETMEONEWAY_ACTION = null;
private Bus controlBus;
private Control control;
@@ -56,6 +57,7 @@
private boolean doTestOnewayAnonymousAcks = true;
private boolean doTestOnewayDeferredAnonymousAcks = true;
+ private boolean doTestOnewayDeferredNonAnonymousAcks = true;
public static void main(String[] args) {
junit.textui.TestRunner.run(SequenceTest.class);
@@ -128,7 +130,6 @@
mf.verifyMessages(4, true);
String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(), GREETMEONEWAY_ACTION,
GREETMEONEWAY_ACTION, GREETMEONEWAY_ACTION};
- expectedActions = new String[] {RMConstants.getCreateSequenceAction(), null, null, null};
mf.verifyActions(expectedActions, true);
mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
@@ -162,8 +163,8 @@
// three application messages plus createSequence
mf.verifyMessages(4, true);
- String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(), null,
- null, null};
+ String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(), GREETMEONEWAY_ACTION,
+ GREETMEONEWAY_ACTION, GREETMEONEWAY_ACTION};
mf.verifyActions(expectedActions, true);
mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
@@ -177,6 +178,57 @@
mf.verifyMessageNumbers(new String[] {null, null, null, null}, false);
mf.verifyAcknowledgements(new boolean[] {false, false, false, true}, false);
}
+
+ public void testOnewayDeferredNonAnonymousAcks() throws Exception {
+ if (!doTestOnewayDeferredNonAnonymousAcks) {
+ return;
+ }
+ setupGreeter("org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml");
+
+ greeter.greetMeOneWay("once");
+ greeter.greetMeOneWay("twice");
+
+ // CreateSequence plus two greetMeOneWay requests
+
+ MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+ mf.verifyMessages(3, true);
+ String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
+ GREETMEONEWAY_ACTION,
+ GREETMEONEWAY_ACTION};
+ mf.verifyActions(expectedActions, true);
+ mf.verifyMessageNumbers(new String[] {null, "1", "2"}, true);
+
+ // CreateSequenceResponse plus two partial responses, no
+ // acknowledgments included
+
+ Thread.sleep(2000);
+
+ mf.verifyMessages(4, false);
+ expectedActions = new String[] {null, RMConstants.getCreateSequenceResponseAction(),
+ null, null};
+ mf.verifyActions(expectedActions, false);
+ mf.verifyMessageNumbers(new String[4], false);
+ mf.verifyAcknowledgements(new boolean[4], false);
+
+ // mf.clear();
+
+ try {
+ Thread.sleep(3 * 1000);
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+
+ // a standalone acknowledgement should have been sent from the server
+ // side by now
+
+ mf.reset(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+ mf.verifyMessages(0, true);
+ mf.verifyMessages(1, false);
+
+ }
+
// --- test utilities ---
Added: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml?view=auto&rev=473319
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml (added)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml Fri Nov 10 04:01:06 2006
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:wsrm-mgmt="http://cxf.apache.org/ws/rm/manager"
+ xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xmlns:http-conf="http://cxf.apache.org/transports/http/configuration"
+ xsi:schemaLocation="
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <bean name="{http://cxf.apache.org/greeter_control}GreeterPort.http-conduit" abstract="true">
+ <property name="client">
+ <value>
+ <http-conf:client DecoupledEndpoint="http://localhost:9999/decoupled_endpoint"/>
+ </value>
+ </property>
+ </bean>
+
+ <bean id="org.apache.cxf.ws.rm.RMManager" class="org.apache.cxf.ws.rm.RMManager">
+ <property name="bus" ref="cxf"/>
+ <property name="destinationPolicy">
+ <value>
+ <wsrm-mgmt:destinationPolicy>
+ <wsrm-mgmt:acksPolicy intraMessageThreshold="0"/>
+ </wsrm-mgmt:destinationPolicy>
+ </value>
+ </property>
+ <property name="sourcePolicy">
+ <value>
+ <wsrm-mgmt:sourcePolicy acksTo="http://localhost:9999/decoupled_endpoint"/>
+ </value>
+ </property>
+
+ <property name="RMAssertion">
+ <value>
+ <wsrm-policy:RMAssertion>
+ <wsrm-policy:BaseRetransmissionInterval Milliseconds="10000"/>
+ <wsrm-policy:AcknowledgementInterval Milliseconds="2000"/>
+ </wsrm-policy:RMAssertion>
+ </value>
+ </property>
+ </bean>
+
+ <bean id="mapAggregator" class="org.apache.cxf.ws.addressing.MAPAggregator"/>
+ <bean id="mapCodec" class="org.apache.cxf.ws.addressing.soap.MAPCodec"/>
+ <bean id="rmLogicalOut" class="org.apache.cxf.ws.rm.RMOutInterceptor">
+ <property name="bus" ref="cxf"/>
+ </bean>
+ <bean id="rmLogicalIn" class="org.apache.cxf.ws.rm.RMInInterceptor">
+ <property name="bus" ref="cxf"/>
+ </bean>
+ <bean id="rmCodec" class="org.apache.cxf.ws.rm.soap.RMSoapInterceptor"/>
+
+ <!-- We are adding the interceptors to the bus as we will have only one endpoint/service/bus. -->
+
+ <bean id="cxf" class="org.apache.cxf.bus.spring.SpringBusImpl">
+ <property name="inInterceptors">
+ <list>
+ <ref bean="mapAggregator"/>
+ <ref bean="mapCodec"/>
+ <ref bean="rmLogicalIn"/>
+ <ref bean="rmCodec"/>
+ </list>
+ </property>
+ <property name="inFaultInterceptors">
+ <list>
+ <ref bean="mapAggregator"/>
+ <ref bean="mapCodec"/>
+ <ref bean="rmLogicalIn"/>
+ <ref bean="rmCodec"/>
+ </list>
+ </property>
+ <property name="outInterceptors">
+ <list>
+ <ref bean="mapAggregator"/>
+ <ref bean="mapCodec"/>
+ <ref bean="rmLogicalOut"/>
+ <ref bean="rmCodec"/>
+ </list>
+ </property>
+ <property name="outFaultInterceptors">
+ <list>
+ <ref bean="mapAggregator"/>
+ <ref bean="mapCodec"/>
+ <ref bean="rmLogicalOut"/>
+ <ref bean="rmCodec"/>
+ </list>
+ </property>
+ </bean>
+</beans>
\ No newline at end of file
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/nonanonymous-deferred.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml