You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/10/11 06:31:52 UTC

svn commit: r462697 [2/3] - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2: handlers/ i18n/ msgprocessors/ storage/beans/ storage/inmemory/ transport/ util/ workers/ wsrm/

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Tue Oct 10 21:31:51 2006
@@ -1,323 +1,320 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *  
- */
-
-package org.apache.sandesha2.msgprocessors;
-
-import java.util.Iterator;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.soap.SOAPFactory;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.AddressingConstants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.addressing.RelatesTo;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.security.SecurityManager;
-import org.apache.sandesha2.security.SecurityToken;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
-import org.apache.sandesha2.storage.beans.NextMsgBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.util.MsgInitializer;
-import org.apache.sandesha2.util.SOAPAbstractFactory;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SequenceManager;
-import org.apache.sandesha2.util.SpecSpecificConstants;
-import org.apache.sandesha2.wsrm.Accept;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CreateSequenceResponse;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
-
-/**
- * Responsible for processing an incoming Create Sequence Response message.
- */
-
-public class CreateSeqResponseMsgProcessor implements MsgProcessor {
-
-	private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
-
-	public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
-
-		if (log.isDebugEnabled())
-			log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage");
-
-		ConfigurationContext configCtx = createSeqResponseRMMsgCtx.getMessageContext().getConfigurationContext();
-
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
-				.getAxisConfiguration());
-
-		// Processing the create sequence response.
-
-		CreateSequenceResponse createSeqResponsePart = (CreateSequenceResponse) createSeqResponseRMMsgCtx
-				.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
-		if (createSeqResponsePart == null) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqResponse);
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		String newOutSequenceId = createSeqResponsePart.getIdentifier().getIdentifier();
-		if (newOutSequenceId == null) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.newSeqIdIsNull);
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		RelatesTo relatesTo = createSeqResponseRMMsgCtx.getMessageContext().getRelatesTo();
-		if (relatesTo == null) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.relatesToNotAvailable);
-			log.error(message);
-			throw new SandeshaException(message);
-		}
-		String createSeqMsgId = relatesTo.getValue();
-
-		SenderBeanMgr retransmitterMgr = storageManager.getRetransmitterBeanMgr();
-		CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
-
-		CreateSeqBean createSeqBean = createSeqMgr.retrieve(createSeqMsgId);
-		if (createSeqBean == null) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound);
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		// Check that the create sequence response message proves possession of the correct token
-		String tokenData = createSeqBean.getSecurityTokenData();
-		if(tokenData != null) {
-			SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
-			MessageContext crtSeqResponseCtx = createSeqResponseRMMsgCtx.getMessageContext();
-			OMElement body = crtSeqResponseCtx.getEnvelope().getBody();
-			SecurityToken token = secManager.recoverSecurityToken(tokenData);
-			secManager.checkProofOfPossession(token, body, crtSeqResponseCtx);
-		}
-
-		String internalSequenceId = createSeqBean.getInternalSequenceID();
-		if (internalSequenceId == null || "".equals(internalSequenceId)) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.tempSeqIdNotSet);
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-		createSeqResponseRMMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
-		
-		String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(createSeqResponseRMMsgCtx);
-		
-		createSeqBean.setSequenceID(newOutSequenceId);
-		createSeqMgr.update(createSeqBean);
-
-		SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
-		if (createSequenceSenderBean == null)
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));
-
-		// deleting the create sequence entry.
-		retransmitterMgr.delete(createSeqMsgId);
-
-		// storing new out sequence id
-		SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropertyBeanMgr();
-		SequencePropertyBean outSequenceBean = new SequencePropertyBean(sequencePropertyKey,
-				Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, newOutSequenceId);
-		SequencePropertyBean internalSequenceBean = new SequencePropertyBean(newOutSequenceId,
-				Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, sequencePropertyKey);
-
-		sequencePropMgr.insert(outSequenceBean);
-		sequencePropMgr.insert(internalSequenceBean);
-		
-		// Store the security token under the new sequence id
-		if(tokenData != null) {
-			SequencePropertyBean newToken = new SequencePropertyBean(newOutSequenceId,
-					Sandesha2Constants.SequenceProperties.SECURITY_TOKEN, tokenData);
-			sequencePropMgr.insert(newToken);
-		}
-
-		// processing for accept (offer has been sent)
-		Accept accept = createSeqResponsePart.getAccept();
-		if (accept != null) {
-			// Find offered sequence from internal sequence id.
-			SequencePropertyBean offeredSequenceBean = sequencePropMgr.retrieve(sequencePropertyKey,
-					Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE);
-
-			// TODO this should be detected in the Fault manager.
-			if (offeredSequenceBean == null) {
-				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.accptButNoSequenceOffered);
-				log.debug(message);
-				throw new SandeshaException(message);
-			}
-
-			String offeredSequenceId = (String) offeredSequenceBean.getValue();
-
-			EndpointReference acksToEPR = accept.getAcksTo().getAddress().getEpr();
-			SequencePropertyBean acksToBean = new SequencePropertyBean();
-			acksToBean.setName(Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
-			acksToBean.setSequencePropertyKey(offeredSequenceId);
-			acksToBean.setValue(acksToEPR.getAddress());
-
-			sequencePropMgr.insert(acksToBean);
-
-			NextMsgBean nextMsgBean = new NextMsgBean();
-			nextMsgBean.setSequenceID(offeredSequenceId);
-			nextMsgBean.setNextMsgNoToProcess(1);
-			
-
-			boolean pollingMode = false;
-			if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
-				String replyToAddress = SandeshaUtil.getSequenceProperty(sequencePropertyKey, 
-								Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, storageManager);
-				if (replyToAddress!=null) {
-					if (AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(replyToAddress))
-						pollingMode = true;
-					else if (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(replyToAddress))
-						pollingMode = true;
-					else if (replyToAddress.startsWith(Sandesha2Constants.WSRM_ANONYMOUS_URI_PREFIX))
-						pollingMode = true;
-				}
-			}
-			
-			//Storing the createSequence of the sending side sequence as the reference message.
-			//This can be used when creating new outgoing messages.
-			
-			String createSequenceMsgStoreKey = createSeqBean.getCreateSequenceMsgStoreKey();
-			MessageContext createSequenceMsg = storageManager.retrieveMessageContext(createSequenceMsgStoreKey, configCtx);
-			
-			String newMessageStoreKey = SandeshaUtil.getUUID();
-			storageManager.storeMessageContext(newMessageStoreKey,createSequenceMsg);
-			
-			nextMsgBean.setReferenceMessageKey(newMessageStoreKey);
-			
-			nextMsgBean.setPollingMode(pollingMode);
-			
-			//if PollingMode is true, starting the pollingmanager.
-			if (pollingMode)
-				SandeshaUtil.startPollingManager(configCtx);
-			
-			NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
-			nextMsgMgr.insert(nextMsgBean);
-
-			String rmSpecVersion = createSeqResponseRMMsgCtx.getRMSpecVersion();
-
-			SequencePropertyBean specVersionBean = new SequencePropertyBean(offeredSequenceId,
-					Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION, rmSpecVersion);
-			sequencePropMgr.insert(specVersionBean);
-
-			SequencePropertyBean receivedMsgBean = new SequencePropertyBean(offeredSequenceId,
-					Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES, "");
-			sequencePropMgr.insert(receivedMsgBean);
-
-			SequencePropertyBean msgsBean = new SequencePropertyBean();
-			msgsBean.setSequencePropertyKey(offeredSequenceId);
-			msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
-			msgsBean.setValue("");
-			sequencePropMgr.insert(msgsBean);
-
-			// setting the addressing version.
-			String addressingNamespace = createSeqResponseRMMsgCtx.getAddressingNamespaceValue();
-			SequencePropertyBean addressingVersionBean = new SequencePropertyBean(offeredSequenceId,
-					Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, addressingNamespace);
-			sequencePropMgr.insert(addressingVersionBean);
-
-			// Store the security token for the offered sequence
-			if(tokenData != null) {
-				SequencePropertyBean newToken = new SequencePropertyBean(offeredSequenceId,
-						Sandesha2Constants.SequenceProperties.SECURITY_TOKEN, tokenData);
-				sequencePropMgr.insert(newToken);
-			}
-		}
-
-		SenderBean target = new SenderBean();
-		target.setInternalSequenceID(internalSequenceId);
-		target.setSend(false);
-		target.setReSend(true);
-
-		Iterator iterator = retransmitterMgr.find(target).iterator();
-		while (iterator.hasNext()) {
-			SenderBean tempBean = (SenderBean) iterator.next();
-
-			// updating the application message
-			String key = tempBean.getMessageContextRefKey();
-			MessageContext applicationMsg = storageManager.retrieveMessageContext(key, configCtx);
-
-			// TODO make following exception message more understandable to the
-			// user (probably some others exceptions messages as well)
-			if (applicationMsg == null)
-				throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unavailableAppMsg));
-
-			String rmVersion = SandeshaUtil.getRMVersion(sequencePropertyKey, storageManager);
-			if (rmVersion == null)
-				throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
-
-			String assumedRMNamespace = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
-
-			RMMsgContext applicaionRMMsg = MsgInitializer.initializeMessage(applicationMsg);
-
-			Sequence sequencePart = (Sequence) applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-			if (sequencePart == null) {
-				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
-				log.debug(message);
-				throw new SandeshaException(message);
-			}
-
-			Identifier identifier = new Identifier(assumedRMNamespace);
-			identifier.setIndentifer(newOutSequenceId);
-
-			sequencePart.setIdentifier(identifier);
-
-			try {
-				applicaionRMMsg.addSOAPEnvelope();
-			} catch (AxisFault e) {
-				throw new SandeshaException(e.getMessage());
-			}
-
-			// asking to send the application msssage
-			tempBean.setSend(true);
-			retransmitterMgr.update(tempBean);
-
-			// updating the message. this will correct the SOAP envelope string.
-			storageManager.updateMessageContext(key, applicationMsg);
-		}
-
-		SequenceManager.updateLastActivatedTime(sequencePropertyKey, storageManager);
-
-		createSeqResponseRMMsgCtx.getMessageContext().getOperationContext().setProperty(
-				org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
-
-		createSeqResponseRMMsgCtx.pause();
-
-		if (log.isDebugEnabled())
-			log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage");
-	}
-
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
-		if (log.isDebugEnabled()) {
-			log.debug("Enter: CreateSeqResponseMsgProcessor::processOutMessage");
-			log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage");
-		}
-
-	}
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.wsrm.Accept;
+import org.apache.sandesha2.wsrm.CreateSequenceResponse;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * Responsible for processing an incoming Create Sequence Response message.
+ */
+
+public class CreateSeqResponseMsgProcessor implements MsgProcessor {
+
+	private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
+
+	public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage");
+
+		ConfigurationContext configCtx = createSeqResponseRMMsgCtx.getMessageContext().getConfigurationContext();
+
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+				.getAxisConfiguration());
+
+		// Processing the create sequence response.
+
+		CreateSequenceResponse createSeqResponsePart = (CreateSequenceResponse) createSeqResponseRMMsgCtx
+				.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
+		if (createSeqResponsePart == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqResponse);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		String newOutSequenceId = createSeqResponsePart.getIdentifier().getIdentifier();
+		if (newOutSequenceId == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.newSeqIdIsNull);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		RelatesTo relatesTo = createSeqResponseRMMsgCtx.getMessageContext().getRelatesTo();
+		if (relatesTo == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.relatesToNotAvailable);
+			log.error(message);
+			throw new SandeshaException(message);
+		}
+		String createSeqMsgId = relatesTo.getValue();
+
+		SenderBeanMgr retransmitterMgr = storageManager.getRetransmitterBeanMgr();
+		CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
+
+		CreateSeqBean createSeqBean = createSeqMgr.retrieve(createSeqMsgId);
+		if (createSeqBean == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		// Check that the create sequence response message proves possession of the correct token
+		String tokenData = createSeqBean.getSecurityTokenData();
+		if(tokenData != null) {
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
+			MessageContext crtSeqResponseCtx = createSeqResponseRMMsgCtx.getMessageContext();
+			OMElement body = crtSeqResponseCtx.getEnvelope().getBody();
+			SecurityToken token = secManager.recoverSecurityToken(tokenData);
+			secManager.checkProofOfPossession(token, body, crtSeqResponseCtx);
+		}
+
+		String internalSequenceId = createSeqBean.getInternalSequenceID();
+		if (internalSequenceId == null || "".equals(internalSequenceId)) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.tempSeqIdNotSet);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+		createSeqResponseRMMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
+		
+		String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(createSeqResponseRMMsgCtx);
+		
+		createSeqBean.setSequenceID(newOutSequenceId);
+		createSeqMgr.update(createSeqBean);
+
+		SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
+		if (createSequenceSenderBean == null)
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));
+
+		// deleting the create sequence entry.
+		retransmitterMgr.delete(createSeqMsgId);
+
+		// storing new out sequence id
+		SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropertyBeanMgr();
+		SequencePropertyBean outSequenceBean = new SequencePropertyBean(sequencePropertyKey,
+				Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, newOutSequenceId);
+		SequencePropertyBean internalSequenceBean = new SequencePropertyBean(newOutSequenceId,
+				Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, sequencePropertyKey);
+
+		sequencePropMgr.insert(outSequenceBean);
+		sequencePropMgr.insert(internalSequenceBean);
+		
+		// Store the security token under the new sequence id
+		if(tokenData != null) {
+			SequencePropertyBean newToken = new SequencePropertyBean(newOutSequenceId,
+					Sandesha2Constants.SequenceProperties.SECURITY_TOKEN, tokenData);
+			sequencePropMgr.insert(newToken);
+		}
+
+		// processing for accept (offer has been sent)
+		Accept accept = createSeqResponsePart.getAccept();
+		if (accept != null) {
+			// Find offered sequence from internal sequence id.
+			SequencePropertyBean offeredSequenceBean = sequencePropMgr.retrieve(sequencePropertyKey,
+					Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE);
+
+			// TODO this should be detected in the Fault manager.
+			if (offeredSequenceBean == null) {
+				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.accptButNoSequenceOffered);
+				log.debug(message);
+				throw new SandeshaException(message);
+			}
+
+			String offeredSequenceId = (String) offeredSequenceBean.getValue();
+
+			EndpointReference acksToEPR = accept.getAcksTo().getAddress().getEpr();
+			SequencePropertyBean acksToBean = new SequencePropertyBean();
+			acksToBean.setName(Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+			acksToBean.setSequencePropertyKey(offeredSequenceId);
+			acksToBean.setValue(acksToEPR.getAddress());
+
+			sequencePropMgr.insert(acksToBean);
+
+			NextMsgBean nextMsgBean = new NextMsgBean();
+			nextMsgBean.setSequenceID(offeredSequenceId);
+			nextMsgBean.setNextMsgNoToProcess(1);
+			
+
+			boolean pollingMode = false;
+			if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
+				String replyToAddress = SandeshaUtil.getSequenceProperty(sequencePropertyKey, 
+								Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, storageManager);
+				if (replyToAddress!=null) {
+					if (AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(replyToAddress))
+						pollingMode = true;
+					else if (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(replyToAddress))
+						pollingMode = true;
+					else if (replyToAddress.startsWith(Sandesha2Constants.WSRM_ANONYMOUS_URI_PREFIX))
+						pollingMode = true;
+				}
+			}
+			
+			//Storing the referenceMessage of the sending side sequence as the reference message
+			//of the receiving side as well.
+			//This can be used when creating new outgoing messages.
+			
+			String referenceMsgStoreKey = createSeqBean.getReferenceMessageStoreKey();
+			MessageContext referenceMsg = storageManager.retrieveMessageContext(referenceMsgStoreKey, configCtx);
+			
+			String newMessageStoreKey = SandeshaUtil.getUUID();
+			storageManager.storeMessageContext(newMessageStoreKey,referenceMsg);
+			
+			nextMsgBean.setReferenceMessageKey(newMessageStoreKey);
+			
+			nextMsgBean.setPollingMode(pollingMode);
+			
+			//if pollingMode is true, start the polling manager.
+			if (pollingMode)
+				SandeshaUtil.startPollingManager(configCtx);
+			
+			NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
+			nextMsgMgr.insert(nextMsgBean);
+
+			String rmSpecVersion = createSeqResponseRMMsgCtx.getRMSpecVersion();
+
+			SequencePropertyBean specVersionBean = new SequencePropertyBean(offeredSequenceId,
+					Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION, rmSpecVersion);
+			sequencePropMgr.insert(specVersionBean);
+
+			SequencePropertyBean receivedMsgBean = new SequencePropertyBean(offeredSequenceId,
+					Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES, "");
+			sequencePropMgr.insert(receivedMsgBean);
+
+			SequencePropertyBean msgsBean = new SequencePropertyBean();
+			msgsBean.setSequencePropertyKey(offeredSequenceId);
+			msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+			msgsBean.setValue("");
+			sequencePropMgr.insert(msgsBean);
+
+			// setting the addressing version.
+			String addressingNamespace = createSeqResponseRMMsgCtx.getAddressingNamespaceValue();
+			SequencePropertyBean addressingVersionBean = new SequencePropertyBean(offeredSequenceId,
+					Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, addressingNamespace);
+			sequencePropMgr.insert(addressingVersionBean);
+
+			// Store the security token for the offered sequence
+			if(tokenData != null) {
+				SequencePropertyBean newToken = new SequencePropertyBean(offeredSequenceId,
+						Sandesha2Constants.SequenceProperties.SECURITY_TOKEN, tokenData);
+				sequencePropMgr.insert(newToken);
+			}
+		}
+
+		SenderBean target = new SenderBean();
+		target.setInternalSequenceID(internalSequenceId);
+		target.setSend(false);
+		target.setReSend(true);
+
+		Iterator iterator = retransmitterMgr.find(target).iterator();
+		while (iterator.hasNext()) {
+			SenderBean tempBean = (SenderBean) iterator.next();
+
+			// updating the application message
+			String key = tempBean.getMessageContextRefKey();
+			MessageContext applicationMsg = storageManager.retrieveMessageContext(key, configCtx);
+
+			// TODO make following exception message more understandable to the
+			// user (probably some others exceptions messages as well)
+			if (applicationMsg == null)
+				throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unavailableAppMsg));
+
+			String rmVersion = SandeshaUtil.getRMVersion(sequencePropertyKey, storageManager);
+			if (rmVersion == null)
+				throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+
+			String assumedRMNamespace = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
+
+			RMMsgContext applicaionRMMsg = MsgInitializer.initializeMessage(applicationMsg);
+
+			Sequence sequencePart = (Sequence) applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+			if (sequencePart == null) {
+				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
+				log.debug(message);
+				throw new SandeshaException(message);
+			}
+
+			Identifier identifier = new Identifier(assumedRMNamespace);
+			identifier.setIndentifer(newOutSequenceId);
+
+			sequencePart.setIdentifier(identifier);
+
+			try {
+				applicaionRMMsg.addSOAPEnvelope();
+			} catch (AxisFault e) {
+				throw new SandeshaException(e.getMessage(), e);
+			}
+
+			// asking to send the application message
+			tempBean.setSend(true);
+			retransmitterMgr.update(tempBean);
+
+			// updating the message. this will correct the SOAP envelope string.
+			storageManager.updateMessageContext(key, applicationMsg);
+		}
+
+		SequenceManager.updateLastActivatedTime(sequencePropertyKey, storageManager);
+
+		createSeqResponseRMMsgCtx.getMessageContext().getOperationContext().setProperty(
+				org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+
+		createSeqResponseRMMsgCtx.pause();
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage");
+	}
+
+	public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+		if (log.isDebugEnabled()) {
+			log.debug("Enter: CreateSeqResponseMsgProcessor::processOutMessage");
+			log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage");
+		}
+
+	}
+}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Tue Oct 10 21:31:51 2006
@@ -17,11 +17,13 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.workers.SenderWorker;
 import org.apache.sandesha2.wsrm.Address;
 import org.apache.sandesha2.wsrm.Identifier;
 import org.apache.sandesha2.wsrm.MakeConnection;
