You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2007/04/23 11:55:16 UTC

svn commit: r531400 [4/18] - in /webservices/sandesha/trunk/java/modules: client/ core/ core/src/ core/src/main/ core/src/main/java/ core/src/main/java/org/ core/src/main/java/org/apache/ core/src/main/java/org/apache/sandesha2/ core/src/main/java/org/...

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,358 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Collection;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SandeshaListener;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Accept;
+import org.apache.sandesha2.wsrm.CreateSequence;
+import org.apache.sandesha2.wsrm.CreateSequenceResponse;
+import org.apache.sandesha2.wsrm.Endpoint;
+import org.apache.sandesha2.wsrm.SequenceOffer;
+
+/**
+ * Responsible for processing an incoming Create Sequence message.
+ */
+
+public class CreateSeqMsgProcessor implements MsgProcessor {
+
+	private static final Log log = LogFactory.getLog(CreateSeqMsgProcessor.class);
+
+	/**
+	 * Handles an inbound CreateSequence: sets up the new inbound sequence, sends the
+	 * CreateSequenceResponse and, when the message carries an acceptable offer, also
+	 * registers the offered outbound sequence.
+	 *
+	 * @return false when a CreateSequenceRefused fault was raised, true otherwise
+	 * @throws AxisFault if the fault manager throws, or the failure is rethrown by the catch block
+	 */
+	public boolean processInMessage(RMMsgContext createSeqRMMsg) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: CreateSeqMsgProcessor::processInMessage");
+
+		try {
+			CreateSequence createSeqPart = (CreateSequence) createSeqRMMsg
+					.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
+			if (createSeqPart == null) {
+				if (log.isDebugEnabled())
+					log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqParts));
+				FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg,
+						SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqParts),
+						new Exception());
+				// Return false if an Exception hasn't been thrown.
+				if (log.isDebugEnabled())
+					log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
+				return false;
+			}
+
+			MessageContext createSeqMsg = createSeqRMMsg.getMessageContext();
+			ConfigurationContext context = createSeqMsg.getConfigurationContext();
+			StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+
+			// If the inbound CreateSequence includes a SecurityTokenReference then
+			// ask the security manager to resolve that to a token for us. We also
+			// check that the Create was secured using the token.
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
+			OMElement theSTR = createSeqPart.getSecurityTokenReference();
+			SecurityToken token = null;
+			if(theSTR != null) {
+				MessageContext msgcontext = createSeqRMMsg.getMessageContext();
+				token = secManager.getSecurityToken(theSTR, msgcontext);
+
+				// The create must be the body part of this message, so we check the
+				// security of that element.
+				OMElement body = msgcontext.getEnvelope().getBody();
+				secManager.checkProofOfPossession(token, body, msgcontext);
+			}
+
+			// If the To address is anonymous we may need to terminate the request side sequence here.
+			EndpointReference toEPR = createSeqMsg.getTo();
+			if (toEPR.hasAnonymousAddress()) {
+
+				RMSBean findBean = new RMSBean ();
+				findBean.setReplyToEPR(toEPR.getAddress());
+				findBean.setTerminationPauserForCS(true);
+
+				//TODO recheck
+				RMSBean rmsBean = storageManager.getRMSBeanMgr().findUnique(findBean);
+				if (rmsBean!=null) {
+					// AckManager has not done the termination. Do the termination here.
+					MessageContext requestSideRefMessage = storageManager.retrieveMessageContext(rmsBean.getReferenceMessageStoreKey(),context);
+					if (requestSideRefMessage==null) {
+						FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg,
+								SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referencedMessageNotFound, rmsBean.getInternalSequenceID()),
+								new Exception());
+						// Return false if an Exception hasn't been thrown.
+						if (log.isDebugEnabled())
+							log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
+						return false;
+					}
+
+					RMMsgContext requestSideRefRMMessage = MsgInitializer.initializeMessage(requestSideRefMessage);
+					TerminateManager.addTerminateSequenceMessage(requestSideRefRMMessage, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
+				}
+			}
+
+			MessageContext outMessage = null;
+
+			// Create the new sequence id, as well as establishing the beans that handle the
+			// sequence state.
+			RMDBean rmdBean = SequenceManager.setupNewSequence(createSeqRMMsg, storageManager, secManager, token);
+
+			RMMsgContext createSeqResponse = RMMsgCreator.createCreateSeqResponseMsg(createSeqRMMsg, rmdBean);
+			outMessage = createSeqResponse.getMessageContext();
+			// Set a message ID for this Create Sequence Response message
+			outMessage.setMessageID(SandeshaUtil.getUUID());
+
+			createSeqResponse.setFlow(MessageContext.OUT_FLOW);
+
+			// for making sure that this won't be processed again
+			createSeqResponse.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+			CreateSequenceResponse createSeqResPart = (CreateSequenceResponse) createSeqResponse
+					.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
+
+			// OFFER PROCESSING
+			SequenceOffer offer = createSeqPart.getSequenceOffer();
+			if (offer != null) {
+				Accept accept = createSeqResPart.getAccept();
+				if (accept == null) {
+					if (log.isDebugEnabled())
+						log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noAcceptPart));
+					FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg,
+							SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noAcceptPart),
+							new Exception());
+					// Return false if an Exception hasn't been thrown.
+					if (log.isDebugEnabled())
+						log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
+					return false;
+				}
+
+				// offered seq id
+				String offeredSequenceID = offer.getIdentifer().getIdentifier();
+
+				boolean offerIsAcceptable = offerAccepted(offeredSequenceID, context, createSeqRMMsg, storageManager);
+
+				if (offerIsAcceptable) {
+					// Setting the CreateSequence table entry for the outgoing
+					// side.
+					RMSBean rMSBean = new RMSBean();
+					rMSBean.setSequenceID(offeredSequenceID);
+					String outgoingSideInternalSequenceId = SandeshaUtil
+							.getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
+					rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
+					// this is a dummy value
+					rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID());
+
+					rMSBean.setToEPR(rmdBean.getToEPR());
+					rMSBean.setAcksToEPR(rmdBean.getToEPR());  // The acks need to flow back into this endpoint
+					rMSBean.setReplyToEPR(rmdBean.getReplyToEPR());
+					rMSBean.setLastActivatedTime(System.currentTimeMillis());
+					rMSBean.setRMVersion(rmdBean.getRMVersion());
+					rMSBean.setClientCompletedMessages(new RangeString());
+
+					// Setting sequence properties for the outgoing sequence.
+					// Only will be used by the server side response path. Will
+					// be wasted properties for the client side.
+
+					Endpoint endpoint = offer.getEndpoint();
+					if (endpoint!=null) {
+						// setting the OfferedEndpoint
+						rMSBean.setOfferedEndPoint(endpoint.getEPR().getAddress());
+					}
+
+					// Store the inbound token (if any) with the new sequence
+					rMSBean.setSecurityTokenData(rmdBean.getSecurityTokenData());
+
+					// If the offered sequence has no acksTo address, or an anonymous one, the
+					// acks cannot be pushed to it, so we must poll for them. Polling is only
+					// switched on for the 1.1 level of the spec.
+					String acksTo = rMSBean.getAcksToEPR();
+					EndpointReference reference = new EndpointReference(acksTo);
+					if ((acksTo == null || reference.hasAnonymousAddress()) &&
+						Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqRMMsg.getRMSpecVersion())) {
+						rMSBean.setPollingMode(true);
+					}
+
+					// Set the SOAP Version for this sequence.
+					rMSBean.setSoapVersion(SandeshaUtil.getSOAPVersion(createSeqRMMsg.getSOAPEnvelope()));
+
+					storageManager.getRMSBeanMgr().insert(rMSBean);
+
+					SandeshaUtil.startWorkersForSequence(context, rMSBean);
+
+				} else {
+					// removing the accept part.
+					createSeqResPart.setAccept(null);
+					createSeqResponse.addSOAPEnvelope();
+				}
+			}
+
+			//TODO add createSequenceResponse message as the referenceMessage to the RMDBean.
+
+			outMessage.setResponseWritten(true);
+
+			rmdBean.setLastActivatedTime(System.currentTimeMillis());
+
+			// If the inbound sequence is targeted at the anonymous URI, we need to start
+			// polling for this sequence.
+			if (toEPR.hasAnonymousAddress()) {
+				if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqRMMsg.getRMSpecVersion())) {
+					rmdBean.setPollingMode(true);
+				}
+			}
+
+			storageManager.getRMDBeanMgr().update(rmdBean);
+
+			SandeshaUtil.startWorkersForSequence(context, rmdBean);
+
+			AxisEngine engine = new AxisEngine(context);
+			try{
+				engine.send(outMessage);
+			}
+			catch(AxisFault e){
+				FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg,
+						SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendCreateSeqResponse,
+								SandeshaUtil.getStackTraceFromException(e)), e);
+				// Return false if an Exception hasn't been thrown.
+				if (log.isDebugEnabled())
+					log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
+				return false;
+			}
+
+			EndpointReference replyTo = createSeqMsg.getReplyTo();
+			if(replyTo == null || replyTo.hasAnonymousAddress()) {
+				createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+			} else {
+				createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+			}
+
+			createSeqRMMsg.pause();
+		}
+		catch (Exception e) {
+			if (log.isDebugEnabled())
+				log.debug("Caught an exception processing CreateSequence message", e);
+			// Does the message context already contain a fault ?
+			// If it doesn't then we can add the CreateSequenceRefusedFault.
+			if (createSeqRMMsg.getMessageContext().getProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME) == null &&
+					createSeqRMMsg.getMessageContext().getProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME) == null) {
+				// Add the fault details to the message
+				FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg, SandeshaUtil.getStackTraceFromException(e), e);
+
+				// Return false if an Exception hasn't been thrown.
+				if (log.isDebugEnabled())
+					log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
+				return false;
+			}
+			// NOTE(review): if only the SOAP 1.1 fault code is set, nothing is rethrown and true is returned - confirm.
+			// If we are SOAP12 and we have already processed the fault - rethrow the exception
+			if (createSeqRMMsg.getMessageContext().getProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME) != null) {
+					// throw the original exception
+					if (e instanceof AxisFault)
+						throw (AxisFault)e;
+					throw new SandeshaException(e);
+			}
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.TRUE);
+		return true;
+	}
+
+	/**
+	 * Decides whether the sequence offered in a CreateSequence can be accepted.
+	 * Null, empty and single character identifiers are refused before storage is
+	 * consulted; an identifier already held by a stored RMSBean is refused as well.
+	 * @param sequenceId     identifier of the offered sequence, may be null
+	 * @param configCtx      configuration context (not used by this check)
+	 * @param createSeqRMMsg the inbound CreateSequence message (not used by this check)
+	 * @param storageManager storage that is searched for an RMSBean with the same sequence id
+	 * @return true if the offered identifier is usable and not yet known, false otherwise
+	 * @throws SandeshaException if the storage lookup reports a failure
+	 */
+	private boolean offerAccepted(String sequenceId, ConfigurationContext configCtx, RMMsgContext createSeqRMMsg,
+			StorageManager storageManager) throws SandeshaException {
+		if (log.isDebugEnabled())
+			log.debug("Enter: CreateSeqMsgProcessor::offerAccepted, " + sequenceId);
+
+		boolean accepted = false;
+		// Cheap validity check first: null, empty and single character offers are NOT accepted.
+		if (sequenceId != null && sequenceId.length() > 1) {
+			RMSBean createSeqFindBean = new RMSBean();
+			createSeqFindBean.setSequenceID(sequenceId);
+			Collection arr = storageManager.getRMSBeanMgr().find(createSeqFindBean);
+			// The offered id must not clash with a sequence that is already stored.
+			accepted = arr.isEmpty();
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: CreateSeqMsgProcessor::offerAccepted, " + accepted);
+		return accepted;
+	}
+
+	/**
+	 * Copies a SandeshaListener found in the message options onto the operation context; always returns false.
+	 */
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+		if (log.isDebugEnabled())
+			log.debug("Enter: CreateSeqMsgProcessor::processOutMessage");
+		MessageContext msgCtx = rmMsgCtx.getMessageContext();
+		// Propagate the SANDESHA_LISTENER option, if any, to the operation context.
+		SandeshaListener faultCallback = (SandeshaListener) msgCtx.getOptions().getProperty(
+				SandeshaClientConstants.SANDESHA_LISTENER);
+		if (faultCallback != null) {
+			OperationContext operationContext = msgCtx.getOperationContext();
+			if (operationContext != null) {
+				operationContext.setProperty(SandeshaClientConstants.SANDESHA_LISTENER, faultCallback);
+			}
+		}
+		if (log.isDebugEnabled())
+			log.debug("Exit: CreateSeqMsgProcessor::processOutMessage " + Boolean.FALSE);
+		return false;
+	}
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,257 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.Accept;
+import org.apache.sandesha2.wsrm.CreateSequenceResponse;
+
+/**
+ * Responsible for processing an incoming Create Sequence Response message.
+ */
+
+public class CreateSeqResponseMsgProcessor implements MsgProcessor {
+
+	private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
+
+	public boolean processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage");
+
+		ConfigurationContext configCtx = createSeqResponseRMMsgCtx.getMessageContext().getConfigurationContext();
+
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+				.getAxisConfiguration());
+
+		// Processing the create sequence response.
+
+		CreateSequenceResponse createSeqResponsePart = (CreateSequenceResponse) createSeqResponseRMMsgCtx
+				.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
+		if (createSeqResponsePart == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqResponse);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		String newOutSequenceId = createSeqResponsePart.getIdentifier().getIdentifier();
+		if (newOutSequenceId == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.newSeqIdIsNull);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		RelatesTo relatesTo = createSeqResponseRMMsgCtx.getMessageContext().getRelatesTo();
+		String createSeqMsgId = null;
+		if (relatesTo != null) {
+			createSeqMsgId = relatesTo.getValue();
+		} else {
+			// Work out the related message from the operation context
+			OperationContext context = createSeqResponseRMMsgCtx.getMessageContext().getOperationContext();
+			MessageContext createSeq = context.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
+			if(createSeq != null) createSeqMsgId = createSeq.getMessageID();
+		}
+		if(createSeqMsgId == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.relatesToNotAvailable);
+			log.error(message);
+			throw new SandeshaException(message);
+		}
+
+		SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr();
+		RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
+
+		RMSBean rmsBean = rmsBeanMgr.retrieve(createSeqMsgId);
+		if (rmsBean == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		// Check that the create sequence response message proves possession of the correct token
+		String tokenData = rmsBean.getSecurityTokenData();
+		if(tokenData != null) {
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
+			MessageContext crtSeqResponseCtx = createSeqResponseRMMsgCtx.getMessageContext();
+			OMElement body = crtSeqResponseCtx.getEnvelope().getBody();
+			SecurityToken token = secManager.recoverSecurityToken(tokenData);
+			secManager.checkProofOfPossession(token, body, crtSeqResponseCtx);
+		}
+
+		String internalSequenceId = rmsBean.getInternalSequenceID();
+		if (internalSequenceId == null || "".equals(internalSequenceId)) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.tempSeqIdNotSet);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+		createSeqResponseRMMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
+		
+		rmsBean.setSequenceID(newOutSequenceId);
+
+		// We must poll for any reply-to that uses the anonymous URI. If it is a ws-a reply to then
+		// the create must include an offer (or this client cannot be identified). If the reply-to
+		// is the RM anon URI template then the offer is not required.
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
+			String acksTo = rmsBean.getAcksToEPR();
+			EndpointReference reference = new EndpointReference(acksTo);
+			if(acksTo == null || reference.hasAnonymousAddress()) {
+				rmsBean.setPollingMode(true);
+			}
+		}
+
+		SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
+		if (createSequenceSenderBean == null)
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));
+
+		// deleting the create sequence entry.
+		retransmitterMgr.delete(createSeqMsgId);
+				
+		// processing for accept (offer has been sent)
+		Accept accept = createSeqResponsePart.getAccept();
+		if (accept != null) {
+
+			// TODO this should be detected in the Fault manager.
+			if (rmsBean.getOfferedSequence() == null) {
+				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.accptButNoSequenceOffered);
+				log.debug(message);
+				throw new SandeshaException(message);
+			}
+
+			RMDBean rMDBean = new RMDBean();
+			
+			EndpointReference acksToEPR = accept.getAcksTo().getEPR();
+			rMDBean.setAcksToEPR(acksToEPR.getAddress());
+			rMDBean.setSequenceID(rmsBean.getOfferedSequence());
+			rMDBean.setNextMsgNoToProcess(1);
+			rMDBean.setOutboundInternalSequence(rmsBean.getInternalSequenceID());
+
+			//Storing the referenceMessage of the sending side sequence as the reference message
+			//of the receiving side as well.
+			//This can be used when creating new outgoing messages.
+			
+			String referenceMsgStoreKey = rmsBean.getReferenceMessageStoreKey();
+			MessageContext referenceMsg = storageManager.retrieveMessageContext(referenceMsgStoreKey, configCtx);
+			
+			String newMessageStoreKey = SandeshaUtil.getUUID();
+			storageManager.storeMessageContext(newMessageStoreKey,referenceMsg);
+			
+			rMDBean.setReferenceMessageKey(newMessageStoreKey);
+
+			// If this is an offered sequence that needs polling then we need to setup the
+			// rmdBean for polling too, so that it still gets serviced after the outbound
+			// sequence terminates.
+			if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
+				if(rmsBean.isPollingMode()) {
+					rMDBean.setPollingMode(true);
+				}
+			}
+			
+			String rmSpecVersion = createSeqResponseRMMsgCtx.getRMSpecVersion();
+			rMDBean.setRMVersion(rmSpecVersion);
+			
+			EndpointReference toEPR = createSeqResponseRMMsgCtx.getTo();
+			if (toEPR==null) {
+				//Most probably this is a sync response message, using the replyTo of the request message
+				OperationContext operationContext = createSeqResponseRMMsgCtx.getMessageContext().getOperationContext();
+				if (operationContext!=null) {
+					MessageContext createSequnceMessage = operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
+					if (createSequnceMessage!=null)
+						toEPR = createSequnceMessage.getReplyTo();
+				}
+			}
+			
+			if (toEPR!=null) 
+				rMDBean.setToAddress(toEPR.getAddress());
+			
+			rMDBean.setServerCompletedMessages(new RangeString());
+			RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+
+			// Store the security token for the offered sequence
+			rMDBean.setSecurityTokenData(rmsBean.getSecurityTokenData());
+			
+			rMDBean.setLastActivatedTime(System.currentTimeMillis());
+			
+			rmdBeanMgr.insert(rMDBean);
+			SandeshaUtil.startWorkersForSequence(configCtx, rMDBean);
+		}
+		
+		rmsBean.setLastActivatedTime(System.currentTimeMillis());
+		rmsBeanMgr.update(rmsBean);
+		SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
+
+		// Locate and update all of the messages for this sequence, now that we know
+		// the sequence id.
+		SenderBean target = new SenderBean();
+		target.setInternalSequenceID(internalSequenceId);
+		target.setSend(false);
+		
+		Iterator iterator = retransmitterMgr.find(target).iterator();
+		while (iterator.hasNext()) {
+			SenderBean tempBean = (SenderBean) iterator.next();
+
+			// asking to send the application msssage
+			tempBean.setSend(true);
+			tempBean.setSequenceID(newOutSequenceId);
+			retransmitterMgr.update(tempBean);
+		}
+
+		createSeqResponseRMMsgCtx.getMessageContext().getOperationContext().setProperty(
+				org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+
+		createSeqResponseRMMsgCtx.pause();
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage " + Boolean.TRUE);
+		return true;
+	}
+
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+		if (log.isDebugEnabled()) {
+			log.debug("Enter: CreateSeqResponseMsgProcessor::processOutMessage");
+			log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
+		}
+		return false;
+
+	}
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,192 @@
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ContextFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.SenderWorker;
+import org.apache.sandesha2.wsrm.Address;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.MakeConnection;
+import org.apache.sandesha2.wsrm.MessagePending;
+
+/**
+ * Message processor for MakeConnection requests, which are only supported by WSRM 1.1.
+ * MakeConnection is a polling mechanism: a client that has no real endpoint of its own
+ * can ask for the reliable response messages that are waiting for it, and one of them
+ * is sent back using the transport of the polling request.
+ */
+public class MakeConnectionProcessor implements MsgProcessor {
+
+	private static final Log log = LogFactory.getLog(MakeConnectionProcessor.class);
+
+	/**
+	 * Processes an incoming MakeConnection request.
+	 * The SenderBeans that are waiting to be sent and that match the address and/or
+	 * sequence identifier of the request are looked up, one of them is picked at random
+	 * and it is handed to {@link #replyToPoll}.
+	 * 
+	 * @param rmMsgCtx the MakeConnection request
+	 * @return always false
+	 * @throws AxisFault if the reply cannot be sent
+	 */
+	public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: MakeConnectionProcessor::processInMessage " + rmMsgCtx.getSOAPEnvelope().getBody());
+
+		MakeConnection request = (MakeConnection) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.MAKE_CONNECTION);
+		Address requestedAddress = request.getAddress();
+		Identifier requestedSequence = request.getIdentifier();
+
+		ConfigurationContext configCtx = rmMsgCtx.getConfigurationContext();
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx.getAxisConfiguration());
+		SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+
+		// Build the search template: beans that are ready to send but have no transport
+		// available, narrowed by whichever of address / sequence id the request carries.
+		// The reSend flag is ignored by the lookup, so it is not set on the template.
+		SenderBean template = new SenderBean();
+		template.setSend(true);
+		template.setTransportAvailable(false);
+		if (requestedAddress != null)
+			template.setToAddress(requestedAddress.getAddress());
+		if (requestedSequence != null)
+			template.setSequenceID(requestedSequence.getIdentifier());
+
+		Collection candidates = senderBeanMgr.find(template);
+
+		// Apply the resend test by hand: a bean that has already been sent at least once
+		// is only kept when it is marked for resending.
+		Iterator filter = candidates.iterator();
+		while (filter.hasNext()) {
+			SenderBean candidate = (SenderBean) filter.next();
+			boolean sendable = candidate.isReSend() || candidate.getSentCount() <= 0;
+			if (!sendable)
+				filter.remove();
+		}
+
+		int candidateCount = candidates.size();
+
+		// With more than one candidate there is still something left after this reply,
+		// so the MessagePending header has to be set to true.
+		boolean pending = candidateCount > 1;
+
+		// Pick one candidate RANDOMLY. TODO- Should use a better mechanism.
+		SenderBean chosen = null;
+		if (candidateCount > 0) {
+			int chosenIndex = new Random().nextInt(candidateCount);
+			Iterator cursor = candidates.iterator();
+			for (int position = 0; position <= chosenIndex; position++)
+				chosen = (SenderBean) cursor.next();
+		}
+
+		if (chosen == null) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: MakeConnectionProcessor::processInMessage, no matching message found");
+			return false;
+		}
+
+		replyToPoll(rmMsgCtx, chosen, storageManager, pending, request.getNamespaceValue());
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: MakeConnectionProcessor::processInMessage");
+		return false;
+	}
+
+	/**
+	 * Sends a stored message as the reply to a polling message.
+	 * The stored message is linked to the operation context of the poll, takes over the
+	 * transport details of the poll and is then run through a SenderWorker on the
+	 * calling thread. If the stored message can no longer be retrieved the method
+	 * returns without sending anything.
+	 * 
+	 * @param pollMessage the message that polled for a reply
+	 * @param matchingMessage the sender bean describing the message to return
+	 * @param storageManager the storage manager that holds the message to return
+	 * @param pending whether a MessagePending header should be added to the reply
+	 * @param namespace the namespace used for the MessagePending header
+	 * @throws AxisFault if the poll message has no outbound transport, or the reply fails
+	 */
+	public static void replyToPoll(RMMsgContext pollMessage, SenderBean matchingMessage,
+			StorageManager storageManager, boolean pending, String namespace) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: MakeConnectionProcessor::replyToPoll");
+
+		MessageContext pollCtx = pollMessage.getMessageContext();
+		TransportOutDescription pollTransport = pollCtx.getTransportOut();
+		if (pollTransport == null) {
+			String message = SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.cantSendMakeConnectionNoTransportOut);
+			if (log.isDebugEnabled())
+				log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		String storageKey = matchingMessage.getMessageContextRefKey();
+		MessageContext returnMessage = storageManager.retrieveMessageContext(storageKey, pollMessage.getConfigurationContext());
+		if (returnMessage == null) {
+			// Someone else has either removed the sender & message, or another make connection got here first.
+			if (log.isDebugEnabled())
+				log.debug("Cannot find the message stored with the key:" + storageKey);
+			return;
+		}
+
+		if (pending)
+			addMessagePendingHeader(returnMessage, namespace);
+
+		RMMsgContext returnRMMsg = MsgInitializer.initializeMessage(returnMessage);
+		setTransportProperties(returnMessage, pollMessage);
+
+		// Link the response to the request: both share one operation context, which is
+		// created from the operation of the returned message when the poll has none.
+		OperationContext sharedOpCtx = pollCtx.getOperationContext();
+		if (sharedOpCtx == null) {
+			AxisOperation returnOperation = returnMessage.getAxisOperation();
+			sharedOpCtx = ContextFactory.createOperationContext(returnOperation, returnMessage.getServiceContext());
+			sharedOpCtx.addMessageContext(pollCtx);
+			pollCtx.setOperationContext(sharedOpCtx);
+		}
+		sharedOpCtx.addMessageContext(returnMessage);
+		returnMessage.setOperationContext(sharedOpCtx);
+
+		returnMessage.setProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE, Boolean.TRUE);
+
+		// Running the reply through a SenderWorker lets Sandesha2 treat both of the
+		// following scenarios equally:
+		//  1. A message being sent by the Sender thread.
+		//  2. A message being sent as a reply to a MakeConnection.
+		SenderWorker worker = new SenderWorker(pollMessage.getConfigurationContext(), matchingMessage);
+		worker.setMessage(returnRMMsg);
+		worker.run();
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: MakeConnectionProcessor::replyToPoll");
+	}
+
+	/**
+	 * Adds a MessagePending header with the value true to the envelope of the given message.
+	 */
+	private static void addMessagePendingHeader(MessageContext returnMessage, String namespace) throws SandeshaException {
+		MessagePending header = new MessagePending(namespace);
+		header.setPending(true);
+		header.toSOAPEnvelope(returnMessage.getEnvelope());
+	}
+
+	/**
+	 * Outbound messages are not handled by this processor.
+	 * 
+	 * @return always false
+	 */
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+		return false;
+	}
+
+	/**
+	 * Copies the outbound transport, the transport info and the content type from the
+	 * MakeConnection message onto the message that is returned for it.
+	 */
+	private static void setTransportProperties(MessageContext returnMessage, RMMsgContext makeConnectionMessage) {
+		String[] copiedProperties = {
+				MessageContext.TRANSPORT_OUT,
+				Constants.OUT_TRANSPORT_INFO,
+				Constants.Configuration.CONTENT_TYPE
+		};
+		for (int i = 0; i < copiedProperties.length; i++) {
+			String name = copiedProperties[i];
+			returnMessage.setProperty(name, makeConnectionMessage.getProperty(name));
+		}
+
+		returnMessage.setTransportOut(makeConnectionMessage.getMessageContext().getTransportOut());
+	}
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,44 @@
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.polling.PollingManager;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.SequenceEntry;
+import org.apache.sandesha2.wsrm.MessagePending;
+
+/**
+ * Handles the MessagePending header of a message: when the header says that more
+ * messages are pending, another polling request is scheduled for the sequence.
+ */
+public class MessagePendingProcessor {
+
+	private static final Log log = LogFactory.getLog(MessagePendingProcessor.class);
+
+	/**
+	 * Schedules a further polling request if the message carries a MessagePending
+	 * header whose value is true and a MakeConnection sequence entry is attached to
+	 * the message. In every other case nothing happens.
+	 * 
+	 * @param message the message whose MessagePending header is examined
+	 * @throws AxisFault if the storage manager cannot be obtained
+	 */
+	public void processMessagePendingHeaders(RMMsgContext message) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: MessagePendingProcessor::processMessagePendingHeaders");
+
+		scheduleFollowUpPoll(message);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: MessagePendingProcessor::processMessagePendingHeaders");
+	}
+
+	/**
+	 * Does the actual work; returns early as soon as one of the preconditions is missing.
+	 */
+	private void scheduleFollowUpPoll(RMMsgContext message) throws AxisFault {
+		MessagePending header = (MessagePending) message.getMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING);
+		if (header == null || !header.isPending())
+			return;
+
+		SequenceEntry entry = (SequenceEntry) message.getProperty(Sandesha2Constants.MessageContextProperties.MAKECONNECTION_ENTRY);
+		if (entry == null)
+			return;
+
+		ConfigurationContext context = message.getConfigurationContext();
+		StorageManager storage = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+		PollingManager pollingManager = storage.getPollingManager();
+		if (pollingManager == null)
+			return;
+
+		pollingManager.schedulePollingRequest(entry.getSequenceId(), entry.isRmSource());
+	}
+
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,43 @@
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axis2.AxisFault;
+import org.apache.sandesha2.RMMsgContext;
+
+/**
+ * The message processor interface.
+ * An implementation handles one type of message; MsgProcessorFactory hands out the
+ * implementation that matches the type of a given message.
+ */
+
+public interface MsgProcessor {
+	
+	/**
+	 * Processes a message on the inbound path.
+	 * 
+	 * @param rmMsgCtx the message to process
+	 * @return true if the msg context has been paused
+	 * @throws AxisFault if the implementation fails to process the message
+	 */
+	public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+	
+	/**
+	 * Processes a message on the outbound path.
+	 * 
+	 * @param rmMsgCtx the message to process
+	 * @return true if the msg context has been paused
+	 * @throws AxisFault if the implementation fails to process the message
+	 */
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+}
\ No newline at end of file

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,52 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+
+/**
+ * Hands out the message processor that suits the type of a given message.
+ */
+
+public class MsgProcessorFactory {
+
+	/**
+	 * Creates a new processor for the message type reported by the given message.
+	 * 
+	 * @param rmMessageContext the message that needs processing
+	 * @return a fresh processor for the message type, or null when there is no
+	 *         processor for that type
+	 */
+	public static MsgProcessor getMessageProcessor(RMMsgContext rmMessageContext) {
+		final int type = rmMessageContext.getMessageType();
+
+		if (type == Sandesha2Constants.MessageTypes.CREATE_SEQ)
+			return new CreateSeqMsgProcessor();
+		if (type == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
+			return new TerminateSeqMsgProcessor();
+		if (type == Sandesha2Constants.MessageTypes.TERMINATE_SEQ_RESPONSE)
+			return new TerminateSeqResponseMsgProcessor();
+		if (type == Sandesha2Constants.MessageTypes.APPLICATION)
+			return new ApplicationMsgProcessor();
+		if (type == Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE)
+			return new CreateSeqResponseMsgProcessor();
+		if (type == Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE)
+			return new CloseSequenceProcessor();
+		if (type == Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG)
+			return new MakeConnectionProcessor();
+
+		// No processor is registered for any other message type.
+		return null;
+	}
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,379 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.engine.Handler.InvocationResponse;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * Responsible for processing the Sequence header (if present) on an incoming
+ * message.
+ */
+public class SequenceProcessor {
+
+	private static final Log log = LogFactory.getLog(SequenceProcessor.class);
+
+	/**
+	 * Entry point for incoming messages. Messages that carry a Sequence header are
+	 * passed on to {@link #processReliableMessage}; all other messages are left alone.
+	 * 
+	 * @param rmMsgCtx the incoming message
+	 * @return CONTINUE when the message has no Sequence header, otherwise the result
+	 *         of {@link #processReliableMessage}
+	 * @throws AxisFault if the reliable message processing fails
+	 */
+	public InvocationResponse processSequenceHeader(RMMsgContext rmMsgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: SequenceProcessor::processSequenceHeader");
+		InvocationResponse result = InvocationResponse.CONTINUE;
+		Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+		if(sequence != null) {
+			// This is a reliable message, so hand it on to the main routine
+			result = processReliableMessage(rmMsgCtx);
+		} else {
+			if (log.isDebugEnabled())
+				log.debug("Message does not contain a sequence header");
+		}
+		if (log.isDebugEnabled())
+			log.debug("Exit: SequenceProcessor::processSequenceHeader " + result);
+		return result;
+	}
+	
+	/**
+	 * Processes a message that carries a Sequence header. In order, this method
+	 * <ul>
+	 * <li>returns straight away if the message is already marked as processed,</li>
+	 * <li>checks proof of possession for the Sequence header and the body when the
+	 *     sequence has security token data,</li>
+	 * <li>aborts if one of the fault checks (unknown, terminated or closed sequence,
+	 *     rolled over message number) reports a problem,</li>
+	 * <li>records the highest message number / last message on the RMD bean,</li>
+	 * <li>for a duplicate message either replays the stored reply or sends an ack,
+	 *     and aborts,</li>
+	 * <li>updates the RMD bean and, where applicable, the matching RMS bean,</li>
+	 * <li>sends or schedules an acknowledgement,</li>
+	 * <li>stores the message for the invoker when the storage manager has one.</li>
+	 * </ul>
+	 * 
+	 * @param rmMsgCtx the incoming message; it must carry a Sequence header
+	 * @return CONTINUE when the message should carry on through the engine, ABORT when
+	 *         processing ends here, or SUSPEND when an invoker takes over and the
+	 *         backchannel has to stay open for the response
+	 * @throws AxisFault if the configuration context is missing or processing fails
+	 */
+	public InvocationResponse processReliableMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: SequenceProcessor::processReliableMessage");
+
+		InvocationResponse result = InvocationResponse.CONTINUE;
+		
+		// The property is set at the end of this method once a message has been handed
+		// to the invoker, so that the same message is not processed twice.
+		if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
+				&& rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals("true")) {
+			return result;
+		}
+
+		MessageContext msgCtx = rmMsgCtx.getMessageContext();
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(msgCtx.getConfigurationContext(),msgCtx.getConfigurationContext().getAxisConfiguration());
+		Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+		String sequenceId = sequence.getIdentifier().getIdentifier();
+		long msgNo = sequence.getMessageNumber().getMessageNumber();
+		boolean lastMessage = sequence.getLastMessage() != null;
+		
+		// Check that both the Sequence header and message body have been secured properly
+		RMDBeanMgr mgr = storageManager.getRMDBeanMgr();
+		RMDBean bean = mgr.retrieve(sequenceId);
+		
+		if(bean != null && bean.getSecurityTokenData() != null) {
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
+			
+			QName seqName = new QName(rmMsgCtx.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.SEQUENCE);
+			
+			SOAPEnvelope envelope = msgCtx.getEnvelope();
+			OMElement body = envelope.getBody();
+			OMElement seqHeader = envelope.getHeader().getFirstChildWithName(seqName);
+			
+			SecurityToken token = secManager.recoverSecurityToken(bean.getSecurityTokenData());
+			
+			secManager.checkProofOfPossession(token, seqHeader, msgCtx);
+			secManager.checkProofOfPossession(token, body, msgCtx);
+		}
+		
+		// Store the inbound sequence id, number and lastMessage onto the operation context
+		OperationContext opCtx = msgCtx.getOperationContext();
+		if(opCtx != null) {
+			opCtx.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID, sequenceId);
+			opCtx.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_MESSAGE_NUMBER, new Long(msgNo));
+			if(lastMessage) opCtx.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_LAST_MESSAGE, Boolean.TRUE);
+		}
+		
+		// setting acked msg no range
+		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+		if (configCtx == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		if (FaultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: SequenceProcessor::processReliableMessage, Unknown sequence");
+			return InvocationResponse.ABORT;
+		}
+
+		// throwing a fault if the sequence is terminated
+		if (FaultManager.checkForSequenceTerminated(rmMsgCtx, sequenceId, bean)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: SequenceProcessor::processReliableMessage, Sequence terminated");
+			return InvocationResponse.ABORT;
+		}
+		
+		// throwing a fault if the sequence is closed.
+		if (FaultManager.checkForSequenceClosed(rmMsgCtx, sequenceId, bean)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: SequenceProcessor::processReliableMessage, Sequence closed");
+			return InvocationResponse.ABORT;
+		}
+		FaultManager.checkForLastMsgNumberExceeded(rmMsgCtx, storageManager);
+		
+		if (FaultManager.checkForMessageRolledOver(rmMsgCtx, sequenceId, msgNo)) {
+			
+			if (log.isDebugEnabled())
+				log.debug("Exit: SequenceProcessor::processReliableMessage, Message rolled over " + msgNo);
+			
+			return InvocationResponse.ABORT;
+		}
+
+		// Pause the messages bean if not the right message to invoke.
+		
+		// updating the last activated time of the sequence.
+		bean.setLastActivatedTime(System.currentTimeMillis());
+		
+		if (lastMessage) {
+			//setting this as the LastMessage number
+			bean.setLastInMessageId(msgCtx.getMessageID());
+		}
+		
+		EndpointReference replyTo = rmMsgCtx.getReplyTo();
+		String key = SandeshaUtil.getUUID(); // key to store the message.
+		// updating the Highest_In_Msg_No property which gives the highest
+		// message number retrieved from this sequence.
+		long highestInMsgNo = bean.getHighestInMessageNumber();
+
+		if (msgNo > highestInMsgNo) {
+			// If WS-Addressing is turned off there may not be a message id written into the SOAP
+			// headers, but we can still fake one up to help us match up requests and replies within
+			// this end of the connection.
+			String messageId = msgCtx.getMessageID();
+			if(messageId == null) {
+				messageId = SandeshaUtil.getUUID();
+				msgCtx.setMessageID(messageId);
+			}
+			
+			bean.setHighestInMessageId(messageId);
+			bean.setHighestInMessageNumber(msgNo);
+		}
+		
+		String specVersion = rmMsgCtx.getRMSpecVersion();
+		if (rmMsgCtx.getMessageContext().getAxisOperation().getName().getLocalPart().equals(Sandesha2Constants.RM_DUPLICATE_OPERATION.getLocalPart())
+				&& (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
+			// this is a duplicate message and the invocation type is EXACTLY_ONCE. We try to return
+			// ack messages at this point, as if someone is sending duplicates then they may have
+			// missed earlier acks. We also have special processing for sync 2-way with RM 1.0
+			if((replyTo==null || replyTo.hasAnonymousAddress()) &&
+			   (specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0))) {
+
+				SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+				SenderBean findSenderBean = new SenderBean ();
+				findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+				findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
+				findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
+				findSenderBean.setSend(true);
+
+				SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
+
+				// this is effectively a poll for the replyMessage, so re-use the logic in the MakeConnection
+				// processor. This will use this thread to re-send the reply, writing it into the transport.
+				// As the reply is now written we do not want to continue processing, or suspend, so we abort.
+				if(replyMessageBean != null) {
+					if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
+					MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null);
+					result = InvocationResponse.ABORT;
+					if (log.isDebugEnabled())
+						log.debug("Exit: SequenceProcessor::processReliableMessage, replayed message: " + result);
+					return result;
+				}
+			}
+			
+			EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
+			
+			// Send an Ack if needed.
+			//We are not sending acks for duplicate messages in the RM 1.0 anon InOut case.
+			//If a standalone ack gets sent before the actual message (i.e. before the original msg gets
+			//replied), the client may take this as an InOnly message and may avoid looking for the application
+			//response.
+			//A missing ReplyTo is treated like an anonymous one, exactly as in the sync 2-way test
+			//above. Dereferencing the ReplyTo without this check failed with a NullPointerException
+			//for RM 1.0 duplicates that carry no ReplyTo and have no stored reply.
+			boolean anonymousReplyTo = replyTo == null || replyTo.hasAnonymousAddress();
+			if (!(Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion) && anonymousReplyTo)) {
+				sendAckIfNeeded(bean, sequenceId, rmMsgCtx, storageManager, true, acksTo.hasAnonymousAddress());	
+			}
+			
+			result = InvocationResponse.ABORT;
+			if (log.isDebugEnabled())
+				log.debug("Exit: SequenceProcessor::processReliableMessage, dropping duplicate: " + result);
+			return result;
+		}
+		
+		// If the message is a reply to an outbound message then we can update the RMSBean that
+		// matches.
+		String outboundSequence = bean.getOutboundInternalSequence();
+		if(outboundSequence != null) {
+			RMSBean outBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, outboundSequence);
+			if(outBean != null && outBean.getExpectedReplies() > 0 ) {
+				outBean.setExpectedReplies(outBean.getExpectedReplies() - 1);
+				RMSBeanMgr outMgr = storageManager.getRMSBeanMgr();
+				outMgr.update(outBean);
+			}
+		}
+		
+		// Set the last activated time
+		bean.setLastActivatedTime(System.currentTimeMillis());
+		
+		// Update the RMD bean
+		mgr.update(bean);
+		
+		// If we are doing sync 2-way over WSRM 1.0, then we may just have received one of
+		// the reply messages that we were looking for. If so we can remove the matching sender bean.
+		int mep = msgCtx.getAxisOperation().getAxisSpecifMEPConstant();
+		if(specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0) &&
+				mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
+			RelatesTo relatesTo = msgCtx.getRelatesTo();
+			if(relatesTo != null) {
+				String messageId = relatesTo.getValue();
+				SenderBean matcher = new SenderBean();
+				matcher.setMessageID(messageId);
+				SenderBean sender = storageManager.getSenderBeanMgr().findUnique(matcher);
+				if(sender != null) {
+					if(log.isDebugEnabled()) log.debug("Deleting sender for sync-2-way message");
+					storageManager.removeMessageContext(sender.getMessageContextRefKey());
+					storageManager.getSenderBeanMgr().delete(messageId);
+					
+					// Try and terminate the corresponding outbound sequence
+					RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sender.getSequenceID());
+					TerminateManager.checkAndTerminate(rmMsgCtx.getConfigurationContext(), storageManager, rmsBean);
+				}
+			}
+		}
+
+		//setting properties for the messageContext
+		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
+		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new Long (msgNo));
+		
+		// We only create an ack message if:
+		// - We have anonymous acks, and the backchannel is free
+		// - We have async acks
+		boolean backchannelFree = (replyTo != null && !replyTo.hasAnonymousAddress()) ||
+									WSDLConstants.MEP_CONSTANT_IN_ONLY == mep;
+		EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
+		if (acksTo.hasAnonymousAddress() && backchannelFree) {
+			// NOTE(review): the operation context is null-checked further up but not here - confirm
+			// that it can never be null on this path.
+			Object responseWritten = msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
+			if (responseWritten==null || !Constants.VALUE_TRUE.equals(responseWritten)) {
+				RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
+				msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+				AcknowledgementManager.sendAckNow(ackRMMsgContext);
+			}
+		} else if (!acksTo.hasAnonymousAddress()) {
+			SandeshaPolicyBean policyBean = SandeshaUtil.getPropertyBean (msgCtx.getAxisOperation());
+			long ackInterval = policyBean.getAcknowledgementInterval();
+			long timeToSend = System.currentTimeMillis() + ackInterval;
+			
+			RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
+
+			AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, sequenceId, timeToSend, storageManager);
+		}
+		
+		// If this message matches the WSRM 1.0 pattern for an empty last message (e.g.
+		// the sender wanted to signal the last message, but didn't have an application
+		// message to send) then we do not need to send the message on to the application.
+		if(Sandesha2Constants.SPEC_2005_02.Actions.ACTION_LAST_MESSAGE.equals(msgCtx.getWSAAction()) ||
+		   Sandesha2Constants.SPEC_2005_02.Actions.SOAP_ACTION_LAST_MESSAGE.equals(msgCtx.getSoapAction())) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: SequenceProcessor::processReliableMessage, got WSRM 1.0 lastmessage, aborting");
+			return InvocationResponse.ABORT;
+		}
+		
+		// If the storage manager has an invoker, then they may be implementing inOrder, or
+		// transactional delivery. Either way, if they have one we should use it.
+		SandeshaThread invoker = storageManager.getInvoker();
+		if (invoker != null) {
+			// Whatever the MEP, we stop processing here and the invoker will do the real work. We only
+			// SUSPEND if we need to keep the backchannel open for the response... we may as well ABORT
+			// to let other cases end more quickly.
+			if(backchannelFree) {
+				result = InvocationResponse.ABORT;
+			} else {
+				result = InvocationResponse.SUSPEND;
+			}
+			InvokerBeanMgr storageMapMgr = storageManager.getInvokerBeanMgr();
+
+			storageManager.storeMessageContext(key, rmMsgCtx.getMessageContext());
+			storageMapMgr.insert(new InvokerBean(key, msgNo, sequenceId));
+
+			// This will avoid performing application processing more than once.
+			rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		}
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: SequenceProcessor::processReliableMessage " + result);
+		
+		return result;
+	}
+
+
+	/**
+	 * Generates an acknowledgement for the sequence and either sends it at once or
+	 * queues it for later sending.
+	 * 
+	 * @param rmdBean the bean of the inbound sequence that is acknowledged
+	 * @param sequenceId the id of the inbound sequence
+	 * @param rmMsgCtx the message that triggered the acknowledgement
+	 * @param storageManager the storage manager used to build and queue the ack
+	 * @param serverSide passed through to the ack generation
+	 * @param anonymousAcksTo true to mark the response as written and send the ack
+	 *        immediately; false to queue the ack to be sent after the configured
+	 *        acknowledgement interval
+	 * @throws AxisFault if the ack cannot be generated, sent or queued
+	 */
+	private void sendAckIfNeeded(RMDBean rmdBean, String sequenceId, RMMsgContext rmMsgCtx, 
+			StorageManager storageManager, boolean serverSide, boolean anonymousAcksTo)
+					throws AxisFault {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: SequenceProcessor::sendAckIfNeeded " + sequenceId);
+
+		RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(
+				rmMsgCtx, rmdBean, sequenceId, storageManager, serverSide);
+
+		if (anonymousAcksTo) {
+			rmMsgCtx.getMessageContext().getOperationContext().
+				setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+			AcknowledgementManager.sendAckNow(ackRMMsgCtx);
+		} else {
+			// NOTE(review): the interval is read from the service here, while
+			// processReliableMessage reads it from the operation - confirm this is intended.
+			long ackInterval = SandeshaUtil.getPropertyBean(
+					rmMsgCtx.getMessageContext().getAxisService())
+					.getAcknowledgementInterval();
+			long timeToSend = System.currentTimeMillis() + ackInterval;
+			AcknowledgementManager.addAckBeanEntry(ackRMMsgCtx, sequenceId, timeToSend, storageManager);
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: SequenceProcessor::sendAckIfNeeded");
+	}
+
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,413 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ContextFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.http.server.AxisHttpResponseImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.util.WSRMMessageSender;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+
+/**
+ * Responsible for processing an incoming Terminate Sequence message.
+ */
+
+public class TerminateSeqMsgProcessor extends WSRMMessageSender implements MsgProcessor {
+
+	private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
+
+	/**
+	 * Processes an incoming TerminateSequence message.
+	 * <p>
+	 * The method extracts the sequence identifier, checks proof of possession when
+	 * the RMDBean carries security token data, and stops early (returning false)
+	 * when FaultManager reports the sequence as unknown. Otherwise it records the
+	 * highest message numbers, cleans up the receiving side, marks the RMDBean as
+	 * terminated and then either sends a TerminateSequenceResponse (when the RM
+	 * spec version requires one) or, if a stored outgoing TerminateSequence with
+	 * an anonymous To address exists, sends that message on the transport of the
+	 * incoming message. Finally the incoming message context is paused.
+	 *
+	 * @param terminateSeqRMMsg the incoming TerminateSequence message
+	 * @return false if the sequence is unknown, true otherwise
+	 * @throws AxisFault if the TerminateSequence part or sequence id is missing
+	 *         (SandeshaException), if sending a response fails
+	 *         (SandeshaException), or if a called helper fails
+	 */
+	public boolean processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
+
+		MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
+
+		// Processing the terminate message
+		// TODO Add terminate sequence message logic.
+		TerminateSequence terminateSequence = (TerminateSequence) terminateSeqRMMsg
+				.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+		if (terminateSequence == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noTerminateSeqPart);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		String sequenceId = terminateSequence.getIdentifier().getIdentifier();
+		if (sequenceId == null || "".equals(sequenceId)) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidSequenceID, null);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+		
+		ConfigurationContext context = terminateSeqMsg.getConfigurationContext();
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
+		
+		// Check that the sender of this TerminateSequence holds the correct token
+		RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+		if(rmdBean != null && rmdBean.getSecurityTokenData() != null) {
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
+			OMElement body = terminateSeqRMMsg.getSOAPEnvelope().getBody();
+			SecurityToken token = secManager.recoverSecurityToken(rmdBean.getSecurityTokenData());
+			secManager.checkProofOfPossession(token, body, terminateSeqRMMsg.getMessageContext());
+		}
+
+		// Stop here when the sequence is not known. NOTE(review): the code below
+		// dereferences rmdBean without a null check, so it presumably relies on this
+		// check returning true whenever no RMDBean exists - confirm in FaultManager.
+		if (FaultManager.checkForUnknownSequence(terminateSeqRMMsg, sequenceId, storageManager)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: TerminateSeqMsgProcessor::processInMessage, unknown sequence");
+			return false;
+		}
+
+		// add the terminate sequence response if required.
+		RMMsgContext terminateSequenceResponse = null;
+		if (SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion()))
+			terminateSequenceResponse = getTerminateSequenceResponse(terminateSeqRMMsg, rmdBean, sequenceId, storageManager);
+
+		setUpHighestMsgNumbers(context, storageManager, sequenceId, terminateSeqRMMsg);
+		
+		
+		
+		boolean inOrderInvocation = SandeshaUtil.getDefaultPropertyBean(context.getAxisConfiguration()).isInOrder();
+		
+		
+		// If the invocation is in-order and this is RM 1.1, there is a possibility that all the
+		// messages have already been invoked. In that case we should do the full termination.
+		
+		boolean doFullTermination = false;
+		
+		if (inOrderInvocation) {
+
+			long highestMsgNo = rmdBean.getHighestInMessageNumber();
+			long nextMsgToProcess = rmdBean.getNextMsgNoToProcess();
+			
+			if (nextMsgToProcess>highestMsgNo) {
+				// all the messages have been invoked, so we can do the full termination
+				doFullTermination = true;
+			}
+		} else {
+			// for the not-in-order case, always do the full termination.
+			doFullTermination = true;
+		}
+		
+		// The on-terminate-message cleanup runs in both cases; full termination
+		// additionally runs the after-invocation cleanup first.
+		if (doFullTermination) {
+			TerminateManager.cleanReceivingSideAfterInvocation(sequenceId, storageManager);
+			TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId, storageManager);
+		} else
+			TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId, storageManager);
+
+		// Persist the terminated state and refresh the last-activated timestamp.
+		rmdBean.setTerminated(true);		
+		rmdBean.setLastActivatedTime(System.currentTimeMillis());
+		storageManager.getRMDBeanMgr().update(rmdBean);
+
+
+		//sending the terminate sequence response
+		if (terminateSequenceResponse != null) {
+			
+			MessageContext outMessage = terminateSequenceResponse.getMessageContext();
+			EndpointReference toEPR = outMessage.getTo();
+			
+			AxisEngine engine = new AxisEngine(terminateSeqMsg
+					.getConfigurationContext());
+						
+			outMessage.setServerSide(true);
+						
+			try {							
+				engine.send(outMessage);
+			} catch (AxisFault e) {
+				if (log.isDebugEnabled())
+					log.debug("Unable to send terminate sequence response", e);
+				
+				throw new SandeshaException(
+						SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse), e);
+			}
+			
+			// RESPONSE_WRITTEN is set to "true" only when the response went to an anonymous address.
+			if (toEPR.hasAnonymousAddress()) {
+				terminateSeqMsg.getOperationContext().setProperty(
+						org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+			} else {
+				terminateSeqMsg.getOperationContext().setProperty(
+						org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+			}
+
+		} else {
+			// In the RM 1.0 anonymous scenario we try to attach the TerminateSequence of the
+			// response side as the response message.
+			
+			// Look for a stored outgoing TerminateSequence for the response-side sequence.
+			String outgoingSideInternalSeqId = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
+			SenderBean senderFindBean = new SenderBean ();
+			senderFindBean.setInternalSequenceID(outgoingSideInternalSeqId);
+			senderFindBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
+			senderFindBean.setSend(true);
+			senderFindBean.setReSend(false);
+			
+			SenderBean outgoingSideTerminateBean = storageManager.getSenderBeanMgr().findUnique(senderFindBean);
+			if (outgoingSideTerminateBean!=null) {
+			
+				EndpointReference toEPR = new EndpointReference (outgoingSideTerminateBean.getToAddress());
+				if (toEPR.hasAnonymousAddress()) {
+					String messageKey = outgoingSideTerminateBean
+							.getMessageContextRefKey();
+					MessageContext message = storageManager
+							.retrieveMessageContext(messageKey, context);
+
+					RMMsgContext rmMessage = MsgInitializer.initializeMessage(message);
+					
+					// Attach this outgoing terminate message as the
+					// response to the incoming terminate message.
+					message.setTransportOut(terminateSeqMsg.getTransportOut());
+					message.setProperty(MessageContext.TRANSPORT_OUT,terminateSeqMsg.getProperty(MessageContext.TRANSPORT_OUT));
+					message.setProperty(Constants.OUT_TRANSPORT_INFO, terminateSeqMsg.getProperty(Constants.OUT_TRANSPORT_INFO));
+	                
+					terminateSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+					
+					AxisEngine engine = new AxisEngine(context);
+					try {							
+						engine.send(message);
+					} catch (AxisFault e) {
+						if (log.isDebugEnabled())
+							log.debug("Unable to send terminate sequence response", e);
+						
+						throw new SandeshaException(
+								SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse), e);
+					}
+					
+					MessageRetransmissionAdjuster.adjustRetransmittion(rmMessage, outgoingSideTerminateBean, context, storageManager);
+				}
+				
+			}
+
+		}
+		
+		// Pause the incoming message context now that the terminate has been handled.
+		terminateSeqMsg.pause();
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: TerminateSeqMsgProcessor::processInMessage " + Boolean.TRUE);
+		return true;
+	}
+
+	/**
+	 * Marks the highest received message of the inbound sequence as its last
+	 * message and, when the response-side sequence has already sent a message
+	 * relating to it and every outbound message up to that one has been
+	 * acknowledged, adds a TerminateSequence message for the response-side
+	 * sequence.
+	 *
+	 * @param configCtx      configuration context used to load the stored reference message
+	 * @param storageManager storage holding the RMD/RMS beans and stored messages
+	 * @param sequenceId     identifier of the inbound sequence being terminated
+	 * @param terminateRMMsg the incoming TerminateSequence message
+	 * @throws SandeshaException if the reference message of the outbound sequence
+	 *         is not set or cannot be found, or if any helper raises an AxisFault
+	 */
+	private void setUpHighestMsgNumbers(ConfigurationContext configCtx, StorageManager storageManager,
+			String sequenceId, RMMsgContext terminateRMMsg) throws SandeshaException {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: TerminateSeqMsgProcessor::setUpHighestMsgNumbers, " + sequenceId);
+
+		RMDBeanMgr mgr = storageManager.getRMDBeanMgr();
+		RMDBean bean = mgr.retrieve(sequenceId);
+
+		long highestInMsgNo = bean.getHighestInMessageNumber();
+
+		// following will be valid only for the server side, since the obtained
+		// int. seq ID is only valid there.
+		String responseSideInternalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
+
+		long highestOutMsgNo = 0;
+		try {
+			boolean addResponseSideTerminate = false;
+			if (highestInMsgNo != 0) {
+				// Mark up the highest inbound message as if it had the last message flag on it.
+				String inMsgId = bean.getHighestInMessageId();
+				bean.setLastInMessageId(inMsgId);
+
+				// Update the RMDBean
+				storageManager.getRMDBeanMgr().update(bean);
+
+				// If an outbound message has already gone out with that relatesTo, then we can terminate
+				// right away.
+				RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, responseSideInternalSequenceId);
+
+				if (rmsBean != null) {
+					String highestOutRelatesTo = rmsBean.getHighestOutRelatesTo();
+					if (highestOutRelatesTo != null && highestOutRelatesTo.equals(inMsgId)) {
+						highestOutMsgNo = rmsBean.getHighestOutMessageNumber();
+						addResponseSideTerminate = true;
+
+						// It is possible that the message has gone out, but not been acked yet. In that case
+						// we can store the HIGHEST_OUT_MSG_NUMBER as the LAST_OUT_MESSAGE_NO, so that when the
+						// ack arrives we will terminate the sequence
+						rmsBean.setLastOutMessage(highestOutMsgNo);
+						storageManager.getRMSBeanMgr().update(rmsBean);
+					}
+				}
+			}
+
+			// If all the out messages have been acked, add the outgoing
+			// terminate seq msg.
+			String outgoingSequnceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(responseSideInternalSequenceId, storageManager);
+
+			if (addResponseSideTerminate && highestOutMsgNo > 0 && responseSideInternalSequenceId != null
+					&& outgoingSequnceID != null) {
+				boolean allAcked = SandeshaUtil.isAllMsgsAckedUpto(highestOutMsgNo, responseSideInternalSequenceId, storageManager);
+
+				if (allAcked) {
+					RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outgoingSequnceID);
+					if (rmsBean == null) {
+						// The lookup result was previously dereferenced unchecked; skip rather than fail with an NPE.
+						if (log.isDebugEnabled())
+							log.debug("No RMSBean found for outgoing sequence " + outgoingSequnceID
+									+ ", terminate message not added");
+					} else if (!rmsBean.isTerminateAdded()) {
+						TerminateManager.addTerminateSequenceMessage(terminateRMMsg, rmsBean.getInternalSequenceID(), outgoingSequnceID , storageManager);
+						String referenceMsgKey = rmsBean.getReferenceMessageStoreKey();
+						if (referenceMsgKey == null) {
+							String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referenceMessageNotSetForSequence,rmsBean.getSequenceID());
+							throw new SandeshaException (message);
+						}
+
+						MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey, configCtx);
+
+						if (referenceMessage == null) {
+							String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referencedMessageNotFound, rmsBean.getSequenceID());
+							throw new SandeshaException (message);
+						}
+
+						// The result is not used here; the call is retained because the
+						// original code made it and it may raise an AxisFault.
+						MsgInitializer.initializeMessage(referenceMessage);
+					}
+				}
+			}
+		} catch (AxisFault e) {
+			throw new SandeshaException(e);
+		}
+		if (log.isDebugEnabled())
+			log.debug("Exit: TerminateSeqMsgProcessor::setUpHighestMsgNumbers");
+	}
+
+	/**
+	 * Builds the TerminateSequenceResponse for an incoming TerminateSequence
+	 * message. The sequence acknowledgement parts of a freshly generated ack
+	 * message are copied onto the response, the SOAP envelope is added, and the
+	 * response is flagged as an out-flow message whose application processing is
+	 * done.
+	 *
+	 * @param terminateSeqRMMsg the incoming TerminateSequence message
+	 * @param rmdBean           bean passed to the response and ack creators
+	 * @param sequenceId        identifier of the sequence being terminated
+	 * @param storageManager    storage used while generating the acknowledgement
+	 * @return the response message, ready to be sent
+	 * @throws AxisFault if creating the response or the acknowledgement fails
+	 */
+	private RMMsgContext getTerminateSequenceResponse(RMMsgContext terminateSeqRMMsg, RMDBean rmdBean, String sequenceId,
+			StorageManager storageManager) throws AxisFault {
+
+		// The trace messages previously named a non-existent "addTerminateSequenceResponse" method.
+		if (log.isDebugEnabled())
+			log.debug("Enter: TerminateSeqMsgProcessor::getTerminateSequenceResponse, " + sequenceId);
+
+		RMMsgContext terminateSeqResponseRMMsg = RMMsgCreator.createTerminateSeqResponseMsg(terminateSeqRMMsg, rmdBean);
+		MessageContext outMessage = terminateSeqResponseRMMsg.getMessageContext();
+
+		RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, rmdBean,
+				sequenceId, storageManager, true);
+
+		// copy over the ack parts
+		Iterator iter = ackRMMessage.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+		while (iter.hasNext()) {
+			SequenceAcknowledgement seqAck = (SequenceAcknowledgement) iter.next();
+			terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, seqAck);
+		}
+
+		terminateSeqResponseRMMsg.addSOAPEnvelope();
+
+		terminateSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
+		terminateSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		outMessage.setResponseWritten(true);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: TerminateSeqMsgProcessor::getTerminateSequenceResponse");
+
+		return terminateSeqResponseRMMsg;
+	}
+
+	/**
+	 * Prepares and dispatches an outgoing TerminateSequence message.
+	 * If the RMSBean of the internal sequence already has its terminate-added
+	 * flag set, nothing is sent. Otherwise a new operation context for the
+	 * terminate operation is created and registered, the sequence identifier and
+	 * actions are filled in, the terminate-added flag is stored, and the message
+	 * is handed to the sender with the terminate delay before being paused.
+	 *
+	 * @param rmMsgCtx the outgoing TerminateSequence message
+	 * @return always true
+	 * @throws AxisFault if any of the setup, storage or send steps fails
+	 */
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+
+		if (log.isDebugEnabled()) {
+			log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
+		}
+
+		// Let the parent sender initialise its state from the out message.
+		setupOutMessage(rmMsgCtx);
+
+		RMSBean sendingSeqBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(getStorageManager(), getInternalSequenceID());
+
+		// A terminate has been added for this internal sequence before: nothing more to do.
+		if (sendingSeqBean.isTerminateAdded()) {
+			String alreadyAdded = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
+			log.debug(alreadyAdded);
+			if (log.isDebugEnabled()) {
+				log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage, sequence previously terminated");
+			}
+			return true;
+		}
+
+		// Create and register a dedicated operation context for the terminate operation.
+		AxisOperation terminateOperation = SpecSpecificConstants.getWSRMOperation(
+				Sandesha2Constants.MessageTypes.TERMINATE_SEQ,
+				rmMsgCtx.getRMSpecVersion(),
+				getMsgContext().getAxisService());
+
+		OperationContext operationContext = ContextFactory.createOperationContext(
+				terminateOperation, getMsgContext().getServiceContext());
+		operationContext.setParent(getMsgContext().getServiceContext());
+
+		getConfigurationContext().registerOperationContext(rmMsgCtx.getMessageId(), operationContext);
+
+		getMsgContext().setOperationContext(operationContext);
+		getMsgContext().setAxisOperation(terminateOperation);
+
+		// Stamp the outgoing sequence id onto the TerminateSequence part.
+		TerminateSequence terminatePart = (TerminateSequence) rmMsgCtx
+				.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+		terminatePart.getIdentifier().setIndentifer(getOutSequenceID());
+
+		rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(getRMVersion()));
+		rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(getRMVersion()));
+
+		// Remember that the terminate has been added and persist the flag.
+		sendingSeqBean.setTerminateAdded(true);
+		getStorageManager().getRMSBeanMgr().update(sendingSeqBean);
+
+		// Hand the message to the sender with a delay, so that the terminate is
+		// not sent before the current request (ack) has returned.
+		// TODO: refine the terminate delay.
+		sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.TERMINATE_SEQ, Sandesha2Constants.TERMINATE_DELAY);
+
+		// Pause the message context
+		rmMsgCtx.pause();
+
+		if (log.isDebugEnabled()) {
+			log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage " + Boolean.TRUE);
+		}
+		return true;
+	}
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,96 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.polling.PollingManager;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.TerminateSequenceResponse;
+
+/**
+ * To process terminate sequence response messages.
+ */
+public class TerminateSeqResponseMsgProcessor implements MsgProcessor {
+
+	private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
+
+	/**
+	 * Processes an incoming TerminateSequenceResponse message.
+	 * The sending-side bean (RMSBean) for the identified sequence is looked up,
+	 * proof of possession is checked when the bean carries security token data,
+	 * a polling request is scheduled when an RMDBean in polling mode is found,
+	 * and the sending side of the sequence is terminated. The message is then
+	 * paused.
+	 *
+	 * @param terminateResRMMsg the incoming TerminateSequenceResponse message
+	 * @return always true when processing completes
+	 * @throws AxisFault if the message has no TerminateSequenceResponse part, if
+	 *         no RMSBean exists for the sequence, or if a called helper fails
+	 */
+	public boolean processInMessage(RMMsgContext terminateResRMMsg)
+			throws AxisFault {
+		if (log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage");
+
+		MessageContext msgContext = terminateResRMMsg.getMessageContext();
+		ConfigurationContext context = terminateResRMMsg.getConfigurationContext();
+
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+
+		TerminateSequenceResponse tsResponse = (TerminateSequenceResponse)
+				terminateResRMMsg.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE);
+		if (tsResponse == null) {
+			// Fail with a descriptive fault instead of a NullPointerException.
+			String message = "TerminateSequenceResponse part is missing from the message";
+			log.debug(message);
+			throw new AxisFault(message);
+		}
+
+		String sequenceId = tsResponse.getIdentifier().getIdentifier();
+		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceId);
+		if (rmsBean == null) {
+			// The bean used to be null-checked only for the security test and was
+			// dereferenced unconditionally afterwards.
+			String message = "No sending-side sequence found for TerminateSequenceResponse with sequence id " + sequenceId;
+			log.debug(message);
+			throw new AxisFault(message);
+		}
+
+		// Check that the sender of this TerminateSequenceResponse holds the correct token
+		if (rmsBean.getSecurityTokenData() != null) {
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
+			OMElement body = terminateResRMMsg.getSOAPEnvelope().getBody();
+			SecurityToken token = secManager.recoverSecurityToken(rmsBean.getSecurityTokenData());
+			secManager.checkProofOfPossession(token, body, msgContext);
+		}
+
+		msgContext.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, rmsBean.getInternalSequenceID());
+
+		// Scheduling a polling request for the response side.
+		if (rmsBean.getOfferedSequence() != null) {
+			RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
+			// NOTE(review): the RMDBean is retrieved with the id from the response,
+			// not with rmsBean.getOfferedSequence() - confirm this is intended.
+			RMDBean rMDBean = rMDBeanMgr.retrieve(sequenceId);
+
+			if (rMDBean != null && rMDBean.isPollingMode()) {
+				PollingManager manager = storageManager.getPollingManager();
+				if (manager != null) manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+			}
+		}
+
+		TerminateManager.terminateSendingSide(rmsBean, storageManager);
+
+		// Stop this message travelling further through the Axis runtime
+		terminateResRMMsg.pause();
+
+		if (log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processInMessage " + Boolean.TRUE);
+		return true;
+	}
+
+	/**
+	 * Outgoing TerminateSequenceResponse messages need no processing here.
+	 *
+	 * @param rmMsgCtx the outgoing message (unused)
+	 * @return always false
+	 */
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+		if (log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processOutMessage");
+		if (log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
+		return false;
+	}
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,111 @@
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.msgreceivers;
+
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.receivers.AbstractMessageReceiver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+*Currently this is a dummy Msg Receiver.
+*All the necessary RM logic happens at MessageProcessors.
+*This only ensures that the default Message Receiver does not get called for RM control messages.
+*/
+
+
+public class RMMessageReceiver extends AbstractMessageReceiver {
+
+	private static final Log log = LogFactory.getLog(RMMessageReceiver.class);
+
+	/**
+	 * Receives an RM message and hands it to the matching message processor
+	 * inside a storage transaction. The transaction is committed when processing
+	 * succeeds; on any exception the message is paused, the transaction is
+	 * rolled back and the failure is rethrown as an AxisFault.
+	 *
+	 * @param msgCtx the incoming message context
+	 * @throws AxisFault if the message cannot be initialised or processing fails;
+	 *         non-AxisFault exceptions are wrapped with the original as cause
+	 */
+	public final void receive(MessageContext msgCtx) throws AxisFault {
+		if (log.isDebugEnabled()) log.debug("Entry: RMMessageReceiver::receive");
+
+		// Reuse the RM message context stored on the message, if there is one.
+		RMMsgContext rmMsgCtx;
+		Object storedRMMsgCtx = msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT);
+		if (storedRMMsgCtx != null)
+			rmMsgCtx = (RMMsgContext) storedRMMsgCtx;
+		else
+			rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+
+		if (log.isDebugEnabled()) log.debug("MsgReceiver got type: " + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
+
+		// Note that some messages (such as stand-alone acks) will be routed here, but
+		// the headers will already have been processed. Therefore we should not assume
+		// that we will have a MsgProcessor every time.
+		MsgProcessor msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+		if (msgProcessor != null) {
+			Transaction transaction = null;
+
+			try {
+				ConfigurationContext context = msgCtx.getConfigurationContext();
+				StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+				transaction = storageManager.getTransaction();
+
+				msgProcessor.processInMessage(rmMsgCtx);
+
+			} catch (Exception e) {
+				if (log.isDebugEnabled())
+					log.debug("Exception caught during processInMessage", e);
+				// message should not be sent in an exception situation.
+				msgCtx.pause();
+
+				if (transaction != null) {
+					try {
+						transaction.rollback();
+					} catch (Exception e1) {
+						// Log the rollback failure itself, not the original exception.
+						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+						log.debug(message, e1);
+					} finally {
+						// Clear the reference even when rollback failed, so the outer
+						// finally block never commits a transaction we tried to roll back.
+						transaction = null;
+					}
+				}
+
+				if (e instanceof AxisFault) {
+					throw (AxisFault) e;
+				}
+
+				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.inMsgError, e.toString());
+				throw new AxisFault(message, e);
+			} finally {
+				// Only reached with a live transaction when processing succeeded.
+				if (transaction != null) {
+					try {
+						transaction.commit();
+					} catch (Exception e) {
+						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
+						log.debug(message, e);
+					}
+				}
+			}
+		}
+
+		if (log.isDebugEnabled()) log.debug("Exit: RMMessageReceiver::receive");
+	}
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org