You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by pa...@apache.org on 2009/04/03 13:35:12 UTC

svn commit: r761627 - in /webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ polling/ util/ workers/ wsrm/

Author: parsonsd
Date: Fri Apr  3 11:35:11 2009
New Revision: 761627

URL: http://svn.apache.org/viewvc?rev=761627&view=rev
Log:
Fixes made to improve compliance with the specs and interop with other implementations:
- AckRequests are now added as piggybacks to application msgs (this is needed as implementations don't have to piggyback acks and therefore might only respond to ackRequest msgs).
- We can ignore piggybacked ack requests as Sandesha piggybacks acks at every opportunity
- Offered EPs can't be set to none, so the AcksTo is now used as the offered EP if one hasn't been set
- Updated so that we poll for terminateSeqResponse
- Updated so that we offer for 1.1 as well as 1.0.  This fixes an interop issue with WCF.

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Fri Apr  3 11:35:11 2009
@@ -47,6 +47,7 @@
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.RMMsgCreator;
@@ -98,6 +99,15 @@
 
 		//checks weather the ack request was a piggybacked one.
 		boolean piggybackedAckRequest = !(rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.ACK_REQUEST);
+
+		//it is a piggybacked ackrequest so we can ignore as we will piggyback acks at every opportunity anyway
+		if(piggybackedAckRequest){
+			if (log.isDebugEnabled())
+			log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader, it is a piggybacked ackrequest for seq " +
+					"so we can ignore as we will piggyback an ack " + Boolean.FALSE);
+			//No need to suspend. Just proceed.
+			return false;
+		}
 		
 		String sequenceId = ackRequested.getIdentifier().getIdentifier();
 
@@ -143,10 +153,7 @@
 		
 		//creating the ack message. If the ackRequest was a standalone this will be a out (response) message 
 		MessageContext ackMsgCtx = null;
-//		if (piggybackedAckRequest)
-			ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation);
-//		else
-//			ackMsgCtx =MessageContextBuilder.createOutMessageContext (msgContext);
+		ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation);
 			
 		//setting up the RMMsgContext
 		RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
@@ -196,65 +203,14 @@
 			}
 
 		} else {
-			//If AcksTo is non-anonymous we will be adding a senderBean entry here. The sender is responsible 
-			//for sending it out.
-			
-			SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
-
-			String key = SandeshaUtil.getUUID();
-
-			// dumping to the storage will be done be Sandesha2 Transport Sender
-			// storageManager.storeMessageContext(key,ackMsgCtx);
-
-			SenderBean ackBean = new SenderBean();
-			ackBean.setMessageContextRefKey(key);
-			ackBean.setMessageID(ackMsgCtx.getMessageID());
-			
-			//acks are sent only once.
-			ackBean.setReSend(false);
-			
-			ackBean.setSequenceID(sequenceId);
-			
-			EndpointReference to = ackMsgCtx.getTo();
-			if (to!=null)
-				ackBean.setToAddress(to.getAddress());
-
-			// this will be set to true in the sender.
-			ackBean.setSend(true);
-
-			ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-			ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
 			SandeshaPolicyBean propertyBean = SandeshaUtil.getPropertyBean(msgContext.getAxisOperation());
 
 			long ackInterval = propertyBean.getAcknowledgementInterval();
 
-			// Ack will be sent as stand alone, only after the ackknowledgement interval
+			// Ack will be sent as stand alone, only after the acknowledgement interval
 			long timeToSend = System.currentTimeMillis() + ackInterval;
 
-			// removing old acks.
-			SenderBean findBean = new SenderBean();
-			findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-			Collection<SenderBean> coll = senderBeanMgr.find(findBean);
-			Iterator<SenderBean> it = coll.iterator();
-
-			if (it.hasNext()) {
-				SenderBean oldAckBean = (SenderBean) it.next();
-				// If there is an old Ack. This Ack will be sent in the old timeToSend.
-				timeToSend = oldAckBean.getTimeToSend(); 
-				senderBeanMgr.delete(oldAckBean.getMessageID());
-			}
-
-			ackBean.setTimeToSend(timeToSend);
-
-			msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-			
-			// passing the message through sandesha2sender
-		    SandeshaUtil.executeAndStore(ackRMMsgCtx, key, storageManager);
-
-			// inserting the new Ack.
-			senderBeanMgr.insert(ackBean);
-
-			msgContext.pause();			
+			AcknowledgementManager.addAckBeanEntry(ackRMMsgCtx, sequenceId, timeToSend, storageManager);
 		}
 		
 		if (log.isDebugEnabled())

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Fri Apr  3 11:35:11 2009
@@ -178,58 +178,66 @@
 	
 				// offered seq id
 				String offeredSequenceID = offer.getIdentifer().getIdentifier(); 