+import org.apache.sandesha2.wsrm.MessagePending;
 
 /**
  * This class is responsible for processing MakeConnection request messages that come to the system.
@@ -58,16 +60,31 @@
 			findSenderBean.setSequenceID(identifier.getIdentifier());
 		
 		//finding the beans that go with the criteria of the passed SenderBean
+		
+		//beans with reSend=true
+		findSenderBean.setReSend(true);
 		Collection collection = senderBeanMgr.find(findSenderBean);
 		
+		//beans with reSend=false
+		findSenderBean.setReSend (false);
+		Collection collection2 = senderBeanMgr.find(findSenderBean);
+		
+		//all possible beans
+		collection.addAll(collection2);
+		
 		//selecting a bean to send RANDOMLY. TODO- Should use a better mechanism.
 		int size = collection.size();
 		int itemToPick=-1;
 		
+		boolean pending = false;
 		if (size>0) {
 			Random random = new Random ();
 			itemToPick = random.nextInt(size);
 		}
+
+		if (size>1)
+			pending = true;  //there is more than one message to be delivered using the makeConnection,
+							 //so the MessagePending header should have the value true.
 		
 		Iterator it = collection.iterator();
 		
@@ -91,8 +108,10 @@
 			
 		String messageStorageKey = senderBean.getMessageContextRefKey();
 		MessageContext returnMessage = storageManager.retrieveMessageContext(messageStorageKey,configurationContext);
+		RMMsgContext returnRMMsg = MsgInitializer.initializeMessage(returnMessage);
 		
-		addMessagePendingHeader ();
+		
+		addMessagePendingHeader (returnRMMsg,pending);
 		
 		setTransportProperties (returnMessage, rmMsgCtx);
 		
@@ -111,7 +130,13 @@
 		worker.run();
 	}
 	
-	private void addMessagePendingHeader (){
+	private void addMessagePendingHeader (RMMsgContext returnMessage, boolean pending) throws SandeshaException {
+		String rmNamespace = returnMessage.getRMNamespaceValue();
+		MessagePending messagePending = new MessagePending (rmNamespace);
+		messagePending.setPending(pending);
+		
+		messagePending.toSOAPEnvelope(returnMessage.getSOAPEnvelope());
+		
 	}
 
 	public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java Tue Oct 10 21:31:51 2006
@@ -1,31 +1,30 @@
-/*
- * Copyright  1999-2004 The Apache Software Foundation.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.sandesha2.msgprocessors;
-
-import org.apache.axis2.AxisFault;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.SandeshaException;
-
-/**
- * The message processor interface.
- */
-
-public interface MsgProcessor {
-	public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axis2.AxisFault;
+import org.apache.sandesha2.RMMsgContext;
+
+/**
+ * The message processor interface.
+ */
+
+public interface MsgProcessor {
+	public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+	public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Tue Oct 10 21:31:51 2006
@@ -1,472 +1,470 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *  
- */
-
-package org.apache.sandesha2.msgprocessors;
-
-import java.util.Iterator;
-
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.MessageContextConstants;
-import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.OperationContextFactory;
-import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.OutInAxisOperation;
-import org.apache.axis2.description.TransportOutDescription;
-import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.util.Utils;
-import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.security.SecurityManager;
-import org.apache.sandesha2.security.SecurityToken;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
-import org.apache.sandesha2.util.AcknowledgementManager;
-import org.apache.sandesha2.util.FaultManager;
-import org.apache.sandesha2.util.MsgInitializer;
-import org.apache.sandesha2.util.RMMsgCreator;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SequenceManager;
-import org.apache.sandesha2.util.SpecSpecificConstants;
-import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
-import org.apache.sandesha2.wsrm.TerminateSequence;
-
-/**
- * Responsible for processing an incoming Terminate Sequence message.
- */
-
-public class TerminateSeqMsgProcessor implements MsgProcessor {
-
-	private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
-
-	public void processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
-
-		if (log.isDebugEnabled())
-			log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
-
-		MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
-
-		// Processing the terminate message
-		// TODO Add terminate sequence message logic.
-		TerminateSequence terminateSequence = (TerminateSequence) terminateSeqRMMsg
-				.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
-		if (terminateSequence == null) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noTerminateSeqPart);
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		String sequenceId = terminateSequence.getIdentifier().getIdentifier();
-		if (sequenceId == null || "".equals(sequenceId)) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidSequenceID, null);
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-		
-		String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(terminateSeqRMMsg);
-
-		ConfigurationContext context = terminateSeqMsg.getConfigurationContext();
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
-		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropertyBeanMgr();
-		
-		// Check that the sender of this TerminateSequence holds the correct token
-		SequencePropertyBean tokenBean = sequencePropertyBeanMgr.retrieve(sequencePropertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
-		if(tokenBean != null) {
-			SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
-			OMElement body = terminateSeqRMMsg.getSOAPEnvelope().getBody();
-			SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
-			secManager.checkProofOfPossession(token, body, terminateSeqRMMsg.getMessageContext());
-		}
-
-		FaultManager faultManager = new FaultManager();
-		SandeshaException fault = faultManager.checkForUnknownSequence(terminateSeqRMMsg, sequenceId,
-				storageManager);
-		if (fault != null) {
-			throw fault;
-		}
-
-
-		SequencePropertyBean terminateReceivedBean = new SequencePropertyBean();
-		terminateReceivedBean.setSequencePropertyKey(sequencePropertyKey);
-		terminateReceivedBean.setName(Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
-		terminateReceivedBean.setValue("true");
-
-		sequencePropertyBeanMgr.insert(terminateReceivedBean);
-
-		// add the terminate sequence response if required.
-		RMMsgContext terminateSequenceResponse = null;
-		if (SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion()))
-			terminateSequenceResponse = getTerminateSequenceResponse(terminateSeqRMMsg, sequencePropertyKey, sequenceId, storageManager);
-
-		setUpHighestMsgNumbers(context, storageManager,sequencePropertyKey, sequenceId, terminateSeqRMMsg);
-
-		TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequencePropertyKey, sequenceId, storageManager);
-
-		SequencePropertyBean terminatedBean = new SequencePropertyBean(sequencePropertyKey,
-				Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED, Sandesha2Constants.VALUE_TRUE);
-
-		sequencePropertyBeanMgr.insert(terminatedBean);
-
-		// removing an entry from the listener
-		String transport = terminateSeqMsg.getTransportIn().getName().getLocalPart();
-
-		SequenceManager.updateLastActivatedTime(sequencePropertyKey, storageManager);
-
-		//sending the terminate sequence response
-		if (terminateSequenceResponse != null) {
-			
-			MessageContext outMessage = terminateSequenceResponse.getMessageContext();
-			EndpointReference toEPR = outMessage.getTo();
-			
-			AxisEngine engine = new AxisEngine(terminateSeqMsg
-					.getConfigurationContext());
-			engine.send(outMessage);
-
-			String addressingNamespaceURI = SandeshaUtil
-					.getSequenceProperty(
-							sequencePropertyKey,
-							Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,
-							storageManager);
-
-			String anonymousURI = SpecSpecificConstants
-					.getAddressingAnonymousURI(addressingNamespaceURI);
-
-			if (anonymousURI.equals(toEPR.getAddress())) {
-				terminateSeqMsg.getOperationContext().setProperty(
-						org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
-			} else {
-				terminateSeqMsg.getOperationContext().setProperty(
-						org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
-			}
-
-		}
-		
-		terminateSeqMsg.pause();
-
-		if (log.isDebugEnabled())
-			log.debug("Exit: TerminateSeqMsgProcessor::processInMessage");
-	}
-
-	private void setUpHighestMsgNumbers(ConfigurationContext configCtx, StorageManager storageManager,
-			String requestSidesequencePropertyKey, String sequenceId, RMMsgContext terminateRMMsg) throws SandeshaException {
-
-		if (log.isDebugEnabled())
-			log.debug("Enter: TerminateSeqMsgProcessor::setUpHighestMsgNumbers, " + sequenceId);
-
-		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-
-		String highestImMsgNumberStr = SandeshaUtil.getSequenceProperty(requestSidesequencePropertyKey,
-				Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, storageManager);
-		String highestImMsgKey = SandeshaUtil.getSequenceProperty(requestSidesequencePropertyKey,
-				Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, storageManager);
-
-		long highestInMsgNo = 0;
-		if (highestImMsgNumberStr != null) {
-			if (highestImMsgKey == null)
-				throw new SandeshaException(SandeshaMessageHelper.getMessage(
-						SandeshaMessageKeys.highestMsgKeyNotStored, sequenceId));
-
-			highestInMsgNo = Long.parseLong(highestImMsgNumberStr);
-		}
-
-		// following will be valid only for the server side, since the obtained
-		// int. seq ID is only valid there.
-		String responseSideInternalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
-		
-		//sequencePropertyKey is equal to the internalSequenceId for the outgoing sequence.
-		String responseSideSequencePropertyKey = responseSideInternalSequenceId;
-		
-		long highestOutMsgNo = 0;
-		try {
-			boolean addResponseSideTerminate = false;
-			if (highestInMsgNo == 0) {
-				addResponseSideTerminate = false;
-			} else {
-
-				// setting the last in message property
-				SequencePropertyBean lastInMsgBean = new SequencePropertyBean(requestSidesequencePropertyKey,
-						Sandesha2Constants.SequenceProperties.LAST_IN_MESSAGE_NO, highestImMsgNumberStr);
-				seqPropMgr.insert(lastInMsgBean);
-
-				MessageContext highestInMsg = storageManager.retrieveMessageContext(highestImMsgKey, configCtx);
-
-				// TODO get the out message in a storage friendly manner.
-				MessageContext highestOutMessage = highestInMsg.getOperationContext().getMessageContext(
-						OperationContextFactory.MESSAGE_LABEL_FAULT_VALUE);
-
-				if (highestOutMessage == null || highestOutMessage.getEnvelope() == null)
-					highestOutMessage = highestInMsg.getOperationContext().getMessageContext(
-							OperationContextFactory.MESSAGE_LABEL_OUT_VALUE);
-
-				if (highestOutMessage != null) {
-					if (highestOutMessage.getEnvelope() == null)
-						throw new SandeshaException(SandeshaMessageHelper
-								.getMessage(SandeshaMessageKeys.outMsgHasNoEnvelope));
-
-					RMMsgContext highestOutRMMsg = MsgInitializer.initializeMessage(highestOutMessage);
-					Sequence seqPartOfOutMsg = (Sequence) highestOutRMMsg
-							.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-
-					if (seqPartOfOutMsg != null) {
-
-						// response message of the last in message can be
-						// considered as the last out message.
-						highestOutMsgNo = seqPartOfOutMsg.getMessageNumber().getMessageNumber();
-						SequencePropertyBean highestOutMsgBean = new SequencePropertyBean(
-								responseSideSequencePropertyKey ,
-								Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO, new Long(highestOutMsgNo)
-										.toString());
-
-						seqPropMgr.insert(highestOutMsgBean);
-						addResponseSideTerminate = true;
-					}
-				}
-			}
-
-			// If all the out message have been acked, add the outgoing
-			// terminate seq msg.
-			String outgoingSqunceID = SandeshaUtil.getSequenceProperty(responseSideSequencePropertyKey,
-					Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, storageManager);
-			if (addResponseSideTerminate && highestOutMsgNo > 0 && responseSideSequencePropertyKey != null
-					&& outgoingSqunceID != null) {
-				boolean allAcked = SandeshaUtil.isAllMsgsAckedUpto(highestOutMsgNo, responseSideSequencePropertyKey,
-						storageManager);
-
-				if (allAcked)
-					TerminateManager.addTerminateSequenceMessage(terminateRMMsg, outgoingSqunceID,
-							responseSideSequencePropertyKey, storageManager);
-			}
-		} catch (AxisFault e) {
-			throw new SandeshaException(e);
-		}
-		if (log.isDebugEnabled())
-			log.debug("Exit: TerminateSeqMsgProcessor::setUpHighestMsgNumbers");
-	}
-
-	private RMMsgContext getTerminateSequenceResponse(RMMsgContext terminateSeqRMMsg, String sequencePropertyKey,String sequenceId,
-			StorageManager storageManager) throws AxisFault {
-
-		if (log.isDebugEnabled())
-			log.debug("Enter: TerminateSeqMsgProcessor::addTerminateSequenceResponse, " + sequenceId);
-
-		MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
-
-		MessageContext outMessage = null;
-
-		try {
-			outMessage = Utils.createOutMessageContext(terminateSeqMsg);
-		} catch (AxisFault e1) {
-			throw new SandeshaException(e1);
-		}
-
-		RMMsgContext terminateSeqResponseRMMsg = RMMsgCreator.createTerminateSeqResponseMsg(terminateSeqRMMsg,
-				outMessage, storageManager);
-
-		RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, sequencePropertyKey, 
-				sequenceId,	storageManager);
-		
-		Iterator iter = ackRMMessage.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
-		
-		if (iter.hasNext()) {
-			SequenceAcknowledgement seqAck = (SequenceAcknowledgement) iter.next();
-			if (seqAck==null) {
-				String message = "No SequenceAcknowledgement part is present";
-				throw new SandeshaException (message);
-			}
-		
-			terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, seqAck);
-		} else {
-			//TODO 
-		}
-		
-		terminateSeqResponseRMMsg.addSOAPEnvelope();
-
-		terminateSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
-		terminateSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-		outMessage.setResponseWritten(true);
-		
-		if (log.isDebugEnabled())
-			log.debug("Exit: TerminateSeqMsgProcessor::addTerminateSequenceResponse");
-
-		return terminateSeqResponseRMMsg;
-
-
-	}
-
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
-
-		if (log.isDebugEnabled())
-			log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
-
-		MessageContext msgContext = rmMsgCtx.getMessageContext();
-		ConfigurationContext configurationContext = msgContext.getConfigurationContext();
-		Options options = msgContext.getOptions();
-
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
-				configurationContext.getAxisConfiguration());
-
-		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-
-		String toAddress = rmMsgCtx.getTo().getAddress();
-		String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-		String internalSeqenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
-
-		String outSequenceID = SandeshaUtil.getSequenceProperty(internalSeqenceID,
-				Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, storageManager);
-		if (outSequenceID == null)
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSeqenceID));
-
-		// / Transaction addTerminateSeqTransaction =
-		// storageManager.getTransaction();
-
-		String terminated = SandeshaUtil.getSequenceProperty(outSequenceID,
-				Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, storageManager);
-
-		// registring an InOutOperationContext for this.
-		// since the serviceContext.fireAndForget only sets a inOnly One
-		// this does not work when there is a terminateSequnceResponse
-		// TODO do processing of terminateMessagesCorrectly., create a new
-		// message instead of sendign the one given by the serviceClient
-		// TODO important
-
-		AxisOperation outInAxisOp = new OutInAxisOperation(new QName("temp"));
-
-		AxisOperation referenceInOutOperation = msgContext.getAxisService()
-				.getOperation(
-						new QName(Sandesha2Constants.RM_IN_OUT_OPERATION_NAME));
-		if (referenceInOutOperation == null) {
-			String messge = "Cant find the recerence RM InOut operation";
-			throw new SandeshaException(messge);
-		}
-
-		outInAxisOp.setParent(msgContext.getAxisService());
-		// setting flows
-		// outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation.getRemainingPhasesInFlow());
-		outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation
-				.getRemainingPhasesInFlow());
-
-		OperationContext opcontext = OperationContextFactory
-				.createOperationContext(
-						WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, outInAxisOp);
-		opcontext.setParent(msgContext.getServiceContext());
-		configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),
-				opcontext);
-
-		msgContext.setOperationContext(opcontext);
-		msgContext.setAxisOperation(outInAxisOp);
-		
-		if (terminated != null && "true".equals(terminated)) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
-			log.debug(message);
-			return;
-		}
-
-		TerminateSequence terminateSequencePart = (TerminateSequence) rmMsgCtx
-				.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
-		terminateSequencePart.getIdentifier().setIndentifer(outSequenceID);
-
-		rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
-		msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-		rmMsgCtx.setTo(new EndpointReference(toAddress));
-
-		String rmVersion = SandeshaUtil.getRMVersion(internalSeqenceID, storageManager);
-		if (rmVersion == null)
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
-
-		rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
-		rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
-
-		String transportTo = SandeshaUtil.getSequenceProperty(internalSeqenceID,
-				Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
-		if (transportTo != null) {
-			rmMsgCtx.setProperty(MessageContextConstants.TRANSPORT_URL, transportTo);
-		}
-
-		try {
-			rmMsgCtx.addSOAPEnvelope();
-		} catch (AxisFault e) {
-			throw new SandeshaException(e.getMessage());
-		}
-
-		String key = SandeshaUtil.getUUID();
-
-		SenderBean terminateBean = new SenderBean();
-		terminateBean.setMessageContextRefKey(key);
-
-		storageManager.storeMessageContext(key, msgContext);
-
-		// Set a retransmitter lastSentTime so that terminate will be send with
-		// some delay.
-		// Otherwise this get send before return of the current request (ack).
-		// TODO: refine the terminate delay.
-		terminateBean.setTimeToSend(System.currentTimeMillis() + Sandesha2Constants.TERMINATE_DELAY);
-
-		terminateBean.setMessageID(msgContext.getMessageID());
-		
-		EndpointReference to = msgContext.getTo();
-		if (to!=null)
-			terminateBean.setToAddress(to.getAddress());
-		
-		// this will be set to true at the sender.
-		terminateBean.setSend(true);
-
-		msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
-		terminateBean.setReSend(false);
-
-		SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
-
-		retramsmitterMgr.insert(terminateBean);
-
-		SequencePropertyBean terminateAdded = new SequencePropertyBean();
-		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
-		terminateAdded.setSequencePropertyKey(outSequenceID);
-		terminateAdded.setValue("true");
-
-		seqPropMgr.insert(terminateAdded);
-
-		rmMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
-
-		SandeshaUtil.executeAndStore(rmMsgCtx, key);
-
-		if (log.isDebugEnabled())
-			log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage");
-	}
-
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.OutInAxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.util.Utils;
+import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+
+/**
+ * Responsible for processing an incoming Terminate Sequence message.
+ */
+
+public class TerminateSeqMsgProcessor implements MsgProcessor {
+
+	private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
+
+	public void processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
+
+		MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
+
+		// Processing the terminate message
+		// TODO Add terminate sequence message logic.
+		TerminateSequence terminateSequence = (TerminateSequence) terminateSeqRMMsg
+				.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+		if (terminateSequence == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noTerminateSeqPart);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		String sequenceId = terminateSequence.getIdentifier().getIdentifier();
+		if (sequenceId == null || "".equals(sequenceId)) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidSequenceID, null);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+		
+		String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(terminateSeqRMMsg);
+
+		ConfigurationContext context = terminateSeqMsg.getConfigurationContext();
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
+		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropertyBeanMgr();
+		
+		// Check that the sender of this TerminateSequence holds the correct token
+		SequencePropertyBean tokenBean = sequencePropertyBeanMgr.retrieve(sequencePropertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
+		if(tokenBean != null) {
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
+			OMElement body = terminateSeqRMMsg.getSOAPEnvelope().getBody();
+			SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
+			secManager.checkProofOfPossession(token, body, terminateSeqRMMsg.getMessageContext());
+		}
+
+		FaultManager faultManager = new FaultManager();
+		SandeshaException fault = faultManager.checkForUnknownSequence(terminateSeqRMMsg, sequenceId,
+				storageManager);
+		if (fault != null) {
+			throw fault;
+		}
+
+
+		SequencePropertyBean terminateReceivedBean = new SequencePropertyBean();
+		terminateReceivedBean.setSequencePropertyKey(sequencePropertyKey);
+		terminateReceivedBean.setName(Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
+		terminateReceivedBean.setValue("true");
+
+		sequencePropertyBeanMgr.insert(terminateReceivedBean);
+
+		// add the terminate sequence response if required.
+		RMMsgContext terminateSequenceResponse = null;
+		if (SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion()))
+			terminateSequenceResponse = getTerminateSequenceResponse(terminateSeqRMMsg, sequencePropertyKey, sequenceId, storageManager);
+
+		setUpHighestMsgNumbers(context, storageManager,sequencePropertyKey, sequenceId, terminateSeqRMMsg);
+
+		TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequencePropertyKey, sequenceId, storageManager);
+
+		SequencePropertyBean terminatedBean = new SequencePropertyBean(sequencePropertyKey,
+				Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED, Sandesha2Constants.VALUE_TRUE);
+
+		sequencePropertyBeanMgr.insert(terminatedBean);
+
+		SequenceManager.updateLastActivatedTime(sequencePropertyKey, storageManager);
+
+		//sending the terminate sequence response
+		if (terminateSequenceResponse != null) {
+			
+			MessageContext outMessage = terminateSequenceResponse.getMessageContext();
+			EndpointReference toEPR = outMessage.getTo();
+			
+			AxisEngine engine = new AxisEngine(terminateSeqMsg
+					.getConfigurationContext());
+			
+			
+			outMessage.setServerSide(false);
+			engine.send(outMessage);
+
+			String addressingNamespaceURI = SandeshaUtil
+					.getSequenceProperty(
+							sequencePropertyKey,
+							Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,
+							storageManager);
+
+			String anonymousURI = SpecSpecificConstants
+					.getAddressingAnonymousURI(addressingNamespaceURI);
+
+			if (anonymousURI.equals(toEPR.getAddress())) {
+				terminateSeqMsg.getOperationContext().setProperty(
+						org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+			} else {
+				terminateSeqMsg.getOperationContext().setProperty(
+						org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+			}
+
+		}
+		
+		terminateSeqMsg.pause();
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: TerminateSeqMsgProcessor::processInMessage");
+	}
+
+	private void setUpHighestMsgNumbers(ConfigurationContext configCtx, StorageManager storageManager,
+			String requestSidesequencePropertyKey, String sequenceId, RMMsgContext terminateRMMsg) throws SandeshaException {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: TerminateSeqMsgProcessor::setUpHighestMsgNumbers, " + sequenceId);
+
+		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
+
+		String highestImMsgNumberStr = SandeshaUtil.getSequenceProperty(requestSidesequencePropertyKey,
+				Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, storageManager);
+		String highestImMsgKey = SandeshaUtil.getSequenceProperty(requestSidesequencePropertyKey,
+				Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, storageManager);
+
+		long highestInMsgNo = 0;
+		if (highestImMsgNumberStr != null) {
+			if (highestImMsgKey == null)
+				throw new SandeshaException(SandeshaMessageHelper.getMessage(
+						SandeshaMessageKeys.highestMsgKeyNotStored, sequenceId));
+
+			highestInMsgNo = Long.parseLong(highestImMsgNumberStr);
+		}
+
+		// The following is valid only on the server side, since the internal
+		// sequence ID obtained here is only meaningful there.
+		String responseSideInternalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
+		
+		//sequencePropertyKey is equal to the internalSequenceId for the outgoing sequence.
+		String responseSideSequencePropertyKey = responseSideInternalSequenceId;
+		
+		long highestOutMsgNo = 0;
+		try {
+			boolean addResponseSideTerminate = false;
+			if (highestInMsgNo == 0) {
+				addResponseSideTerminate = false;
+			} else {
+
+				// setting the last in message property
+				SequencePropertyBean lastInMsgBean = new SequencePropertyBean(requestSidesequencePropertyKey,
+						Sandesha2Constants.SequenceProperties.LAST_IN_MESSAGE_NO, highestImMsgNumberStr);
+				seqPropMgr.insert(lastInMsgBean);
+
+				MessageContext highestInMsg = storageManager.retrieveMessageContext(highestImMsgKey, configCtx);
+
+				// TODO get the out message in a storage friendly manner.
+				MessageContext highestOutMessage = highestInMsg.getOperationContext().getMessageContext(
+						OperationContextFactory.MESSAGE_LABEL_FAULT_VALUE);
+
+				if (highestOutMessage == null || highestOutMessage.getEnvelope() == null)
+					highestOutMessage = highestInMsg.getOperationContext().getMessageContext(
+							OperationContextFactory.MESSAGE_LABEL_OUT_VALUE);
+
+				if (highestOutMessage != null) {
+					if (highestOutMessage.getEnvelope() == null)
+						throw new SandeshaException(SandeshaMessageHelper
+								.getMessage(SandeshaMessageKeys.outMsgHasNoEnvelope));
+
+					RMMsgContext highestOutRMMsg = MsgInitializer.initializeMessage(highestOutMessage);
+					Sequence seqPartOfOutMsg = (Sequence) highestOutRMMsg
+							.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+
+					if (seqPartOfOutMsg != null) {
+
+						// response message of the last in message can be
+						// considered as the last out message.
+						highestOutMsgNo = seqPartOfOutMsg.getMessageNumber().getMessageNumber();
+						SequencePropertyBean highestOutMsgBean = new SequencePropertyBean(
+								responseSideSequencePropertyKey ,
+								Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO, new Long(highestOutMsgNo)
+										.toString());
+
+						seqPropMgr.insert(highestOutMsgBean);
+						addResponseSideTerminate = true;
+					}
+				}
+			}
+
+			// If all the out message have been acked, add the outgoing
+			// terminate seq msg.
+			String outgoingSqunceID = SandeshaUtil.getSequenceProperty(responseSideSequencePropertyKey,
+					Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, storageManager);
+			if (addResponseSideTerminate && highestOutMsgNo > 0 && responseSideSequencePropertyKey != null
+					&& outgoingSqunceID != null) {
+				boolean allAcked = SandeshaUtil.isAllMsgsAckedUpto(highestOutMsgNo, responseSideSequencePropertyKey,
+						storageManager);
+
+				if (allAcked)
+					TerminateManager.addTerminateSequenceMessage(terminateRMMsg, outgoingSqunceID,
+							responseSideSequencePropertyKey, storageManager);
+			}
+		} catch (AxisFault e) {
+			throw new SandeshaException(e);
+		}
+		if (log.isDebugEnabled())
+			log.debug("Exit: TerminateSeqMsgProcessor::setUpHighestMsgNumbers");
+	}
+
+	/**
+	 * Creates the terminate sequence response for a received terminate sequence message. The first
+	 * sequence acknowledgement part of a newly generated ack message, if there is one, is attached
+	 * to the response before its SOAP envelope is added.
+	 *
+	 * @param terminateSeqRMMsg the received terminate sequence message
+	 * @param sequencePropertyKey property key of the sequence, used to generate the ack message
+	 * @param sequenceId id of the sequence being terminated
+	 * @param storageManager storage manager passed on to the message creators
+	 * @return the terminate sequence response message, set to the out flow
+	 * @throws AxisFault if the out message context or any of the messages cannot be created, or if
+	 *             the acknowledgement part obtained from the ack message is null
+	 */
+	private RMMsgContext getTerminateSequenceResponse(RMMsgContext terminateSeqRMMsg, String sequencePropertyKey,String sequenceId,
+			StorageManager storageManager) throws AxisFault {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: TerminateSeqMsgProcessor::getTerminateSequenceResponse, " + sequenceId);
+
+		MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
+
+		MessageContext outMessage = null;
+
+		try {
+			outMessage = Utils.createOutMessageContext(terminateSeqMsg);
+		} catch (AxisFault e1) {
+			throw new SandeshaException(e1);
+		}
+
+		RMMsgContext terminateSeqResponseRMMsg = RMMsgCreator.createTerminateSeqResponseMsg(terminateSeqRMMsg,
+				outMessage, storageManager);
+
+		RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, sequencePropertyKey, 
+				sequenceId,	storageManager);
+		
+		Iterator iter = ackRMMessage.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+		
+		// Only the first acknowledgement part is copied to the response.
+		// TODO decide what to do when the ack message carries no acknowledgement part; for now
+		// the response is sent without one.
+		if (iter.hasNext()) {
+			SequenceAcknowledgement seqAck = (SequenceAcknowledgement) iter.next();
+			if (seqAck==null) {
+				String message = "No SequenceAcknowledgement part is present";
+				throw new SandeshaException (message);
+			}
+		
+			terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, seqAck);
+		}
+		
+		terminateSeqResponseRMMsg.addSOAPEnvelope();
+
+		terminateSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
+		terminateSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		outMessage.setResponseWritten(true);
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: TerminateSeqMsgProcessor::getTerminateSequenceResponse");
+
+		return terminateSeqResponseRMMsg;
+	}
+
+	public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
+
+		MessageContext msgContext = rmMsgCtx.getMessageContext();
+		ConfigurationContext configurationContext = msgContext.getConfigurationContext();
+		Options options = msgContext.getOptions();
+
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
+				configurationContext.getAxisConfiguration());
+
+		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
+
+		String toAddress = rmMsgCtx.getTo().getAddress();
+		String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+		String internalSeqenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
+
+		String outSequenceID = SandeshaUtil.getSequenceProperty(internalSeqenceID,
+				Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, storageManager);
+		if (outSequenceID == null)
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSeqenceID));
+
+		// / Transaction addTerminateSeqTransaction =
+		// storageManager.getTransaction();
+
+		String terminated = SandeshaUtil.getSequenceProperty(outSequenceID,
+				Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, storageManager);
+
+		// registring an InOutOperationContext for this.
+		// since the serviceContext.fireAndForget only sets a inOnly One
+		// this does not work when there is a terminateSequnceResponse
+		// TODO do processing of terminateMessagesCorrectly., create a new
+		// message instead of sendign the one given by the serviceClient
+		// TODO important
+
+		AxisOperation outInAxisOp = new OutInAxisOperation(new QName("temp"));
+
+		AxisOperation referenceInOutOperation = msgContext.getAxisService()
+				.getOperation(
+						new QName(Sandesha2Constants.RM_IN_OUT_OPERATION_NAME));
+		if (referenceInOutOperation == null) {
+			String messge = "Cant find the recerence RM InOut operation";
+			throw new SandeshaException(messge);
+		}
+
+		outInAxisOp.setParent(msgContext.getAxisService());
+		// setting flows
+		// outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation.getRemainingPhasesInFlow());
+		outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation
+				.getRemainingPhasesInFlow());
+
+		OperationContext opcontext = OperationContextFactory
+				.createOperationContext(
+						WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, outInAxisOp);
+		opcontext.setParent(msgContext.getServiceContext());
+		configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),
+				opcontext);
+
+		msgContext.setOperationContext(opcontext);
+		msgContext.setAxisOperation(outInAxisOp);
+		
+		if (terminated != null && "true".equals(terminated)) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
+			log.debug(message);
+			return;
+		}
+
+		TerminateSequence terminateSequencePart = (TerminateSequence) rmMsgCtx
+				.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+		terminateSequencePart.getIdentifier().setIndentifer(outSequenceID);
+
+		rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
+		msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		rmMsgCtx.setTo(new EndpointReference(toAddress));
+
+		String rmVersion = SandeshaUtil.getRMVersion(internalSeqenceID, storageManager);
+		if (rmVersion == null)
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+
+		rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+		rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+
+		String transportTo = SandeshaUtil.getSequenceProperty(internalSeqenceID,
+				Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
+		if (transportTo != null) {
+			rmMsgCtx.setProperty(MessageContextConstants.TRANSPORT_URL, transportTo);
+		}
+
+		try {
+			rmMsgCtx.addSOAPEnvelope();
+		} catch (AxisFault e) {
+			throw new SandeshaException(e.getMessage(),e);
+		}
+
+		String key = SandeshaUtil.getUUID();
+
+		SenderBean terminateBean = new SenderBean();
+		terminateBean.setMessageContextRefKey(key);
+
+		storageManager.storeMessageContext(key, msgContext);
+
+		// Set a retransmitter lastSentTime so that terminate will be send with
+		// some delay.
+		// Otherwise this get send before return of the current request (ack).
+		// TODO: refine the terminate delay.
+		terminateBean.setTimeToSend(System.currentTimeMillis() + Sandesha2Constants.TERMINATE_DELAY);
+
+		terminateBean.setMessageID(msgContext.getMessageID());
+		
+		EndpointReference to = msgContext.getTo();
+		if (to!=null)
+			terminateBean.setToAddress(to.getAddress());
+		
+		// this will be set to true at the sender.
+		terminateBean.setSend(true);
+
+		msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+		terminateBean.setReSend(false);
+
+		SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
+
+		retramsmitterMgr.insert(terminateBean);
+
+		SequencePropertyBean terminateAdded = new SequencePropertyBean();
+		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+		terminateAdded.setSequencePropertyKey(outSequenceID);
+		terminateAdded.setValue("true");
+
+		seqPropMgr.insert(terminateAdded);
+
+		rmMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+
+		SandeshaUtil.executeAndStore(rmMsgCtx, key);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage");
+	}
+
+}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Tue Oct 10 21:31:51 2006
@@ -1,90 +1,89 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *  
- */
-
-package org.apache.sandesha2.msgprocessors;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.security.SecurityManager;
-import org.apache.sandesha2.security.SecurityToken;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.TerminateSequence;
-import org.apache.sandesha2.wsrm.TerminateSequenceResponse;
-
-/**
- * To process terminate sequence response messages.
- */
-public class TerminateSeqResponseMsgProcessor implements MsgProcessor {
-
-	private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
-	
-	public void processInMessage(RMMsgContext terminateResRMMsg)
-			throws AxisFault { 
-		if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage");
-		
-		MessageContext msgContext = terminateResRMMsg.getMessageContext();
-		ConfigurationContext context = terminateResRMMsg.getConfigurationContext();
-		
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
-		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropertyBeanMgr();
-		
-		TerminateSequenceResponse tsResponse = (TerminateSequenceResponse)
-		  terminateResRMMsg.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE);
-		
-		String sequenceId = tsResponse.getIdentifier().getIdentifier();
-		String internalSequenceID = SandeshaUtil.getSequenceProperty(sequenceId,
-				Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, storageManager);
-		msgContext.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceID);
-		String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(terminateResRMMsg);
-
-		// Check that the sender of this TerminateSequence holds the correct token
-		SequencePropertyBean tokenBean = sequencePropertyBeanMgr.retrieve(sequencePropertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
-		if(tokenBean != null) {
-			SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
-			OMElement body = terminateResRMMsg.getSOAPEnvelope().getBody();
-			SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
-			secManager.checkProofOfPossession(token, body, msgContext);
-		}
-
-		ConfigurationContext configContext = msgContext.getConfigurationContext();
-
-
-		TerminateManager.terminateSendingSide (configContext, sequencePropertyKey,internalSequenceID, msgContext.isServerSide(),
-				storageManager);
-		
-		// Stop this message travelling further through the Axis runtime
-		terminateResRMMsg.pause();
-
-		if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processInMessage");
-  }
-
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
-		if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processOutMessage");
-		if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage");
-	}
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.TerminateSequenceResponse;
+
+/**
+ * Message processor for terminate sequence response messages. An incoming response ends the
+ * sending side of the sequence it refers to; outgoing responses need no processing here.
+ */
+public class TerminateSeqResponseMsgProcessor implements MsgProcessor {
+
+	private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
+
+	/**
+	 * Handles an incoming terminate sequence response: resolves the internal sequence id, checks
+	 * the security token of the sequence when one is stored, terminates the sending side and
+	 * pauses the message.
+	 *
+	 * @param terminateResRMMsg the received terminate sequence response message
+	 * @throws AxisFault if the token check or the termination of the sending side fails
+	 */
+	public void processInMessage(RMMsgContext terminateResRMMsg) throws AxisFault {
+		if (log.isDebugEnabled()) {
+			log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage");
+		}
+
+		MessageContext inboundMsg = terminateResRMMsg.getMessageContext();
+		ConfigurationContext rmConfigCtx = terminateResRMMsg.getConfigurationContext();
+
+		StorageManager storage = SandeshaUtil.getSandeshaStorageManager(rmConfigCtx,
+				rmConfigCtx.getAxisConfiguration());
+		SequencePropertyBeanMgr propertyMgr = storage.getSequencePropertyBeanMgr();
+
+		TerminateSequenceResponse responsePart = (TerminateSequenceResponse) terminateResRMMsg
+				.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE);
+
+		// map the sequence id carried by the response to the internal sequence id
+		String sequenceId = responsePart.getIdentifier().getIdentifier();
+		String internalSequenceID = SandeshaUtil.getSequenceProperty(sequenceId,
+				Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, storage);
+		inboundMsg.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
+				internalSequenceID);
+		String propertyKey = SandeshaUtil.getSequencePropertyKey(terminateResRMMsg);
+
+		verifySecurityToken(terminateResRMMsg, inboundMsg, rmConfigCtx, propertyMgr, propertyKey);
+
+		TerminateManager.terminateSendingSide(inboundMsg.getConfigurationContext(), propertyKey,
+				internalSequenceID, inboundMsg.isServerSide(), storage);
+
+		// Stop this message travelling further through the Axis runtime
+		terminateResRMMsg.pause();
+
+		if (log.isDebugEnabled()) {
+			log.debug("Exit: TerminateSeqResponseMsgProcessor::processInMessage");
+		}
+	}
+
+	/**
+	 * Checks that the sender of the terminate sequence response holds the security token stored
+	 * for the sequence. Nothing is checked when no token is stored.
+	 */
+	private void verifySecurityToken(RMMsgContext terminateResRMMsg, MessageContext inboundMsg,
+			ConfigurationContext rmConfigCtx, SequencePropertyBeanMgr propertyMgr, String propertyKey)
+			throws AxisFault {
+		SequencePropertyBean tokenBean = propertyMgr.retrieve(propertyKey,
+				Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
+		if (tokenBean == null) {
+			return;
+		}
+
+		SecurityManager secManager = SandeshaUtil.getSecurityManager(rmConfigCtx);
+		OMElement body = terminateResRMMsg.getSOAPEnvelope().getBody();
+		SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
+		secManager.checkProofOfPossession(token, body, inboundMsg);
+	}
+
+	/**
+	 * Outgoing terminate sequence responses need no processing; only the debug trace is written.
+	 *
+	 * @param rmMsgCtx the outgoing message, not used
+	 */
+	public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+		if (log.isDebugEnabled()) {
+			log.debug("Enter: TerminateSeqResponseMsgProcessor::processOutMessage");
+		}
+		if (log.isDebugEnabled()) {
+			log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage");
+		}
+	}
+}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java Tue Oct 10 21:31:51 2006
@@ -62,6 +62,13 @@
 	 */
 	private String createSequenceMsgStoreKey;
 	
