You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2007/05/30 06:53:45 UTC

svn commit: r542750 - in /webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2: ./ msgprocessors/ util/ workers/

Author: chamikara
Date: Tue May 29 21:53:44 2007
New Revision: 542750

URL: http://svn.apache.org/viewvc?view=rev&rev=542750
Log:
A correction to set the messageType of empty body LastMessages correctly.

Added a new MessageType for 'Polling Response Messages'.

Made LastMessage Response and Polling Response messages OutOnly, so they will go with OutOnly operations. This should be the case 
since these messages do not relate to the request message.

Removed the duplicate MessageRetransmissionAdjuster.adjustRetransmittion call in the MakeConnectionProcessor. This already gets called in the
SenderWorker.

Stopped the FaultManager from throwing out Faults directly. This causes problems in scenarios such as Ack piggybacked Application msgs.


A fix to transfer the RequestResponseTransport object from the request message to the PollingResponse and LastMessageResponse messages. 
SenderWorker was fixed to pick this correctly.


Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SpecSpecificConstants.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java Tue May 29 21:53:44 2007
@@ -269,9 +269,11 @@
 		
 		int LAST_MESSAGE = 12;
 
-    int DUPLICATE_MESSAGE = 13;
+		int DUPLICATE_MESSAGE = 13;
+		
+		int POLL_RESPONSE_MESSAGE = 14;
 
-		int MAX_MESSAGE_TYPE = 13;
+		int MAX_MESSAGE_TYPE = 14;
 	}
 
 	public interface MessageParts {

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Tue May 29 21:53:44 2007
@@ -530,7 +530,13 @@
 		appMsgEntry.setMessageID(rmMsg.getMessageId());
 		appMsgEntry.setMessageNumber(messageNumber);
 		appMsgEntry.setLastMessage(lastMessage);
-		appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+		
+		SOAPEnvelope envelope = rmMsg.getSOAPEnvelope();
+		if (lastMessage && envelope!=null && envelope.getBody().getFirstOMChild()==null)
+			appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.LAST_MESSAGE);
+		else
+			appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+		
 		appMsgEntry.setInboundSequenceId(inboundSequence);
 		appMsgEntry.setInboundMessageNumber(inboundMessageNumber);
 		if (outSequenceID == null) {

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java Tue May 29 21:53:44 2007
@@ -9,10 +9,8 @@
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisOperationFactory;
-import org.apache.axis2.description.OutInAxisOperation;
 import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.util.MessageContextBuilder;
+import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.storage.StorageManager;
@@ -21,11 +19,9 @@
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
-import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.wsrm.Sequence;
-import org.ietf.jgss.MessageProp;
 
 public class LastMessageProcessor  implements MsgProcessor {
 
@@ -67,9 +63,11 @@
 			//there is a RMS sequence without a LastMsg entry
 			
 			MessageContext msgContext = rmMsgCtx.getMessageContext();
-//			MessageContext outMessage = MessageContextBuilder.createOutMessageContext(msgContext);
 			
-			MessageContext outMessageContext = new MessageContext ();
+			AxisOperation operation = SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.LAST_MESSAGE, 
+					rmMsgCtx.getRMSpecVersion() , msgContext.getAxisService());
+			MessageContext outMessageContext = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, operation);
+			
 			outMessageContext.setServerSide(true);
 			
 			outMessageContext.setTransportOut(msgContext.getTransportOut());
@@ -85,16 +83,7 @@
 			if (outMessageContext.getOptions()==null)
 				outMessageContext.setOptions(new Options ());
 			
-			outMessageContext.setConfigurationContext(msgContext.getConfigurationContext());
-			outMessageContext.setServiceContext(msgContext.getServiceContext());
-			outMessageContext.setAxisService(msgContext.getAxisService());
-			
-			AxisOperation operation = SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.LAST_MESSAGE, 
-																	rmMsgCtx.getRMSpecVersion() , msgContext.getAxisService());
-			
-			OperationContext operationContext = new OperationContext (operation,msgContext.getServiceContext());
-			operationContext.addMessageContext(outMessageContext);
-			
+			OperationContext operationContext = outMessageContext.getOperationContext();
 			String inboundSequenceId = (String) msgContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
 			operationContext.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID, 
 					inboundSequenceId);
@@ -103,13 +92,12 @@
 			operationContext.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_MESSAGE_NUMBER, 
 					inboundMSgNo);
 			