+
+				//Need to see if this is a duplicate offer.
+				//If it is we can't accept the offer as we can't be sure it has come from the same client.
+				RMSBean finderBean = new RMSBean ();
+				finderBean.setSequenceID(offeredSequenceID);
+				RMSBean rMSBean = storageManager.getRMSBeanMgr().findUnique(finderBean);
+				boolean offerAccepted = false;
+				String outgoingSideInternalSequenceId = SandeshaUtil
+					.getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
+
+				if(rMSBean != null){
+					if (log.isDebugEnabled())
+						log.debug("Duplicate offer so we can't accept as we can't be sure it's from the same client: " + offeredSequenceID);
+					offerAccepted = false;
+				} else {
+					boolean isValidseqID = isValidseqID(offeredSequenceID, context, createSeqRMMsg, storageManager);
+					offerAccepted = true;
 				
-				boolean isValidseqID = isValidseqID(offeredSequenceID, context, createSeqRMMsg, storageManager);
-				boolean offerAccepted = true;
-				
-				RMSBean rMSBean = null;
-				//Before processing this offer any further we need to perform some extra checks 
-				//on the offered EP if WS-RM Spec 1.1 is being used
-				if(isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){
-					Endpoint endpoint = offer.getEndpoint();
-					if (endpoint!=null) {
-						//Check to see if the offer endpoint has a value of WSA Anonymous
-						String addressingNamespace = (String) createSeqRMMsg.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
-						String endpointAddress = endpoint.getEPR().getAddress();
-						if(SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace).equals(endpointAddress)){
-							//We will still accept this offer but we should warn the user that this MEP is not always reliable or efficient
-							if (log.isDebugEnabled())
-								log.debug("CSeq msg contains offer with an anonymous EPR");	
-							log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceMEPWarning, createSeqRMMsg.getMessageContext().getMessageID(), 
-									offeredSequenceID));
-						} 
-						
-						rMSBean = new RMSBean();
-						//Set the offered EP
-						rMSBean.setOfferedEndPoint(endpointAddress);
+					//Before processing this offer any further we need to perform some extra checks 
+					//on the offered EP if WS-RM Spec 1.1 is being used
+					if(isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){
+						Endpoint endpoint = offer.getEndpoint();
+						if (endpoint!=null) {
+							//Check to see if the offer endpoint has a value of WSA Anonymous
+							String addressingNamespace = (String) createSeqRMMsg.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
+							String endpointAddress = endpoint.getEPR().getAddress();
+							if(SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace).equals(endpointAddress)){
+								//We will still accept this offer but we should warn the user that this MEP is not always reliable or efficient
+								if (log.isDebugEnabled())
+									log.debug("CSeq msg contains offer with an anonymous EPR");	
+								log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceMEPWarning, createSeqRMMsg.getMessageContext().getMessageID(), 
+										offeredSequenceID));
+							}
+							rMSBean = new RMSBean();							//Set the offered EP
+							rMSBean.setOfferedEndPoint(endpointAddress);
 						