+	/**
+	 * This is stored here so that the message it points to can be used as a reference when Sandesha
+	 * wants to generate new messages (e.g. MakeConnection). The create sequence message cannot be used
+	 * here since it may be subject to things like encryption.
+	 */
+	private String referenceMessageStoreKey;
+	
 	public CreateSeqBean() {
 	}
 
@@ -104,6 +111,15 @@
 
 	public void setCreateSequenceMsgStoreKey(String createSequenceMsgStoreKey) {
 		this.createSequenceMsgStoreKey = createSequenceMsgStoreKey;
+	}
+
+	/** Returns the store key of the message kept as a reference for generating new messages. */
+	public String getReferenceMessageStoreKey() {
+		return referenceMessageStoreKey;
+	}
+	/** Sets the store key of the message kept as a reference for generating new messages. */
+	public void setReferenceMessageStoreKey(String referenceMessageStoreKey) {
+		this.referenceMessageStoreKey = referenceMessageStoreKey;
 	}
 	
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Tue Oct 10 21:31:51 2006
@@ -123,8 +123,8 @@
 			if (bean.isSend() != temp.isSend())
 				add = false;
 
-//			if (bean.isReSend() != temp.isReSend())
-//				add = false;
+			if (bean.isReSend() != temp.isReSend())
+				add = false;
 			
 			if (add)
 				beans.add(temp);



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org