-			outMessageContext.setAxisOperation(operation);
-			outMessageContext.setOperationContext(operationContext);
-			
 			outMessageContext.getOptions().setAction(Sandesha2Constants.SPEC_2005_02.Actions.ACTION_LAST_MESSAGE);
 
 			//says that the inbound msg of this was a LastMessage - so the new msg will also be a LastMessage
 			outMessageContext.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_LAST_MESSAGE, Boolean.TRUE);
+			outMessageContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, msgContext.getProperty(RequestResponseTransport.TRANSPORT_CONTROL));
+			msgContext.getOperationContext().setProperty (Constants.RESPONSE_WRITTEN,Constants.VALUE_TRUE);
 			
 			AxisEngine engine = new AxisEngine (rmMsgCtx.getConfigurationContext());
 			engine.send(outMessageContext);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Tue May 29 21:53:44 2007
@@ -11,7 +11,9 @@
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.OutOnlyAxisOperation;
 import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
@@ -168,7 +170,10 @@
 		setTransportProperties (returnMessage, pollMessage);
 		
 		// Link the response to the request
-		OperationContext context = pollMessage.getMessageContext().getOperationContext();
+
+		AxisOperation operation = SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.POLL_RESPONSE_MESSAGE, pollMessage.getRMSpecVersion(), pollMessage.getMessageContext().getAxisService());
+		OperationContext context = new OperationContext (operation, pollMessage.getMessageContext().getServiceContext());
+		
 		if(context == null) {
 			AxisOperation oldOperation = returnMessage.getAxisOperation();
 
@@ -181,12 +186,11 @@
 		returnMessage.setOperationContext(context);
 		
 		returnMessage.setProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE, Boolean.TRUE);
+		returnMessage.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, pollMessage.getProperty(RequestResponseTransport.TRANSPORT_CONTROL));
 		
-		// Update the senderBeans send time.
-		boolean continueSend = 
-			MessageRetransmissionAdjuster.adjustRetransmittion(returnRMMsg, matchingMessage, returnRMMsg.getConfigurationContext(), storageManager);
+		//marking pollMessage as responsed
+		pollMessage.getMessageContext().getOperationContext().setProperty (Constants.RESPONSE_WRITTEN,Constants.VALUE_TRUE);
 		
-		//
 		// Commit the current transaction, so that the SenderWorker can do it's own locking
 		if(transaction != null && transaction.isActive()) transaction.commit();
 		
@@ -194,11 +198,10 @@
 		//This will allow Sandesha2 to consider both of following senarios equally.
 		//  1. A message being sent by the Sender thread.
 		//  2. A message being sent as a reply to an MakeConnection.
-		if (continueSend) {
-			SenderWorker worker = new SenderWorker (pollMessage.getConfigurationContext(), matchingMessage, returnRMMsg.getRMSpecVersion());
-			worker.setMessage(returnRMMsg);
-			worker.run();
-		}
+		SenderWorker worker = new SenderWorker (pollMessage.getConfigurationContext(), matchingMessage, pollMessage.getRMSpecVersion());
+		worker.setMessage(returnRMMsg);
+		worker.run();
+
 		
 		if(log.isDebugEnabled()) log.debug("Exit: MakeConnectionProcessor::replyToPoll");
 	}

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Tue May 29 21:53:44 2007
@@ -210,7 +210,12 @@
 
 			  SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
 			  SenderBean findSenderBean = new SenderBean ();
-			  findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+			  
+			  if (rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.LAST_MESSAGE)
+				  findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.LAST_MESSAGE);
+			  else
+				  findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+			  
 			  findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
 			  findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
 			  findSenderBean.setSend(true);
@@ -297,7 +302,10 @@
 				SenderBean sender = storageManager.getSenderBeanMgr().findUnique(matcher);
 				if(sender != null) {
 					if(log.isDebugEnabled()) log.debug("Deleting sender for sync-2-way message");
+					
 					storageManager.removeMessageContext(sender.getMessageContextRefKey());
+					
+					//this causes the request to be deleted even without an ack.
 					storageManager.getSenderBeanMgr().delete(messageId);
 					
 					// Try and terminate the corresponding outbound sequence

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Tue May 29 21:53:44 2007
@@ -459,44 +459,30 @@
 		
 		String SOAPNamespaceValue = factory.getSoapVersionURI();
 		
-		if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(SOAPNamespaceValue)) {
-			reason.addSOAPText(reasonText);
-			referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode);
-			referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_REASON_LOCAL_NAME, reason);
-			referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail);
-		} else if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals (SOAPNamespaceValue)) {
-			reason.setText(data.getReason());
-			referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode);
-			referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail);
-			referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_STRING_LOCAL_NAME, reason);
-			// Need to send this message as the Axis Layer doesn't set the "SequenceFault" header
-			MessageContext faultMessageContext = 
-				MessageContextBuilder.createFaultMessageContext(referenceRMMsgContext.getMessageContext(), null);
+		reason.setText(data.getReason());
+		referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode);
+		referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail);
+		referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_STRING_LOCAL_NAME, reason);
+		// Need to send this message as the Axis Layer doesn't set the "SequenceFault" header
+		MessageContext faultMessageContext = 
+		MessageContextBuilder.createFaultMessageContext(referenceRMMsgContext.getMessageContext(), null);
 