-					} else {
-						//Don't accept the offer
-						if (log.isDebugEnabled())
-							log.debug("Offer Refused as it included a null endpoint");	
-						offerAccepted = false;
+						} else {
+							//Don't accept the offer
+							if (log.isDebugEnabled())
+								log.debug("Offer Refused as it included a null endpoint");	
+							offerAccepted = false;
+						}
+					} else if (isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){
+						rMSBean = new RMSBean(); 
 					}
-				} else if (isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){
-					rMSBean = new RMSBean(); 
-				}
-				
-				String outgoingSideInternalSequenceId = SandeshaUtil
-				.getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
-				
-				if(isValidseqID){
-					// Setting the CreateSequence table entry for the outgoing
-					// side.
-					rMSBean.setSequenceID(offeredSequenceID);	
-					rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
-					// this is a dummy value
-					rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID()); 
-					
-					//Try inserting the new RMSBean
-					if(!storageManager.getRMSBeanMgr().insert(rMSBean)){
-						offerAccepted = false;
+					if(isValidseqID){
+						// Setting the CreateSequence table entry for the outgoing
+						// side.
+						rMSBean.setSequenceID(offeredSequenceID);	
+						rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
+						// this is a dummy value
+						rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID()); 
+					
+						//Try inserting the new RMSBean
+						if(!storageManager.getRMSBeanMgr().insert(rMSBean)){
+							offerAccepted = false;
+						}
 					}
 				}
-				
+
 				if (offerAccepted) {
 					if(rmdBean.getToEndpointReference() != null){
 						rMSBean.setToEndpointReference(rmdBean.getToEndpointReference());

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Fri Apr  3 11:35:11 2009
@@ -199,6 +199,7 @@
 				if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
 					if(rmsBean.isPollingMode()) {
 						rMDBean.setPollingMode(true);
+						rMDBean.setReplyToEndpointReference(rmsBean.getReplyToEndpointReference());
 					}
 				}
 				

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Fri Apr  3 11:35:11 2009
@@ -63,9 +63,10 @@
 		msgContext.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,rmsBean.getInternalSequenceID());
 
 		//shedulling a polling request for the response side.
