You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by pa...@apache.org on 2009/04/03 13:35:12 UTC
svn commit: r761627 - in
/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2:
msgprocessors/ polling/ util/ workers/ wsrm/
Author: parsonsd
Date: Fri Apr 3 11:35:11 2009
New Revision: 761627
URL: http://svn.apache.org/viewvc?rev=761627&view=rev
Log:
Fixes made to improve compliance with spec's and interop with other implementations:
- AckRequest's now added as piggybacks to application msgs (this is needed as implementations don't have to piggyback acks and therefore could only respond to ackRequest msgs.)
- We can ignore piggybacked ack requests as Sandesha piggyback acks at every opportunity
- Offered EP's can't be set to none so have set to use AcksTo as the offered EP if one hasn't been set
- Updated so that we poll for terminateSeqResponse
- Updated so that we offer for 1.1 as well as 1.0. This fixes an interop issue with WCF.
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Fri Apr 3 11:35:11 2009
@@ -47,6 +47,7 @@
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.FaultManager;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.RMMsgCreator;
@@ -98,6 +99,15 @@
//checks weather the ack request was a piggybacked one.
boolean piggybackedAckRequest = !(rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.ACK_REQUEST);
+
+ //it is a piggybacked ackrequest so we can ignore as we will piggyback acks at every opportunity anyway
+ if(piggybackedAckRequest){
+ if (log.isDebugEnabled())
+ log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader, it is a piggybacked ackrequest for seq " +
+ "so we can ignore as we will piggyback an ack " + Boolean.FALSE);
+ //No need to suspend. Just proceed.
+ return false;
+ }
String sequenceId = ackRequested.getIdentifier().getIdentifier();
@@ -143,10 +153,7 @@
//creating the ack message. If the ackRequest was a standalone this will be a out (response) message
MessageContext ackMsgCtx = null;
-// if (piggybackedAckRequest)
- ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation);
-// else
-// ackMsgCtx =MessageContextBuilder.createOutMessageContext (msgContext);
+ ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation);
//setting up the RMMsgContext
RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
@@ -196,65 +203,14 @@
}
} else {
- //If AcksTo is non-anonymous we will be adding a senderBean entry here. The sender is responsible
- //for sending it out.
-
- SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
-
- String key = SandeshaUtil.getUUID();
-
- // dumping to the storage will be done be Sandesha2 Transport Sender
- // storageManager.storeMessageContext(key,ackMsgCtx);
-
- SenderBean ackBean = new SenderBean();
- ackBean.setMessageContextRefKey(key);
- ackBean.setMessageID(ackMsgCtx.getMessageID());
-
- //acks are sent only once.
- ackBean.setReSend(false);
-
- ackBean.setSequenceID(sequenceId);
-
- EndpointReference to = ackMsgCtx.getTo();
- if (to!=null)
- ackBean.setToAddress(to.getAddress());
-
- // this will be set to true in the sender.
- ackBean.setSend(true);
-
- ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
- ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
SandeshaPolicyBean propertyBean = SandeshaUtil.getPropertyBean(msgContext.getAxisOperation());
long ackInterval = propertyBean.getAcknowledgementInterval();
- // Ack will be sent as stand alone, only after the ackknowledgement interval
+ // Ack will be sent as stand alone, only after the acknowledgement interval
long timeToSend = System.currentTimeMillis() + ackInterval;
- // removing old acks.
- SenderBean findBean = new SenderBean();
- findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
- Collection<SenderBean> coll = senderBeanMgr.find(findBean);
- Iterator<SenderBean> it = coll.iterator();
-
- if (it.hasNext()) {
- SenderBean oldAckBean = (SenderBean) it.next();
- // If there is an old Ack. This Ack will be sent in the old timeToSend.
- timeToSend = oldAckBean.getTimeToSend();
- senderBeanMgr.delete(oldAckBean.getMessageID());
- }
-
- ackBean.setTimeToSend(timeToSend);
-
- msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
- // passing the message through sandesha2sender
- SandeshaUtil.executeAndStore(ackRMMsgCtx, key, storageManager);
-
- // inserting the new Ack.
- senderBeanMgr.insert(ackBean);
-
- msgContext.pause();
+ AcknowledgementManager.addAckBeanEntry(ackRMMsgCtx, sequenceId, timeToSend, storageManager);
}
if (log.isDebugEnabled())
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Fri Apr 3 11:35:11 2009
@@ -178,58 +178,66 @@
// offered seq id
String offeredSequenceID = offer.getIdentifer().getIdentifier();
+
+ //Need to see if this is a duplicate offer.
+ //If it is we can't accept the offer as we can't be sure it has come from the same client.
+ RMSBean finderBean = new RMSBean ();
+ finderBean.setSequenceID(offeredSequenceID);
+ RMSBean rMSBean = storageManager.getRMSBeanMgr().findUnique(finderBean);
+ boolean offerAccepted = false;
+ String outgoingSideInternalSequenceId = SandeshaUtil
+ .getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
+
+ if(rMSBean != null){
+ if (log.isDebugEnabled())
+ log.debug("Duplicate offer so we can't accept as we can't be sure it's from the same client: " + offeredSequenceID);
+ offerAccepted = false;
+ } else {
+ boolean isValidseqID = isValidseqID(offeredSequenceID, context, createSeqRMMsg, storageManager);
+ offerAccepted = true;
- boolean isValidseqID = isValidseqID(offeredSequenceID, context, createSeqRMMsg, storageManager);
- boolean offerAccepted = true;
-
- RMSBean rMSBean = null;
- //Before processing this offer any further we need to perform some extra checks
- //on the offered EP if WS-RM Spec 1.1 is being used
- if(isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){
- Endpoint endpoint = offer.getEndpoint();
- if (endpoint!=null) {
- //Check to see if the offer endpoint has a value of WSA Anonymous
- String addressingNamespace = (String) createSeqRMMsg.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
- String endpointAddress = endpoint.getEPR().getAddress();
- if(SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace).equals(endpointAddress)){
- //We will still accept this offer but we should warn the user that this MEP is not always reliable or efficient
- if (log.isDebugEnabled())
- log.debug("CSeq msg contains offer with an anonymous EPR");
- log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceMEPWarning, createSeqRMMsg.getMessageContext().getMessageID(),
- offeredSequenceID));
- }
-
- rMSBean = new RMSBean();
- //Set the offered EP
- rMSBean.setOfferedEndPoint(endpointAddress);
+ //Before processing this offer any further we need to perform some extra checks
+ //on the offered EP if WS-RM Spec 1.1 is being used
+ if(isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){
+ Endpoint endpoint = offer.getEndpoint();
+ if (endpoint!=null) {
+ //Check to see if the offer endpoint has a value of WSA Anonymous
+ String addressingNamespace = (String) createSeqRMMsg.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
+ String endpointAddress = endpoint.getEPR().getAddress();
+ if(SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace).equals(endpointAddress)){
+ //We will still accept this offer but we should warn the user that this MEP is not always reliable or efficient
+ if (log.isDebugEnabled())
+ log.debug("CSeq msg contains offer with an anonymous EPR");
+ log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceMEPWarning, createSeqRMMsg.getMessageContext().getMessageID(),
+ offeredSequenceID));
+ }
+ rMSBean = new RMSBean(); //Set the offered EP
+ rMSBean.setOfferedEndPoint(endpointAddress);
- } else {
- //Don't accept the offer
- if (log.isDebugEnabled())
- log.debug("Offer Refused as it included a null endpoint");
- offerAccepted = false;
+ } else {
+ //Don't accept the offer
+ if (log.isDebugEnabled())
+ log.debug("Offer Refused as it included a null endpoint");
+ offerAccepted = false;
+ }
+ } else if (isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){
+ rMSBean = new RMSBean();
}
- } else if (isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){
- rMSBean = new RMSBean();
- }
-
- String outgoingSideInternalSequenceId = SandeshaUtil
- .getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
-
- if(isValidseqID){
- // Setting the CreateSequence table entry for the outgoing
- // side.
- rMSBean.setSequenceID(offeredSequenceID);
- rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
- // this is a dummy value
- rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID());
-
- //Try inserting the new RMSBean
- if(!storageManager.getRMSBeanMgr().insert(rMSBean)){
- offerAccepted = false;
+ if(isValidseqID){
+ // Setting the CreateSequence table entry for the outgoing
+ // side.
+ rMSBean.setSequenceID(offeredSequenceID);
+ rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
+ // this is a dummy value
+ rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID());
+
+ //Try inserting the new RMSBean
+ if(!storageManager.getRMSBeanMgr().insert(rMSBean)){
+ offerAccepted = false;
+ }
}
}
-
+
if (offerAccepted) {
if(rmdBean.getToEndpointReference() != null){
rMSBean.setToEndpointReference(rmdBean.getToEndpointReference());
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Fri Apr 3 11:35:11 2009
@@ -199,6 +199,7 @@
if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
if(rmsBean.isPollingMode()) {
rMDBean.setPollingMode(true);
+ rMDBean.setReplyToEndpointReference(rmsBean.getReplyToEndpointReference());
}
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Fri Apr 3 11:35:11 2009
@@ -63,9 +63,10 @@
msgContext.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,rmsBean.getInternalSequenceID());
//shedulling a polling request for the response side.
- if (rmsBean.getOfferedSequence()!=null) {
+ String offeredSeq = rmsBean.getOfferedSequence();
+ if (offeredSeq!=null) {
RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
- RMDBean rMDBean = rMDBeanMgr.retrieve(sequenceId);
+ RMDBean rMDBean = rMDBeanMgr.retrieve(offeredSeq);
if (rMDBean!=null && rMDBean.isPollingMode()) {
PollingManager manager = storageManager.getPollingManager();
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java Fri Apr 3 11:35:11 2009
@@ -153,17 +153,31 @@
} else {
if (log.isDebugEnabled())
log.debug("Polling rms " + beanToPoll);
+
// The sequence is there, but we still only poll if we are expecting reply messages,
- // or if we don't have clean ack state. (We assume acks are clean, and only unset
+ // if we don't have clean ack state or we are waiting for a terminateSeqResponse. (We assume acks are clean, and only unset
// this if we find evidence to the contrary).
boolean cleanAcks = true;
- if (beanToPoll.getNextMessageNumber() > -1)
+ boolean waitingForTerminateSeqResponse = false;
+ long repliesExpected = 0;
+
+ if (!force && beanToPoll.getNextMessageNumber() > -1) {
cleanAcks = AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(), beanToPoll.getNextMessageNumber());
- long repliesExpected = beanToPoll.getExpectedReplies();
+ if(cleanAcks){
+ repliesExpected = beanToPoll.getExpectedReplies();
+ if(repliesExpected == 0){
+ //Need to check if we are waiting for a terminateSeqResponse
+ if(beanToPoll.isTerminated() == false && beanToPoll.isTerminateAdded() == true){
+ waitingForTerminateSeqResponse = true;
+ }
+ }
+ }
+ }
+
if(beanToPoll.getSequenceID() != null){
- if((force || !cleanAcks || repliesExpected > 0) && beanToPoll.getReferenceMessageStoreKey() != null){
+ if((force || !cleanAcks || repliesExpected > 0 || waitingForTerminateSeqResponse) && beanToPoll.getReferenceMessageStoreKey() != null){
pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
- }
+ }
} else {
//If seqID is null on RMS bean then it must be an RMSBean waiting for a createSeqResponse and we want to poll for these
pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
@@ -233,7 +247,7 @@
wireSeqId = rmBean.getSequenceID(); //this case could make us non-RSP compliant
}
- if(log.isDebugEnabled()) log.debug("Debug: PollingManager::pollForSequence, wireAddress=" + wireAddress + ", wireSeqId=" + wireSeqId);
+ if(log.isDebugEnabled()) log.debug("Debug: PollingManager::pollForSequence, wireAddress=" + wireAddress + " wireSeqId=" + wireSeqId);
MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,context);
if(referenceMessage!=null){
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java Fri Apr 3 11:35:11 2009
@@ -51,6 +51,7 @@
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.RMSequenceBean;
import org.apache.sandesha2.wsrm.Accept;
+import org.apache.sandesha2.wsrm.AckRequested;
import org.apache.sandesha2.wsrm.AcksTo;
import org.apache.sandesha2.wsrm.CloseSequence;
import org.apache.sandesha2.wsrm.CloseSequenceResponse;
@@ -141,33 +142,35 @@
// Check if this service includes 2-way operations
boolean twoWayService = false;
AxisService service = applicationMsgContext.getAxisService();
- if (service != null) {
- // if the user has specified this sequence as a one way sequence it should not
- // append the sequence offer.
- if (!JavaUtils.isTrue(applicationMsgContext.getOptions().getProperty(
- SandeshaClientConstants.ONE_WAY_SEQUENCE))) {
- Parameter p = service.getParameter(Sandesha2Constants.SERVICE_CONTAINS_OUT_IN_MEPS);
- if (p != null && p.getValue() != null) {
- twoWayService = ((Boolean) p.getValue()).booleanValue();
- if (log.isDebugEnabled()) log.debug("RMMsgCreator:: twoWayService " + twoWayService);
- }
- }
- }
+ if (service != null) {
+ // if the user has specified this sequence as a one way sequence it should not
+ // append the sequence offer.
+ if (!JavaUtils.isTrue(applicationMsgContext.getOptions().getProperty(
+ SandeshaClientConstants.ONE_WAY_SEQUENCE))) {
+ Parameter p = service.getParameter(Sandesha2Constants.SERVICE_CONTAINS_OUT_IN_MEPS);
+ if (p != null && p.getValue() != null) {
+ twoWayService = ((Boolean) p.getValue()).booleanValue();
+ if (log.isDebugEnabled()) log.debug("RMMsgCreator:: twoWayService " + twoWayService);
+ }
+ }
+ }
// Adding sequence offer - if present. We send an offer if the client has assigned an
- // id, or if we are using WS-RM 1.0 and the service contains out-in MEPs
- boolean autoOffer = false;
- if(Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmNamespaceValue)) {
- autoOffer = twoWayService;
- //There may not have been a way to confirm if an OUT_IN MEP is being used.
- //Therefore doing an extra check to see what Axis is using. If it's OUT_IN then we must offer.
+ // id, or if the service contains out-in MEPs
+ boolean autoOffer = twoWayService;
+
+ //There may not have been a way to confirm if an OUT_IN MEP is being used.
+ //Therefore doing an extra check to see what Axis is using. If it's OUT_IN then we must offer.
+ if(applicationMsgContext.getOperationContext() != null && applicationMsgContext.getOperationContext().getAxisOperation() != null){
if(applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant() == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_IN
- || applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant() == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_OPTIONAL_IN){
+ || applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant() == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_OPTIONAL_IN){
autoOffer = true;
}
- } else {
- // We also do some checking at this point to see if MakeConection is required to
- // enable WS-RM 1.1, and write a warning to the log if it has been disabled.
+ }
+
+ // We also do some checking at this point to see if MakeConection is required to
+ // enable WS-RM 1.1, and write a warning to the log if it has been disabled.
+ if(Sandesha2Constants.SPEC_2007_02.NS_URI.equals(rmNamespaceValue)) {
SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(context.getAxisConfiguration());
if(twoWayService && !policy.isEnableMakeConnection()) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.makeConnectionWarning);
@@ -187,26 +190,20 @@
Identifier identifier = new Identifier(rmNamespaceValue);
identifier.setIndentifer(offeredSequenceId);
offerPart.setIdentifier(identifier);
- createSequencePart.setSequenceOffer(offerPart);
if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(rmNamespaceValue)) {
// We are going to send an offer, so decide which endpoint to include
EndpointReference offeredEndpoint = (EndpointReference) applicationMsgContext.getProperty(SandeshaClientConstants.OFFERED_ENDPOINT);
+ //If the offeredEndpoint hasn't been set then use the acksTo of the RMSBean
if (offeredEndpoint==null) {
- EndpointReference replyTo = applicationMsgContext.getReplyTo(); //using replyTo as the Endpoint if it is not specified
-
- if (replyTo!=null) {
- offeredEndpoint = SandeshaUtil.cloneEPR(replyTo);
- }
- }
- // Finally fall back to using an anonymous endpoint
- if (offeredEndpoint==null) {
- //The replyTo has already been set to a MC anon with UUID and so will use that same one for the offered endpoint
- offeredEndpoint = rmsBean.getReplyToEndpointReference();
+ offeredEndpoint = rmsBean.getAcksToEndpointReference();
}
+
Endpoint endpoint = new Endpoint (offeredEndpoint, rmNamespaceValue, addressingNamespace);
offerPart.setEndpoint(endpoint);
}
+
+ createSequencePart.setSequenceOffer(offerPart);
}
EndpointReference toEPR = rmsBean.getToEndpointReference();
@@ -549,6 +546,45 @@
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Exit: RMMsgCreator::addAckMessage " + applicationMsg);
}
+
+ /**
+ * Adds an Ack Request for a specific sequence to the given application message.
+ *
+ * @param applicationMsg The Message to which the AckRequest will be added
+ * @param sequenceId - The sequence which we will request the ack for
+ * @throws SandeshaException
+ */
+ public static void addAckRequest(RMMsgContext applicationMsg, String sequenceId, RMSBean rmsBean)
+ throws SandeshaException {
+ if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Entry: RMMsgCreator::addAckRequest " + sequenceId);
+
+ String rmVersion = rmsBean.getRMVersion();
+ String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
+
+ AckRequested ackRequest = new AckRequested(rmNamespaceValue);
+
+ Identifier id = new Identifier(rmNamespaceValue);
+ id.setIndentifer(sequenceId);
+ ackRequest.setIdentifier(id);
+ ackRequest.setMustUnderstand(true);
+ applicationMsg.addAckRequested(ackRequest);
+
+ if (applicationMsg.getWSAAction()==null) {
+ applicationMsg.setAction(SpecSpecificConstants.getAckRequestAction(rmVersion));
+ applicationMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction(rmVersion));
+ }
+
+ if(applicationMsg.getMessageId() == null) {
+ applicationMsg.setMessageId(SandeshaUtil.getUUID());
+ }
+
+ // Ensure the message also contains the token that needs to be used
+ secureOutboundMessage(rmsBean, applicationMsg.getMessageContext());
+
+ if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Exit: RMMsgCreator::addAckRequest " + applicationMsg);
+ }
public static RMMsgContext createMakeConnectionMessage (RMMsgContext referenceRMMessage,
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Fri Apr 3 11:35:11 2009
@@ -232,6 +232,17 @@
transaction = storageManager.getTransaction();
+ //If this is an application msg we need to add an ackRequest to the header
+ if(messageType == Sandesha2Constants.MessageTypes.APPLICATION){
+ //Add an ackRequest
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, senderBean.getSequenceID());
+ RMMsgCreator.addAckRequest(rmMsgCtx, senderBean.getSequenceID(), rmsBean);
+ if (transaction != null && transaction.isActive())
+ transaction.commit();
+
+ transaction = storageManager.getTransaction();
+ }
+
//if this is a sync RM exchange protocol we always have to add an ack
boolean ackPresent = false;
Iterator it = rmMsgCtx.getSequenceAcknowledgements();
@@ -628,21 +639,22 @@
int responseMessageType = responseRMMessage.getMessageType();
if(log.isDebugEnabled()) log.debug("inboundMsgType" + responseMessageType + "outgoing message type " + messageType);
- //if this is an application response msg in response to a make connection then we have to take care with the service context
- if ((messageType == Sandesha2Constants.MessageTypes.APPLICATION && responseMessageType == Sandesha2Constants.MessageTypes.APPLICATION)
- || responseMessageType != Sandesha2Constants.MessageTypes.APPLICATION) {
- if(log.isDebugEnabled()) log.debug("setting service ctx on msg as this is NOT a makeConnection>appResponse exchange pattern");
- responseMessageContext.setServiceContext(msgCtx.getServiceContext());
- }
- else{
- //Setting the AxisService object
+ //if this is an application response or createSeqResponse msg in response to a make connection then we have to take care with the service context
+ if(messageType == Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG
+ && (responseMessageType == Sandesha2Constants.MessageTypes.APPLICATION
+ || responseMessageType == Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE)){
+
+ //Setting the AxisService object
responseMessageContext.setAxisService(msgCtx.getAxisService());
- //we cannot set service ctx for application response msgs since the srvc ctx will not match the op ctx, causing
+ //we cannot set service ctx for application response msgs or createSeqResponse msgs since the srvc ctx will not match the op ctx, causing
//problems with addressing
if(log.isDebugEnabled()) log.debug("NOT setting service ctx for response type " + messageType + ", current srvc ctx =" + responseMessageContext.getServiceContext());
+ }else {
+ if(log.isDebugEnabled()) log.debug("setting service ctx on msg as this is NOT a makeConnection>appResponse or makeConnection>createSeqResponse exchange pattern");
+ responseMessageContext.setServiceContext(msgCtx.getServiceContext());
}
-
+
//If addressing is disabled we will be adding this message simply as the application response of the request message.
Boolean addressingDisabled = (Boolean) msgCtx.getProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES);
if (addressingDisabled!=null && Boolean.TRUE.equals(addressingDisabled)) {
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java Fri Apr 3 11:35:11 2009
@@ -19,6 +19,8 @@
package org.apache.sandesha2.wsrm;
+import java.util.Iterator;
+
import javax.xml.namespace.QName;
import org.apache.axiom.om.OMElement;
@@ -72,6 +74,15 @@
throw new SandeshaException (message);
}
+ // Sniff the addressing namespace from the Address child of the endpointElement
+ Iterator children = endpointElement.getChildElements();
+ while(children.hasNext() && addressingNamespaceValue == null) {
+ OMElement child = (OMElement) children.next();
+ if("Address".equals(child.getLocalName())) {
+ addressingNamespaceValue = child.getNamespace().getNamespaceURI();
+ }
+ }
+
return this;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org