-			SOAPFaultEnvelopeCreator.addSOAPFaultEnvelope(faultMessageContext, Sandesha2Constants.SOAPVersion.v1_1, data, referenceRMMsgContext.getRMNamespaceValue());			
+		SOAPFaultEnvelopeCreator.addSOAPFaultEnvelope(faultMessageContext, Sandesha2Constants.SOAPVersion.v1_1, data, referenceRMMsgContext.getRMNamespaceValue());			
 			
-			referenceRMMsgContext.getMessageContext().getOperationContext().setProperty(
+		referenceRMMsgContext.getMessageContext().getOperationContext().setProperty(
 					org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
 						
-			// Set the action
-			faultMessageContext.setWSAAction(
+		// Set the action
+		faultMessageContext.setWSAAction(
 					SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion()));
 			
-			if (log.isDebugEnabled())
-				log.debug("Sending fault message " + faultMessageContext.getEnvelope().getHeader());
-			// Send the message
-			AxisEngine engine = new AxisEngine(faultMessageContext.getConfigurationContext());
-			engine.sendFault(faultMessageContext);
+		if (log.isDebugEnabled())
+			log.debug("Sending fault message " + faultMessageContext.getEnvelope().getHeader());
+		// Send the message
+		AxisEngine engine = new AxisEngine(faultMessageContext.getConfigurationContext());
+		engine.sendFault(faultMessageContext);
 			
-			return;
-		} else {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unknownSoapVersion);
-			throw new SandeshaException (message);
-		}
-		AxisFault fault = new AxisFault(faultColdValue.getTextAsQName(), data.getReason(), "", "", data.getDetail());
-	  fault.setFaultAction(SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion()));
-		throw fault;		
-		
+		return;
 	}
 
 	public static boolean isRMFault (String faultSubcodeValue) {

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java Tue May 29 21:53:44 2007
@@ -420,6 +420,9 @@
 			newMessageContext.setProperty(MessageContext.TRANSPORT_OUT, referenceMessage
 					.getProperty(MessageContext.TRANSPORT_OUT));
 			
+			newMessageContext.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, 
+					referenceMessage.getProperty(AddressingConstants.WS_ADDRESSING_VERSION));
+			
 			copyConfiguredProperties (referenceMessage,newMessageContext);
 
 			//copying the serverSide property

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SpecSpecificConstants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SpecSpecificConstants.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SpecSpecificConstants.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SpecSpecificConstants.java Tue May 29 21:53:44 2007
@@ -349,10 +349,13 @@
 			case Sandesha2Constants.MessageTypes.LAST_MESSAGE:
 				result = service.getOperation(Sandesha2Constants.RM_OUT_ONLY_OPERATION);
 				break;			
-      case Sandesha2Constants.MessageTypes.DUPLICATE_MESSAGE:
-        result = service.getOperation(Sandesha2Constants.RM_DUPLICATE_OPERATION);
-        break;
-      }
+			case Sandesha2Constants.MessageTypes.DUPLICATE_MESSAGE:
+				result = service.getOperation(Sandesha2Constants.RM_DUPLICATE_OPERATION);
+				break;
+			case Sandesha2Constants.MessageTypes.POLL_RESPONSE_MESSAGE:
+				result = service.getOperation(Sandesha2Constants.RM_OUT_ONLY_OPERATION);
+				break;	
+			}
 		} else if(rmSpecLevel.equals(Sandesha2Constants.SPEC_VERSIONS.v1_1)) {
 			switch(messageType) {
 			case Sandesha2Constants.MessageTypes.CREATE_SEQ:
@@ -365,6 +368,9 @@
 			case Sandesha2Constants.MessageTypes.ACK_REQUEST:
 				result = service.getOperation(Sandesha2Constants.RM_OUT_ONLY_OPERATION);
 				break;
+			case Sandesha2Constants.MessageTypes.POLL_RESPONSE_MESSAGE:
+				result = service.getOperation(Sandesha2Constants.RM_OUT_ONLY_OPERATION);
+				break;		
 			}
 		}
 		

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Tue May 29 21:53:44 2007
@@ -35,12 +35,15 @@
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
 import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