-		if (rmsBean.getOfferedSequence()!=null) {
+		String offeredSeq = rmsBean.getOfferedSequence();
+		if (offeredSeq!=null) {
 			RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
-			RMDBean rMDBean = rMDBeanMgr.retrieve(sequenceId);
+			RMDBean rMDBean = rMDBeanMgr.retrieve(offeredSeq);
 			
 			if (rMDBean!=null && rMDBean.isPollingMode()) {
 				PollingManager manager = storageManager.getPollingManager();

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java Fri Apr  3 11:35:11 2009
@@ -153,17 +153,31 @@
 		} else {
 			if (log.isDebugEnabled())
 				log.debug("Polling rms " + beanToPoll);
+			
 			// The sequence is there, but we still only poll if we are expecting reply messages,
-			// or if we don't have clean ack state. (We assume acks are clean, and only unset
+			// if we don't have clean ack state or we are waiting for a terminateSeqResponse. (We assume acks are clean, and only unset
 			// this if we find evidence to the contrary).
 			boolean cleanAcks = true;
-			if (beanToPoll.getNextMessageNumber() > -1)
+			boolean waitingForTerminateSeqResponse = false;
+			long repliesExpected = 0;
+			
+			if (!force && beanToPoll.getNextMessageNumber() > -1) {
 				cleanAcks = AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(), beanToPoll.getNextMessageNumber());
-			long  repliesExpected = beanToPoll.getExpectedReplies();
+				if(cleanAcks){
+					repliesExpected = beanToPoll.getExpectedReplies();
+					if(repliesExpected == 0){
+						//Need to check if we are waiting for a terminateSeqResponse
+						if(beanToPoll.isTerminated() == false && beanToPoll.isTerminateAdded() == true){
+							waitingForTerminateSeqResponse = true;
+						}
+					}
+				}
+			}
+
 			if(beanToPoll.getSequenceID() != null){
-				if((force || !cleanAcks || repliesExpected > 0) && beanToPoll.getReferenceMessageStoreKey() != null){
+				if((force || !cleanAcks || repliesExpected > 0 || waitingForTerminateSeqResponse) && beanToPoll.getReferenceMessageStoreKey() != null){
 					pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
-				}
+				} 
 			} else {
 				//If seqID is null on RMS bean then it must be an RMSBean waiting for a createSeqResponse and we want to poll for these
 		        pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
@@ -233,7 +247,7 @@
 			wireSeqId = rmBean.getSequenceID(); //this case could make us non-RSP compliant
 		}
 		
-		if(log.isDebugEnabled()) log.debug("Debug: PollingManager::pollForSequence, wireAddress=" + wireAddress + ", wireSeqId=" + wireSeqId);
+		if(log.isDebugEnabled()) log.debug("Debug: PollingManager::pollForSequence, wireAddress=" + wireAddress + " wireSeqId=" + wireSeqId);
 		
 		MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,context);
 		if(referenceMessage!=null){

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java Fri Apr  3 11:35:11 2009
@@ -51,6 +51,7 @@
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.RMSequenceBean;
 import org.apache.sandesha2.wsrm.Accept;
+import org.apache.sandesha2.wsrm.AckRequested;
 import org.apache.sandesha2.wsrm.AcksTo;
 import org.apache.sandesha2.wsrm.CloseSequence;
 import org.apache.sandesha2.wsrm.CloseSequenceResponse;
@@ -141,33 +142,35 @@
 		// Check if this service includes 2-way operations
 		boolean twoWayService = false;
 		AxisService service = applicationMsgContext.getAxisService();
-        if (service != null) {
-            // if the user has specified this sequence as a one way sequence it should not
-            // append the sequence offer.
-            if (!JavaUtils.isTrue(applicationMsgContext.getOptions().getProperty(
-                    SandeshaClientConstants.ONE_WAY_SEQUENCE))) {
-                Parameter p = service.getParameter(Sandesha2Constants.SERVICE_CONTAINS_OUT_IN_MEPS);
-                if (p != null && p.getValue() != null) {
-                    twoWayService = ((Boolean) p.getValue()).booleanValue();
-                    if (log.isDebugEnabled()) log.debug("RMMsgCreator:: twoWayService " + twoWayService);
-                }
-            }
-        }
+		if (service != null) {
+			// if the user has specified this sequence as a one way sequence it should not
+			// append the sequence offer.
+			if (!JavaUtils.isTrue(applicationMsgContext.getOptions().getProperty(
+				SandeshaClientConstants.ONE_WAY_SEQUENCE))) {
+				Parameter p = service.getParameter(Sandesha2Constants.SERVICE_CONTAINS_OUT_IN_MEPS);
+				if (p != null && p.getValue() != null) {
+					twoWayService = ((Boolean) p.getValue()).booleanValue();
+					if (log.isDebugEnabled()) log.debug("RMMsgCreator:: twoWayService " + twoWayService);
+				}
+			}
+		}
 
 		// Adding sequence offer - if present. We send an offer if the client has assigned an
-		// id, or if we are using WS-RM 1.0 and the service contains out-in MEPs
-		boolean autoOffer = false;
-		if(Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmNamespaceValue)) {
-			autoOffer = twoWayService;
-                        //There may not have been a way to confirm if an OUT_IN MEP is being used.
-			//Therefore doing an extra check to see what Axis is using.  If it's OUT_IN then we must offer.
+		// id, or if the service contains out-in MEPs
+		boolean autoOffer = twoWayService;
+
+		//There may not have been a way to confirm if an OUT_IN MEP is being used.
+		//Therefore doing an extra check to see what Axis is using.  If it's OUT_IN then we must offer.
+		if(applicationMsgContext.getOperationContext() != null && applicationMsgContext.getOperationContext().getAxisOperation() != null){
 			if(applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant() == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_IN
-			     || applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant() == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_OPTIONAL_IN){
+				|| applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant() == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_OPTIONAL_IN){
 				autoOffer = true;
 			}
-		} else {
-			// We also do some checking at this point to see if MakeConection is required to
-			// enable WS-RM 1.1, and write a warning to the log if it has been disabled.
+		}
+
+		// We also do some checking at this point to see if MakeConection is required to
+		// enable WS-RM 1.1, and write a warning to the log if it has been disabled.
+		if(Sandesha2Constants.SPEC_2007_02.NS_URI.equals(rmNamespaceValue)) {
 			SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(context.getAxisConfiguration());
 			if(twoWayService && !policy.isEnableMakeConnection()) {
 				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.makeConnectionWarning);
@@ -187,26 +190,20 @@
 			Identifier identifier = new Identifier(rmNamespaceValue);
 			identifier.setIndentifer(offeredSequenceId);
 			offerPart.setIdentifier(identifier);
-			createSequencePart.setSequenceOffer(offerPart);
 			
 			if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(rmNamespaceValue)) {
 				// We are going to send an offer, so decide which endpoint to include
 				EndpointReference offeredEndpoint = (EndpointReference) applicationMsgContext.getProperty(SandeshaClientConstants.OFFERED_ENDPOINT);
+				//If the offeredEndpoint hasn't been set then use the acksTo of the RMSBean
 				if (offeredEndpoint==null) {
-					EndpointReference replyTo = applicationMsgContext.getReplyTo();  //using replyTo as the Endpoint if it is not specified
-				
-					if (replyTo!=null) {
-						offeredEndpoint = SandeshaUtil.cloneEPR(replyTo);
-					}
-				}
-				// Finally fall back to using an anonymous endpoint
-				if (offeredEndpoint==null) {
-					//The replyTo has already been set to a MC anon with UUID and so will use that same one for the offered endpoint  
-					offeredEndpoint = rmsBean.getReplyToEndpointReference();
+					offeredEndpoint = rmsBean.getAcksToEndpointReference();
 				}
+				
 				Endpoint endpoint = new Endpoint (offeredEndpoint, rmNamespaceValue, addressingNamespace);
 				offerPart.setEndpoint(endpoint);
 			}
+			
+			createSequencePart.setSequenceOffer(offerPart);
 		}
 
 		EndpointReference toEPR = rmsBean.getToEndpointReference();
@@ -549,6 +546,45 @@
 		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
 			log.debug("Exit: RMMsgCreator::addAckMessage " + applicationMsg);
 	}