@@ -60,7 +63,12 @@
 	private SenderBean senderBean = null;
 	private RMMsgContext messageToSend = null;
 	private String rmVersion = null;
+	private RMDBean incomingSequenceBean = null;
 	
+	public void setIncomingSequenceBean(RMDBean incomingSequenceBean) {
+		this.incomingSequenceBean = incomingSequenceBean;
+	}
+
 	public SenderWorker (ConfigurationContext configurationContext, SenderBean senderBean, String rmVersion) {
 		this.configurationContext = configurationContext;
 		this.senderBean = senderBean;
@@ -134,16 +142,20 @@
 			// or the message can't go anywhere. If there is nothing here then we leave the
 			// message in the sender queue, and a MakeConnection (or a retransmitted request)
 			// will hopefully pick it up soon.
-			RequestResponseTransport t = null;
 			Boolean makeConnection = (Boolean) msgCtx.getProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE);
 			EndpointReference toEPR = msgCtx.getTo();
 
 			MessageContext inMsg = null;
 			OperationContext op = msgCtx.getOperationContext();
-			if (op != null)
-				inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-			if (inMsg != null)
-				t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+			
+			RequestResponseTransport t = (RequestResponseTransport) msgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+			
+			if (t==null) {
+				if (op != null)
+					inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+				if (inMsg != null)
+					t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+			}
 
 			// If we are anonymous, and this is not a makeConnection, then we must have a transport waiting
 			if((toEPR==null || toEPR.hasAnonymousAddress()) &&
@@ -386,7 +398,8 @@
 		
 		// Lock the message to enable retransmission update
 		senderBean = storageManager.getSenderBeanMgr().retrieve(senderBean.getMessageID());
-
+		int messageType = senderBean.getMessageType();
+		
 		// Only continue if we find a SenderBean
 		if (senderBean == null)
 			return false;
@@ -397,7 +410,9 @@
 		
 		Identifier id = null;
 
-		if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+		if(messageType == Sandesha2Constants.MessageTypes.APPLICATION ||
+		   messageType == Sandesha2Constants.MessageTypes.LAST_MESSAGE) {
+			
 			String namespace = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
 			Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
 			if(sequence == null) {
@@ -417,17 +432,18 @@
 				sequence.setIdentifier(id);
 				
 				rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE, sequence);
+				
 			}
 			
-		} else if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+		} else if(messageType == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
 			TerminateSequence terminate = (TerminateSequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
 			id = terminate.getIdentifier();
 
-		} else if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE) {
+		} else if(messageType == Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE) {
 			CloseSequence close = (CloseSequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
 			id = close.getIdentifier();
 		
-		} else if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.ACK_REQUEST) {
+		} else if(messageType == Sandesha2Constants.MessageTypes.ACK_REQUEST) {
 			// The only time that we can have a message of this type is when we are sending a
 			// stand-alone ack request, and in that case we only expect to find a single ack
 			// request header in the message.
@@ -447,6 +463,36 @@
 
 			// Write the changes back into the message context
 			rmMsgContext.addSOAPEnvelope();
+		}
+		
+		//if this is an sync WSRM 1.0 case we always have to add an ack
+		boolean ackPresent = false;
+		Iterator it = rmMsgContext.getMessageParts (Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+		if (it.hasNext()) 
+			ackPresent = true;
+		
+		if (!ackPresent && rmMsgContext.getMessageContext().isServerSide() 
+				&&
+			(messageType==Sandesha2Constants.MessageTypes.APPLICATION || 
+		     messageType==Sandesha2Constants.MessageTypes.APPLICATION ||
+		     messageType==Sandesha2Constants.MessageTypes.UNKNOWN ||
+		     messageType==Sandesha2Constants.MessageTypes.LAST_MESSAGE)) {
+			
+			String inboundSequenceId = senderBean.getInboundSequenceId();
+			if (inboundSequenceId==null)
+				throw new SandeshaException ("InboundSequenceID is not set for the sequence:" + id);
+			
+			RMDBean findBean = new RMDBean ();
+			findBean.setSequenceID(inboundSequenceId);
+			
+			if (incomingSequenceBean==null) {
+				RMDBeanMgr rmdMgr = storageManager.getRMDBeanMgr();
+				incomingSequenceBean = rmdMgr.findUnique(findBean);
+			}
+			
+			if (incomingSequenceBean!=null) 
+				RMMsgCreator.addAckMessage(rmMsgContext, inboundSequenceId, incomingSequenceBean);
+			
 		}
 		
 		return true;



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org