+
+	/**
+	 * Adds an Ack Request for a specific sequence to the given application message.
+	 * 
+	 * @param applicationMsg The Message to which the AckRequest will be added
+	 * @param sequenceId - The sequence which we will request the ack for
+	 * @throws SandeshaException
+	 */
+	public static void addAckRequest(RMMsgContext applicationMsg, String sequenceId, RMSBean rmsBean)
+			throws SandeshaException {
+		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Entry: RMMsgCreator::addAckRequest " + sequenceId);
+		
+		String rmVersion = rmsBean.getRMVersion();
+		String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
+		
+		AckRequested ackRequest = new AckRequested(rmNamespaceValue);	
+		
+		Identifier id = new Identifier(rmNamespaceValue);
+		id.setIndentifer(sequenceId);
+		ackRequest.setIdentifier(id);
+		ackRequest.setMustUnderstand(true);
+		applicationMsg.addAckRequested(ackRequest);
+
+		if (applicationMsg.getWSAAction()==null) {
+			applicationMsg.setAction(SpecSpecificConstants.getAckRequestAction(rmVersion));
+			applicationMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction(rmVersion));
+		}
+		
+		if(applicationMsg.getMessageId() == null) {
+			applicationMsg.setMessageId(SandeshaUtil.getUUID());
+		}
+				
+		// Ensure the message also contains the token that needs to be used
+		secureOutboundMessage(rmsBean, applicationMsg.getMessageContext());
+			
+		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
+			log.debug("Exit: RMMsgCreator::addAckRequest " + applicationMsg);
+	}
 	
 	
 	public static RMMsgContext createMakeConnectionMessage (RMMsgContext referenceRMMessage,

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Fri Apr  3 11:35:11 2009
@@ -232,6 +232,17 @@
 			
 			transaction = storageManager.getTransaction();
 
+			//If this is an application msg we need to add an ackRequest to the header		
+			if(messageType == Sandesha2Constants.MessageTypes.APPLICATION){				
+				//Add an ackRequest				
+				RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, senderBean.getSequenceID());				
+				RMMsgCreator.addAckRequest(rmMsgCtx, senderBean.getSequenceID(), rmsBean);								
+				if (transaction != null && transaction.isActive()) 					
+					transaction.commit();
+				
+				transaction = storageManager.getTransaction();			
+			}						
+			
 			//if this is a sync RM exchange protocol we always have to add an ack
 			boolean ackPresent = false;
 			Iterator it = rmMsgCtx.getSequenceAcknowledgements();
@@ -628,21 +639,22 @@
 				int responseMessageType = responseRMMessage.getMessageType();
 				if(log.isDebugEnabled()) log.debug("inboundMsgType" + responseMessageType + "outgoing message type " + messageType);
 				 				
-				//if this is an application response msg in response to a make connection then we have to take care with the service context
-				if ((messageType == Sandesha2Constants.MessageTypes.APPLICATION && responseMessageType == Sandesha2Constants.MessageTypes.APPLICATION)
-					|| responseMessageType != Sandesha2Constants.MessageTypes.APPLICATION) {
-					if(log.isDebugEnabled()) log.debug("setting service ctx on msg as this is NOT a makeConnection>appResponse exchange pattern");
-					responseMessageContext.setServiceContext(msgCtx.getServiceContext());
-				}
-				else{
-                                        //Setting the AxisService object
+				//if this is an application response or createSeqResponse msg in response to a make connection then we have to take care with the service context
+				if(messageType == Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG 
+						&& (responseMessageType == Sandesha2Constants.MessageTypes.APPLICATION 
+								|| responseMessageType == Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE)){
+				
+					//Setting the AxisService object
 					responseMessageContext.setAxisService(msgCtx.getAxisService());
 
-					//we cannot set service ctx for application response msgs since the srvc ctx will not match the op ctx, causing
+					//we cannot set service ctx for application response msgs or createSeqResponse msgs since the srvc ctx will not match the op ctx, causing
 					//problems with addressing
 					if(log.isDebugEnabled()) log.debug("NOT setting service ctx for response type " + messageType + ", current srvc ctx =" + responseMessageContext.getServiceContext());
+				}else {
+					if(log.isDebugEnabled()) log.debug("setting service ctx on msg as this is NOT a makeConnection>appResponse or makeConnection>createSeqResponse exchange pattern");
+					responseMessageContext.setServiceContext(msgCtx.getServiceContext());
 				}
-				
+	
 				//If addressing is disabled we will be adding this message simply as the application response of the request message.
 				Boolean addressingDisabled = (Boolean) msgCtx.getProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES);
 				if (addressingDisabled!=null && Boolean.TRUE.equals(addressingDisabled)) {

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java Fri Apr  3 11:35:11 2009
@@ -19,6 +19,8 @@
 
 package org.apache.sandesha2.wsrm;
 
+import java.util.Iterator;
+
 import javax.xml.namespace.QName;
 
 import org.apache.axiom.om.OMElement;
@@ -72,6 +74,15 @@
 			throw new SandeshaException (message);
 		}
 		
+		// Sniff the addressing namespace from the Address child of the endpointElement
+		Iterator children = endpointElement.getChildElements();
+		while(children.hasNext() && addressingNamespaceValue == null) {
+			OMElement child = (OMElement) children.next();
+			if("Address".equals(child.getLocalName())) {
+				addressingNamespaceValue = child.getNamespace().getNamespaceURI();
+			}
+		}
+		
 		return this;
 	}
 



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org