You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/03/17 06:02:01 UTC

svn commit: r386537 [1/2] - in /webservices/sandesha/trunk/src/org/apache/sandesha2: ./ client/ client/reports/ handlers/ msgprocessors/ storage/beanmanagers/ storage/inmemory/ util/ workers/

Author: chamikara
Date: Thu Mar 16 21:01:58 2006
New Revision: 386537

URL: http://svn.apache.org/viewcvs?rev=386537&view=rev
Log:
Updates to support most of the WSRX scenarios 1.1 upto 2.3
Correnctions to the Policy handling code.

Removed:
    webservices/sandesha/trunk/src/org/apache/sandesha2/client/SequenceReport.java
Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/client/reports/RMReport.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java Thu Mar 16 21:01:58 2006
@@ -19,6 +19,7 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 
 import org.apache.axis2.AxisFault;
@@ -46,6 +47,7 @@
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaPropertyBean;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.AcknowledgementRange;
 import org.apache.sandesha2.wsrm.Sequence;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
 import org.apache.ws.commons.soap.SOAPEnvelope;
@@ -196,7 +198,7 @@
 		
 		ArrayList completedMsgList = null;
 		if (completedMessagesBean!=null) {
-			completedMsgList = SandeshaUtil.getArrayListFromString(completedMessagesBean.getValue());
+			completedMsgList = SandeshaUtil.getArrayListFromMsgsString (completedMessagesBean.getValue());
 		} else {
 			String message = "Completed messages bean is null, for the sequence " + sequenceID;
 			throw new SandeshaException (message);
@@ -295,11 +297,7 @@
 
 			referenceRMMessage.getMessageContext().setProperty(
 					Sandesha2Constants.ACK_WRITTEN, "true");
-//			try {
-//				engine.send(ackRMMsgCtx.getMessageContext());
-//			} catch (AxisFault e1) {
-//				throw new SandeshaException(e1.getMessage());
-//			}
+			
 			return ackRMMsgCtx;
 			
 		} else {
@@ -311,9 +309,6 @@
 
 			String key = SandeshaUtil.getUUID();
 			
-			//dumping to the storage will be done be Sandesha2 Transport Sender
-			//storageManager.storeMessageContext(key,ackMsgCtx);
-			
 			SenderBean ackBean = new SenderBean();
 			ackBean.setMessageContextRefKey(key);
 			ackBean.setMessageID(ackMsgCtx.getMessageID());
@@ -326,32 +321,7 @@
 					Sandesha2Constants.VALUE_FALSE);
 			
 			ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-			
-			//the internalSequenceId value of the retransmitter Table for the
-			// messages related to an incoming
-			//sequence is the actual sequence ID
-
-//			RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
-//					.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
-		
-//			long ackInterval = PropertyManager.getInstance()
-//					.getAcknowledgementInterval();
-			
-			Parameter param = referenceMsg.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
-			
-			SandeshaPropertyBean propertyBean = null;
-			if (param!=null) {
-				propertyBean = (SandeshaPropertyBean)  param.getValue();
-			}else {
-				propertyBean = PropertyManager.getInstance().getPropertyBean();
-			}
-			
-			
-			long ackInterval = propertyBean.getAcknowledgementInaterval();
-			
-			//			if (policyBean != null) {
-//				ackInterval = policyBean.getAcknowledgementInaterval();
-//			}
+			long ackInterval = SandeshaUtil.getPropretyBean(referenceMsg).getAcknowledgementInaterval();
 			
 			//Ack will be sent as stand alone, only after the retransmitter
 			// interval.
@@ -374,40 +344,50 @@
 			}
 			
 			ackBean.setTimeToSend(timeToSend);
-
 			storageManager.storeMessageContext(key,ackMsgCtx);
 			
 			//inserting the new ack.
 			retransmitterBeanMgr.insert(ackBean);
-
 			asyncAckTransaction.commit();
 
 			//passing the message through sandesha2sender
-
 			ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
 			ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
-			
 			ackMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
-			
 			ackMsgCtx.setTransportOut(new Sandesha2TransportOutDesc ());
-			
-//			AxisEngine engine = new AxisEngine (configurationContext);
-//			try {
-//				engine.send(ackMsgCtx);
-//			} catch (AxisFault e) {
-//				throw new SandeshaException (e.getMessage());
-//			}
-			
 			RMMsgContext ackRMMessageCtx = MsgInitializer.initializeMessage(ackMsgCtx);
-			
-			SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID);
-			
+			SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID);	
 			referenceMsg.pause(); 
-			
 			return ackRMMessageCtx;
+		}	
+	}
+	
+	public static boolean verifySequenceCompletion(Iterator ackRangesIterator,
+			long lastMessageNo) {
+		HashMap startMap = new HashMap();
+
+		while (ackRangesIterator.hasNext()) {
+			AcknowledgementRange temp = (AcknowledgementRange) ackRangesIterator
+					.next();
+			startMap.put(new Long(temp.getLowerValue()), temp);
 		}
-		
-		
-		
+
+		long start = 1;
+		boolean loop = true;
+		while (loop) {
+			AcknowledgementRange temp = (AcknowledgementRange) startMap
+					.get(new Long(start));
+			if (temp == null) {
+				loop = false;
+				continue;
+			}
+
+			if (temp.getUpperValue() >= lastMessageNo)
+				return true;
+
+			start = temp.getUpperValue() + 1;
+		}
+
+		return false;
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Thu Mar 16 21:01:58 2006
@@ -77,7 +77,7 @@
 			
 			String ACTION_TERMINATE_SEQUENCE_RESPONSE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/TerminateSequenceResponse";
 			
-			String ACTION_ACK_REQUEST = "http://docs.oasis-open.org/ws-rx/wsrm/200602/AckRequested";
+			String ACTION_ACK_REQUEST = "http://docs.oasis-open.org/ws-rx/wsrm/200510/AckRequested";
 			
 			String ACTION_CLOSE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CloseSequence";
 			
@@ -250,8 +250,6 @@
 
 		String NEXT_MESSAGE_NUMBER = "NextMsgNo";
 
-		String LAST_OUT_MESSAGE = "LastOutMessage";
-
 		String INCOMING_SEQUENCE_LIST = "IncomingSequenceList";
 
 		String CHECK_RESPONSE = "CheckResponse";
@@ -280,9 +278,17 @@
 
 		String REQUEST_SIDE_SEQUENCE_ID = "RequestSideSequenceID"; 		//used only at the server side
 		
-		String HIGHEST_MSG_NO = "HighestMessageNumber";
+		String HIGHEST_IN_MSG_NUMBER = "HighestInMsgNumber";
+		
+		String HIGHEST_IN_MSG_KEY = "HighestInMsgKey";
+		
+		String HIGHEST_OUT_MSG_NUMBER = "HighestOutMsgNumber";
+		
+		String HIGHEST_OUT_MSG_KEY = "HighestOutMsgKey";
 		
+		String LAST_OUT_MESSAGE_NO = "LastOutMessage";
 		
+		String LAST_IN_MESSAGE_NO = "LastInMessage";
 	}
 
 	public interface SOAPVersion {
@@ -443,8 +449,6 @@
 	
 	String VALUE_FALSE = "false";
 	
-	String SANDESHA2_INTERNAL_SEQUENCE_ID = "Sandesha2IntSeq";
-	
 	String MESSAGE_STORE_KEY = "Sandesha2MessageStoreKey";
 
 	String ORIGINAL_TRANSPORT_OUT_DESC = "Sandesha2OriginalTransportSender";
@@ -462,4 +466,9 @@
 	String INTERNAL_SEQUENCE_PREFIX = "Sandesha2InternalSequence";
 	
 	String SANDESHA2_POLICY_BEAN = "Sandesha2PolicyBean";
+	
+	String LIST_SEPERATOR = ",";
+	
+	String LIST_PART_SEPERATOR = "-";
+	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Thu Mar 16 21:01:58 2006
@@ -169,10 +169,10 @@
 	}
 	
 	private static boolean isRequiredForResponseSide (String name) {
-		if (name==null && name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE))
+		if (name==null && name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO))
 			return false;
 		
-		if (name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE))
+		if (name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO))
 			return false;
 		
 		return false;
@@ -331,6 +331,234 @@
 		SandeshaUtil.stopSenderForTheSequence(internalSequenceId);
 	}
 	
+//	public void terminateSequence (String outSequenceID,String internalSequenceID,ConfigurationContext configCtx) {
+//
+//		
+//		StorageManager storageManager = SandeshaUtil
+//				.getSandeshaStorageManager(configCtx);
+//
+//		Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+//		
+//		SequencePropertyBeanMgr seqPropMgr = storageManager
+//				.getSequencePropretyBeanMgr();
+//
+//		SequencePropertyBean terminated = seqPropMgr.retrieve(outSequenceID,
+//				Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+//
+//		if (terminated != null && terminated.getValue() != null
+//				&& "true".equals(terminated.getValue())) {
+//			String message = "Terminate was added previously.";
+//			log.info(message);
+//			return;
+//		}
+//
+//		RMMsgContext terminateRMMessage = RMMsgCreator
+//				.createTerminateSequenceMessage(outSequenceID);
+//		terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
+//		terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+//		
+//		SequencePropertyBean toBean = seqPropMgr.retrieve(internalSequenceID,
+//				Sandesha2Constants.SequenceProperties.TO_EPR);
+//
+//		EndpointReference toEPR = new EndpointReference ( toBean.getValue());
+//		if (toEPR == null) {
+//			String message = "To EPR has an invalid value";
+//			throw new SandeshaException(message);
+//		}
+//
+//		terminateRMMessage.setTo(new EndpointReference(toEPR.getAddress()));
+//		terminateRMMessage.setFrom(new EndpointReference(
+//				Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
+//		terminateRMMessage.setFaultTo(new EndpointReference(
+//				Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
+//		
+//		String rmVersion = SandeshaUtil.getRMVersion(internalSequenceID,configCtx);
+//		if (rmVersion==null)
+//			throw new SandeshaException ("Cant find the rmVersion of the given message");
+//		terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+//		terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+//
+//		SequencePropertyBean transportToBean = seqPropMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+//		if (transportToBean!=null) {
+//			terminateRMMessage.setProperty(MessageContextConstants.TRANSPORT_URL,transportToBean.getValue());
+//		}
+//		
+//		try {
+//			terminateRMMessage.addSOAPEnvelope();
+//		} catch (AxisFault e) {
+//			throw new SandeshaException(e.getMessage());
+//		}
+//
+//		String key = SandeshaUtil.getUUID();
+//		
+//		SenderBean terminateBean = new SenderBean();
+//		terminateBean.setMessageContextRefKey(key);
+//
+//		
+//		storageManager.storeMessageContext(key,terminateRMMessage.getMessageContext());
+//
+//		
+//		//Set a retransmitter lastSentTime so that terminate will be send with
+//		// some delay.
+//		//Otherwise this get send before return of the current request (ack).
+//		//TODO: refine the terminate delay.
+//		terminateBean.setTimeToSend(System.currentTimeMillis()
+//				+ Sandesha2Constants.TERMINATE_DELAY);
+//
+//		terminateBean.setMessageID(terminateRMMessage.getMessageId());
+//		
+//		//this will be set to true at the sender.
+//		terminateBean.setSend(true);
+//		
+//		terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+//				Sandesha2Constants.VALUE_FALSE);
+//		
+//		terminateBean.setReSend(false);
+//
+//		SenderBeanMgr retramsmitterMgr = storageManager
+//				.getRetransmitterBeanMgr();
+//
+//		retramsmitterMgr.insert(terminateBean);
+//		
+//		SequencePropertyBean terminateAdded = new SequencePropertyBean();
+//		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+//		terminateAdded.setSequenceID(outSequenceID);
+//		terminateAdded.setValue("true");
+//
+//		seqPropMgr.insert(terminateAdded);
+//		
+//		//This should be dumped to the storage by the sender
+//		TransportOutDescription transportOut = terminateRMMessage.getMessageContext().getTransportOut();
+//		terminateRMMessage.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
+//		terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+//		terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+//		terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
+//		addTerminateSeqTransaction.commit();
+//		
+//	    AxisEngine engine = new AxisEngine (incomingAckRMMsg.getMessageContext().getConfigurationContext());
+//	    try {
+//			engine.send(terminateRMMessage.getMessageContext());
+//		} catch (AxisFault e) {
+//			throw new SandeshaException (e.getMessage());
+//		}
+//	    
+//	}
 
+	
+	public static void addTerminateSequenceMessage(RMMsgContext referenceMessage,
+			String outSequenceId, String internalSequenceId)
+			throws SandeshaException {
+
+		
+		ConfigurationContext configurationContext = referenceMessage.getMessageContext().getConfigurationContext();
+		StorageManager storageManager = SandeshaUtil
+				.getSandeshaStorageManager(configurationContext);
+
+		Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+		
+		SequencePropertyBeanMgr seqPropMgr = storageManager
+				.getSequencePropretyBeanMgr();
+
+		SequencePropertyBean terminated = seqPropMgr.retrieve(outSequenceId,
+				Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+
+		if (terminated != null && terminated.getValue() != null
+				&& "true".equals(terminated.getValue())) {
+			String message = "Terminate was added previously.";
+			log.info(message);
+//			return;
+		}
+
+		RMMsgContext terminateRMMessage = RMMsgCreator
+				.createTerminateSequenceMessage(referenceMessage, outSequenceId,internalSequenceId);
+		terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
+		terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+		
+		SequencePropertyBean toBean = seqPropMgr.retrieve(internalSequenceId,
+				Sandesha2Constants.SequenceProperties.TO_EPR);
+
+		EndpointReference toEPR = new EndpointReference ( toBean.getValue());
+		if (toEPR == null) {
+			String message = "To EPR has an invalid value";
+			throw new SandeshaException(message);
+		}
+
+		terminateRMMessage.setTo(new EndpointReference(toEPR.getAddress()));
+		terminateRMMessage.setFrom(new EndpointReference(
+				Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
+		terminateRMMessage.setFaultTo(new EndpointReference(
+				Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
+		
+		String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,configurationContext);
+		if (rmVersion==null)
+			throw new SandeshaException ("Cant find the rmVersion of the given message");
+		terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+		terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+
+		SequencePropertyBean transportToBean = seqPropMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+		if (transportToBean!=null) {
+			terminateRMMessage.setProperty(MessageContextConstants.TRANSPORT_URL,transportToBean.getValue());
+		}
+		
+		try {
+			terminateRMMessage.addSOAPEnvelope();
+		} catch (AxisFault e) {
+			throw new SandeshaException(e.getMessage());
+		}
+
+		String key = SandeshaUtil.getUUID();
+		
+		SenderBean terminateBean = new SenderBean();
+		terminateBean.setMessageContextRefKey(key);
+
+		
+		storageManager.storeMessageContext(key,terminateRMMessage.getMessageContext());
+
+		
+		//Set a retransmitter lastSentTime so that terminate will be send with
+		// some delay.
+		//Otherwise this get send before return of the current request (ack).
+		//TODO: refine the terminate delay.
+		terminateBean.setTimeToSend(System.currentTimeMillis()
+				+ Sandesha2Constants.TERMINATE_DELAY);
+
+		terminateBean.setMessageID(terminateRMMessage.getMessageId());
+		
+		//this will be set to true at the sender.
+		terminateBean.setSend(true);
+		
+		terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+				Sandesha2Constants.VALUE_FALSE);
+		
+		terminateBean.setReSend(false);
+
+		SenderBeanMgr retramsmitterMgr = storageManager
+				.getRetransmitterBeanMgr();
+
+		retramsmitterMgr.insert(terminateBean);
+		
+		SequencePropertyBean terminateAdded = new SequencePropertyBean();
+		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+		terminateAdded.setSequenceID(outSequenceId);
+		terminateAdded.setValue("true");
+
+		seqPropMgr.insert(terminateAdded);
+		
+		//This should be dumped to the storage by the sender
+		TransportOutDescription transportOut = terminateRMMessage.getMessageContext().getTransportOut();
+		terminateRMMessage.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
+		terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+		terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+		terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
+		addTerminateSeqTransaction.commit();
+		
+	    AxisEngine engine = new AxisEngine (configurationContext);
+	    try {
+			engine.send(terminateRMMessage.getMessageContext());
+		} catch (AxisFault e) {
+			throw new SandeshaException (e.getMessage());
+		}
+	    
+	}
 	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java Thu Mar 16 21:01:58 2006
@@ -20,12 +20,18 @@
 import java.util.Collection;
 import java.util.Iterator;
 
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.AcknowledgementManager;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.SpecSpecificConstants;
 import org.apache.sandesha2.client.reports.RMReport;
 import org.apache.sandesha2.client.reports.SequenceReport;
 import org.apache.sandesha2.storage.StorageManager;
@@ -38,6 +44,14 @@
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.ws.commons.om.OMException;
+import org.apache.ws.commons.soap.SOAP12Constants;
+import org.apache.ws.commons.soap.SOAPEnvelope;
+import org.apache.ws.commons.soap.SOAPFactory;
+import org.apache.ws.commons.soap.impl.llom.soap11.SOAP11Factory;
+import org.apache.ws.commons.soap.impl.llom.soap12.SOAP12Factory;
 
 /**
  * Contains all the Sandesha2Constants of Sandesha2.
@@ -199,6 +213,8 @@
 	}
 	
 	private static void fillOutgoingSequenceInfo (SequenceReport report,String outSequenceID, SequencePropertyBeanMgr seqPropMgr) throws SandeshaException  { 
+		report.setSequenceID(outSequenceID);
+		
 		ArrayList completedMessageList =  AcknowledgementManager.getClientCompletedMessagesList (outSequenceID,seqPropMgr);
 		
 		Iterator iter = completedMessageList.iterator();
@@ -226,9 +242,8 @@
 		ArrayList completedMessageList =  AcknowledgementManager.getServerCompletedMessagesList (sequenceID,seqPropMgr);
 
 		Iterator iter = completedMessageList.iterator();
-		while (iter.hasNext()) {
-			Long lng = new Long (Long.parseLong((String) iter.next()));
-			sequenceReport.addCompletedMessage(lng);
+		while (iter.hasNext()) {;
+			sequenceReport.addCompletedMessage((Long) iter.next());
 		}
 		
 		sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_IN);
@@ -315,9 +330,75 @@
 		return rmReport;
 	}
 	
-	public String InternalSequenceID (String to, String sequenceKey) {
+	public static String getInternalSequenceID (String to, String sequenceKey) {
 		return SandeshaUtil.getInternalSequenceID(to,sequenceKey);
 	}
 	
 
+	public static void terminateSequence (String toEPR, String sequenceKey, ServiceClient serviceClient,ConfigurationContext configurationContext) throws SandeshaException { 
+
+		String internalSequenceID = SandeshaUtil.getInternalSequenceID(toEPR,sequenceKey);
+				
+		SequenceReport sequenceReport = Sandesha2ClientAPI.getOutgoingSequenceReport(internalSequenceID,configurationContext);
+		if (sequenceReport==null)
+			throw new SandeshaException ("Cannot generate the sequence report for the given internalSequenceID");
+		if (sequenceReport.getSequenceStatus()!=SequenceReport.SEQUENCE_STATUS_ESTABLISHED)
+			throw new SandeshaException ("Canot terminate the sequence since it is not active");
+		
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+		SequencePropertyBean sequenceIDBean = seqPropMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+		if (sequenceIDBean==null)
+			throw new SandeshaException ("SequenceIdBean is not set");
+		
+		String sequenceID = sequenceIDBean.getValue();
+
+		if (sequenceID==null)
+			throw new SandeshaException ("Cannot find the sequenceID");
+		
+		Options options = serviceClient.getOptions();
+		
+		String rmSpecVersion = (String) options.getProperty(Sandesha2ClientAPI.RM_SPEC_VERSION);
+
+		if (rmSpecVersion==null) 
+			rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion ();
+		
+		String oldAction = options.getAction();
+		
+		options.setAction(SpecSpecificConstants.getTerminateSequenceAction(rmSpecVersion));		
+		
+		SOAPEnvelope dummyEnvelope = null;
+		SOAPFactory factory = null;
+		String soapNamespaceURI = options.getSoapVersionURI();
+		if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(soapNamespaceURI)) {
+			factory = new SOAP12Factory ();
+			dummyEnvelope = factory.getDefaultEnvelope();
+		}else  {
+			factory = new SOAP11Factory ();
+			dummyEnvelope = factory.getDefaultEnvelope();
+		}
+		
+		String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
+		
+		TerminateSequence terminateSequence = new TerminateSequence (factory,rmNamespaceValue);
+		Identifier identifier = new Identifier (factory,rmNamespaceValue);
+		identifier.setIndentifer(sequenceID);
+		terminateSequence.setIdentifier(identifier);
+		
+		terminateSequence.toSOAPEnvelope(dummyEnvelope);
+		
+	    String oldSequenceKey = (String) options.getProperty(Sandesha2ClientAPI.SEQUENCE_KEY);
+	    options.setProperty(Sandesha2ClientAPI.SEQUENCE_KEY,sequenceKey);
+		try {
+			serviceClient.fireAndForget(dummyEnvelope.getBody().getFirstChildWithName(new QName (rmNamespaceValue,Sandesha2Constants.WSRM_COMMON.TERMINATE_SEQUENCE)));
+		} catch (AxisFault e) {
+			throw new SandeshaException ("Could not invoke the service client", e);
+		}
+
+		if (oldSequenceKey!=null)
+			options.setProperty(Sandesha2ClientAPI.SEQUENCE_KEY,oldSequenceKey);
+		
+		options.setAction(oldAction);
+	}
+	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/reports/RMReport.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/reports/RMReport.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/reports/RMReport.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/reports/RMReport.java Thu Mar 16 21:01:58 2006
@@ -84,7 +84,7 @@
 	}
 	
 	public void addToOutgoingInternalSequenceMap (String outSequenceID, String internalSequenceID) {
-		outgoingInternalSequenceIDMap.put(outSequenceID,outSequenceID);
+		outgoingInternalSequenceIDMap.put(outSequenceID,internalSequenceID);
 	}
 	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Thu Mar 16 21:01:58 2006
@@ -84,16 +84,9 @@
 		MsgProcessor msgProcessor = MsgProcessorFactory
 				.getMessageProcessor(rmMsgCtx.getMessageType());
 
-		if (msgProcessor == null) {
-//			String message = "An Invalid RM message was received. Sandesha2 cant forward this request";
-//			log.debug(message);
-//			throw new AxisFault(message);
-			
-			return;  //this is not a rm message
-		}
-
 		try {
-			msgProcessor.processMessage(rmMsgCtx);
+			if (msgProcessor!=null)
+				msgProcessor.processInMessage(rmMsgCtx);
 		} catch (SandeshaException se) {
 			String message = "Error in processing the message";
 			log.debug(message);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Thu Mar 16 21:01:58 2006
@@ -41,6 +41,9 @@
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.SpecSpecificConstants;
 import org.apache.sandesha2.client.Sandesha2ClientAPI;
+import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
@@ -114,755 +117,30 @@
 		StorageManager storageManager = SandeshaUtil
 				.getSandeshaStorageManager(context);
 
-		ServiceContext serviceContext = msgCtx.getServiceContext();
-		OperationContext operationContext = msgCtx.getOperationContext();
 
-		// continue only if an possible application message
-		if (!(rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.UNKNOWN)) {
-			return;
-		}
-		
-		MessageContext requestMessageCtx = msgCtx.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
-		if (requestMessageCtx!=null) {
-			RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMessageCtx);
-			Sequence reqSeqPart = (Sequence) reqRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-			if (reqSeqPart==null) {
-				//this is not a rm intended message
-				return;
-			}
-		}
-
-		Parameter policyParam = msgCtx
-				.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
-		if (policyParam == null) {
-			SandeshaPropertyBean propertyBean = PropertyManager.getInstance()
-					.getPropertyBean();
-			Parameter parameter = new Parameter();
-			parameter.setName(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
-			parameter.setValue(propertyBean);
-
-			// TODO this should be addede to the AxisMessage
-			if (msgCtx.getAxisOperation() != null)
-				msgCtx.getAxisOperation().addParameter(parameter);
-			else if (msgCtx.getAxisService() != null)
-				msgCtx.getAxisService().addParameter(parameter);
-		}
-
-		SequencePropertyBeanMgr seqPropMgr = storageManager
-				.getSequencePropretyBeanMgr();
-
-		Transaction outHandlerTransaction = storageManager.getTransaction();
-
-		boolean serverSide = msgCtx.isServerSide();
-
-		// setting message Id if null
-		if (msgCtx.getMessageID() == null) {
-			msgCtx.setMessageID(SandeshaUtil.getUUID());
-		}
-
-		// find internal sequence id
-		String internalSequenceId = null;
-
-		/* Internal sequence id is the one used to refer to the sequence (since
-		actual sequence id is not available when first msg arrives)
-		server side - sequenceId if the incoming sequence
-		client side - wsaTo + SeequenceKey */
-
-		if (serverSide) {
-			// getting the request message and rmMessage.
-			MessageContext reqMsgCtx = msgCtx.getOperationContext()
-					.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-
-			RMMsgContext requestRMMsgCtx = MsgInitializer
-					.initializeMessage(reqMsgCtx);
-
-			Sequence reqSequence = (Sequence) requestRMMsgCtx
-					.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-			if (reqSequence == null) {
-				String message = "Sequence part is null";
-				log.debug(message);
-				throw new SandeshaException(message);
-			}
-
-			String incomingSeqId = reqSequence.getIdentifier().getIdentifier();
-			if (incomingSeqId == null || incomingSeqId == "") {
-				String message = "Invalid seqence Id";
-				log.debug(message);
-				throw new SandeshaException(message);
-			}
-
-			internalSequenceId = SandeshaUtil
-					.getServerSideInternalSeqIdFromIncomingSeqId(incomingSeqId);
-
-		} else {
-			// set the internal sequence id for the client side.
-			EndpointReference toEPR = msgCtx.getTo();
-			if (toEPR == null || toEPR.getAddress() == null
-					|| "".equals(toEPR.getAddress())) {
-				String message = "TO End Point Reference is not set correctly. This is a must for the sandesha client side.";
-				log.debug(message);
-				throw new AxisFault(message);
-			}
-
-			String to = toEPR.getAddress();
-			String sequenceKey = (String) msgCtx
-					.getProperty(Sandesha2ClientAPI.SEQUENCE_KEY);
-
-			internalSequenceId = SandeshaUtil.getInternalSequenceID(to,
-					sequenceKey);
-
-		}
-
-		/* checking weather the user has given the messageNumber (most of the cases this will not be the case).
-		In that case the system will generate the message number */
-
-		//User should set it as a long object.
-		Long messageNumberLng = (Long) msgCtx.getProperty(Sandesha2ClientAPI.MESSAGE_NUMBER);
-		
-		long givenMessageNumber = -1;
-		if (messageNumberLng!=null) {
-			givenMessageNumber = messageNumberLng.longValue();
-			if (givenMessageNumber<=0) {
-				throw new SandeshaException ("The givem message number value is invalid (has to be larger than zero)");
-			}
-		}
-		
-		//the message number that was last used. 
-		long systemMessageNumber = getPreviousMsgNo(context, internalSequenceId);
-		
-		//The number given by the user has to be larger than the last stored number.
-		if (givenMessageNumber>0 && givenMessageNumber<=systemMessageNumber) {
-			String message = "The given message number is not larger than value of the last sent message.";
-			throw new SandeshaException (message);
-		}
-		
-		//Finding the correct message number.
-		long messageNumber = -1;
-		if (givenMessageNumber>0)          // if given message number is valid use it. (this is larger than the last stored due to the last check)
-			messageNumber = givenMessageNumber;
-		else if (systemMessageNumber>0) {	//if system message number is valid use it.
-			messageNumber = systemMessageNumber+1;
-		} else {         //This is the fist message (systemMessageNumber = -1)
-			messageNumber = 1;
-		}
-		
-		//saving the used message number
-		if (!dummyMessage)
-			setNextMsgNo(context,internalSequenceId,messageNumber);
-		
-		boolean sendCreateSequence = false;
 
-		SequencePropertyBean outSeqBean = seqPropMgr.retrieve(
-				internalSequenceId,
-				Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
-
-		// setting async ack endpoint for the server side. (if present)
-		if (serverSide) {
-			String incomingSequenceID = SandeshaUtil
-					.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
-			SequencePropertyBean incomingToBean = seqPropMgr.retrieve(
-					incomingSequenceID,
-					Sandesha2Constants.SequenceProperties.TO_EPR);
-			if (incomingToBean != null) {
-				String incomingTo = incomingToBean.getValue();
-				msgCtx.setProperty(Sandesha2ClientAPI.AcksTo, incomingTo);
-			}
-		}
-
-		
-		String specVersion = null;
-		
-		if (msgCtx.isServerSide()) {
-			//in the server side, get the RM version from the request sequence.
-			MessageContext requestMessageContext = msgCtx.getOperationContext().getMessageContext(AxisOperationFactory.MESSAGE_LABEL_IN_VALUE);
-			if (requestMessageContext==null) 
-				throw new SandeshaException ("Request message context is null, cant find out the request side sequenceID");
-			
-			RMMsgContext requestRMMsgCtx = MsgInitializer.initializeMessage(requestMessageContext);
-			Sequence sequence = (Sequence) requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-			
-			String requestSequenceID = sequence.getIdentifier().getIdentifier();
-			SequencePropertyBean specVersionBean = seqPropMgr.retrieve(requestSequenceID,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
-			if (specVersionBean==null) 
-				throw new SandeshaException ("SpecVersion sequence property bean is not available for the incoming sequence. Cant find the RM version for outgoing side");
-			
-			specVersion = specVersionBean.getValue();
-			
-			
-		} else {
-			//in the client side, user will set the RM version.
-			specVersion = (String) msgCtx.getProperty(Sandesha2ClientAPI.RM_SPEC_VERSION);
-		}
-		
-		
-		if (specVersion==null) {
-			specVersion = Sandesha2Constants.SPEC_VERSIONS.WSRM;   //TODO change the default to WSRX.
+		MsgProcessor msgProcessor = null;
+		int messageType = rmMsgCtx.getMessageType();
+		if (messageType==Sandesha2Constants.MessageTypes.UNKNOWN) {
+			MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+			if (requestMsgCtx!=null) {  //for the server side
+				RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx);
+				Sequence sequencePart = (Sequence) reqRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+				if (sequencePart!=null)
+					msgProcessor = new ApplicationMsgProcessor ();// a rm intended message.
+			} else if (!msgCtx.isServerSide()) //if client side.
+			    msgProcessor = new ApplicationMsgProcessor ();
+		}else  {
+			msgProcessor = MsgProcessorFactory.getMessageProcessor(messageType);
 		}
 		
-		
-		if (messageNumber == 1) {
-			if (outSeqBean == null) {
-				sendCreateSequence = true;   // message number being one and not having an out sequence, implies that a create sequence has to be send.
-			}
-
-			// if fist message - setup the sending side sequence - both for the server and the client sides
-			SequenceManager.setupNewClientSequence(msgCtx, internalSequenceId,specVersion);
-		}
-
-		if (sendCreateSequence) {
-			SequencePropertyBean responseCreateSeqAdded = seqPropMgr
-					.retrieve(
-							internalSequenceId,
-							Sandesha2Constants.SequenceProperties.OUT_CREATE_SEQUENCE_SENT);
-
-			if (responseCreateSeqAdded == null) {
-				responseCreateSeqAdded = new SequencePropertyBean(
-						internalSequenceId,
-						Sandesha2Constants.SequenceProperties.OUT_CREATE_SEQUENCE_SENT,
-						"true");
-				seqPropMgr.insert(responseCreateSeqAdded);
-
-				String acksTo = null;
-				if (serviceContext != null) {
-					acksTo = (String) msgCtx
-							.getProperty(Sandesha2ClientAPI.AcksTo);
-				}
-
-				if (msgCtx.isServerSide()) {
-					// we do not set acksTo value to anonymous when the create
-					// sequence is send from the server.
-					MessageContext requestMessage = operationContext
-							.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
-					if (requestMessage == null) {
-						String message = "Request message is not present";
-						log.debug(message);
-						throw new SandeshaException(message);
-					}
-					acksTo = requestMessage.getTo().getAddress();
-
-				} else {
-					if (acksTo == null)
-						acksTo = Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
-				}
-
-				if (!Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo) && !serverSide) {
-					String transportIn = (String) context
-							.getProperty(MessageContext.TRANSPORT_IN);
-					if (transportIn == null)
-						transportIn = org.apache.axis2.Constants.TRANSPORT_HTTP;
-				} else if (acksTo == null && serverSide) {
-					String incomingSequencId = SandeshaUtil
-							.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
-					SequencePropertyBean bean = seqPropMgr.retrieve(
-							incomingSequencId,
-							Sandesha2Constants.SequenceProperties.REPLY_TO_EPR);
-					if (bean != null) {
-						EndpointReference acksToEPR = new EndpointReference(bean.getValue());
-						if (acksToEPR != null)
-							acksTo = (String) acksToEPR.getAddress();
-					}
-				} else if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS
-						.equals(acksTo)) {
-					// set transport in.
-					Object trIn = msgCtx
-							.getProperty(MessageContext.TRANSPORT_IN);
-					if (trIn == null) {
-						//TODO
-					}
-				}
-
-				addCreateSequenceMessage(rmMsgCtx, internalSequenceId, acksTo);
-			}
-		}
-
-		SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
-		if (env == null) {
-			SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(
-					SandeshaUtil.getSOAPVersion(env)).getDefaultEnvelope();
-			rmMsgCtx.setSOAPEnvelop(envelope);
-		}
-
-		SOAPBody soapBody = rmMsgCtx.getSOAPEnvelope().getBody();
-		if (soapBody == null) {
-			String message = "Invalid SOAP message. Body is not present";
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		String messageId1 = SandeshaUtil.getUUID();
-		if (rmMsgCtx.getMessageId() == null) {
-			rmMsgCtx.setMessageId(messageId1);
-		}
-
-		
-		
-		
-		
-
-		
-		if (serverSide) {
-			// let the request end with 202 if a ack has not been
-			// written in the incoming thread.
-			
-			MessageContext reqMsgCtx = msgCtx.getOperationContext()
-					.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-
-			if (reqMsgCtx.getProperty(Sandesha2Constants.ACK_WRITTEN) == null
-					|| !"true".equals(reqMsgCtx
-							.getProperty(Sandesha2Constants.ACK_WRITTEN)))
-				reqMsgCtx.getOperationContext().setProperty(
-						org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
-		}
-		
-		
-		EndpointReference toEPR = msgCtx.getTo();
-		if (toEPR == null) {
-			String message = "To EPR is not found";
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-		String to = toEPR.getAddress();
-		
-		String operationName = msgCtx.getOperationContext().getAxisOperation().getName().getLocalPart();
-		
-		if (msgCtx.getWSAAction() == null) {
-			msgCtx.setWSAAction(to + "/" + operationName);
-		}
-
-		if (msgCtx.getSoapAction() == null) {
-			msgCtx.setSoapAction("\"" + to + "/" + operationName + "\"");
-		}
-		
-		// processing the response
-		if (!dummyMessage)
-			processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber);
-		
-		msgCtx.pause();  // the execution will be stopped.
-		outHandlerTransaction.commit();		
-	}
-
-	public void addCreateSequenceMessage(RMMsgContext applicationRMMsg,
-			String internalSequenceId, String acksTo) throws SandeshaException {
-
-		MessageContext applicationMsg = applicationRMMsg.getMessageContext();
-		if (applicationMsg == null) {
-			String message = "Message context is null";
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		RMMsgContext createSeqRMMessage = RMMsgCreator.createCreateSeqMsg(
-				applicationRMMsg, internalSequenceId, acksTo);
-		createSeqRMMessage.setFlow(MessageContext.OUT_FLOW);
-		CreateSequence createSequencePart = (CreateSequence) createSeqRMMessage
-				.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
-		if (createSequencePart == null) {
-			String message = "Create Sequence part is null for a CreateSequence message";
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		StorageManager storageManager = SandeshaUtil
-				.getSandeshaStorageManager(applicationMsg
-						.getConfigurationContext());
-
-		SequencePropertyBeanMgr seqPropMgr = storageManager
-				.getSequencePropretyBeanMgr();
-
-		SequenceOffer offer = createSequencePart.getSequenceOffer();
-		if (offer != null) {
-			String offeredSequenceId = offer.getIdentifer().getIdentifier();
-			SequencePropertyBean msgsBean = new SequencePropertyBean();
-			msgsBean.setSequenceID(offeredSequenceId);
-			msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
-			msgsBean.setValue("");
-
-			SequencePropertyBean offeredSequenceBean = new SequencePropertyBean();
-			offeredSequenceBean
-					.setName(Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE);
-			offeredSequenceBean.setSequenceID(internalSequenceId);
-			offeredSequenceBean.setValue(offeredSequenceId);
-
-			seqPropMgr.insert(msgsBean);
-			seqPropMgr.insert(offeredSequenceBean);
-		}
-
-		MessageContext createSeqMsg = createSeqRMMessage.getMessageContext();
-		createSeqMsg.setRelatesTo(null); // create seq msg does not relateTo anything
-		AbstractContext context = applicationRMMsg.getContext();
-		if (context == null) {
-			String message = "Context is null";
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
-
-		CreateSeqBean createSeqBean = new CreateSeqBean(internalSequenceId,
-				createSeqMsg.getMessageID(), null);
-		createSeqMgr.insert(createSeqBean);
-
-		if (createSeqMsg.getReplyTo() == null)
-			createSeqMsg.setReplyTo(new EndpointReference(
-					Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
-
-		SenderBeanMgr retransmitterMgr = storageManager
-				.getRetransmitterBeanMgr();
-
-		String key = SandeshaUtil.getUUID();
-
-		SenderBean createSeqEntry = new SenderBean();
-		createSeqEntry.setMessageContextRefKey(key);
-		createSeqEntry.setTimeToSend(System.currentTimeMillis());
-		createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
-
-		// this will be set to true in the sender
-		createSeqEntry.setSend(true);
-		
-		createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
-				Sandesha2Constants.VALUE_FALSE);
-		
-		createSeqEntry
-				.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
-		retransmitterMgr.insert(createSeqEntry);
-		
-		storageManager.storeMessageContext(key,createSeqMsg);
-		
-		// sending the message once through our sender.
-		AxisEngine engine = new AxisEngine(createSeqMsg
-				.getConfigurationContext());
-
-		// message will be stored in the Sandesha2TransportSender		
-		createSeqMsg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, key);
-		
-		TransportOutDescription transportOut = createSeqMsg.getTransportOut();
-		
-		createSeqMsg.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
-		createSeqMsg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
-		createSeqMsg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, key);
-		
-		Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc ();
-		createSeqMsg.setTransportOut(sandesha2TransportOutDesc);
-
-		 try {
-			 log.info ("Sending create seq msg...");
-			 engine.send(createSeqMsg);
-		 } catch (AxisFault e) {
-			 throw new SandeshaException (e.getMessage());
-		 }
-	}
-
-	private void processResponseMessage(RMMsgContext rmMsg,
-			String internalSequenceId, long messageNumber)
-			throws SandeshaException {
-
-		MessageContext msg = rmMsg.getMessageContext();
-
-		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
-				.getSOAPVersion(rmMsg.getSOAPEnvelope()));
-
-		if (rmMsg == null) {
-			String message = "Message or reques message is null";
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		AbstractContext context = rmMsg.getContext();
-		if (context == null) {
-			String message = "Context is null";
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-		ConfigurationContext configurationContext = rmMsg.getMessageContext().getConfigurationContext();
-
-		StorageManager storageManager = SandeshaUtil
-				.getSandeshaStorageManager(msg.getConfigurationContext());
-		
-		SequencePropertyBeanMgr sequencePropertyMgr = storageManager
-				.getSequencePropretyBeanMgr();
-
-		SenderBeanMgr retransmitterMgr = storageManager
-				.getRetransmitterBeanMgr();
-
-		SequencePropertyBean toBean = sequencePropertyMgr.retrieve(
-				internalSequenceId,
-				Sandesha2Constants.SequenceProperties.TO_EPR);
-		SequencePropertyBean replyToBean = sequencePropertyMgr.retrieve(
-				internalSequenceId,
-				Sandesha2Constants.SequenceProperties.REPLY_TO_EPR);
-
-		// again - looks weird in the client side - but consistent
-		SequencePropertyBean outSequenceBean = sequencePropertyMgr.retrieve(
-				internalSequenceId,
-				Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
-
-		if (toBean == null) {
-			String message = "To is null";
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		EndpointReference toEPR = new EndpointReference(toBean.getValue());
-		EndpointReference replyToEPR = null;
-
-		if (replyToBean != null) {
-			replyToEPR = new EndpointReference(replyToBean.getValue());
-		}
-
-		if (toEPR == null || toEPR.getAddress() == null
-				|| toEPR.getAddress() == "") {
-			String message = "To Property has an invalid value";
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		String newToStr = null;
-
-		if (msg.isServerSide()) {
-			try {
-				MessageContext requestMsg = msg.getOperationContext()
-						.getMessageContext(
-								OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
-				if (requestMsg != null) {
-					newToStr = requestMsg.getReplyTo().getAddress();
-				}
-			} catch (AxisFault e) {
-				throw new SandeshaException(e.getMessage());
-			}
-		}
-
-		if (newToStr != null)
-			rmMsg.setTo(new EndpointReference(newToStr));
-		else
-			rmMsg.setTo(toEPR);
-
-		if (replyToEPR != null)
-			rmMsg.setReplyTo(replyToEPR);
-
-		String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,configurationContext);
-		if (rmVersion==null)
-			throw new SandeshaException ("Cant find the rmVersion of the given message");
-		
-		String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
-		
-		Sequence sequence = new Sequence(factory,rmNamespaceValue);
-
-		MessageNumber msgNumber = new MessageNumber(factory,rmNamespaceValue);
-		msgNumber.setMessageNumber(messageNumber);
-		sequence.setMessageNumber(msgNumber);
-
-		boolean lastMessage = false;
-		// setting last message
-		if (msg.isServerSide()) {
-			// server side
-			String incomingSeqId = internalSequenceId;
-			MessageContext requestMsg = null;
-
-			try {
-				requestMsg = msg.getOperationContext().getMessageContext(
-						WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-			} catch (AxisFault e) {
-				throw new SandeshaException(e.getMessage());
-			}
-
-			RMMsgContext reqRMMsgCtx = MsgInitializer
-					.initializeMessage(requestMsg);
-			Sequence requestSequence = (Sequence) reqRMMsgCtx
-					.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-			if (requestSequence == null) {
-				String message = "Request Sequence is null";
-				log.debug(message);
-				throw new SandeshaException(message);
-			}
-
-			
-			//TODO check for highest msg no.
-			long requestMsgNo = requestSequence.getMessageNumber().getMessageNumber();
-			
-			
-			if (requestSequence.getLastMessage() != null) {
-				lastMessage = true;
-				sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
-
-				// saving the last message no.
-				SequencePropertyBean lastOutMsgBean = new SequencePropertyBean(
-						internalSequenceId,
-						Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE,
-						new Long(messageNumber).toString());
-				sequencePropertyMgr.insert(lastOutMsgBean);
-			}
-
-		} else {
-			// client side
-
-			OperationContext operationContext = msg.getOperationContext();
-			if (operationContext != null) {
-				Object obj = msg.getProperty(Sandesha2ClientAPI.LAST_MESSAGE);
-				if (obj != null && "true".equals(obj)) {
-					lastMessage = true;
-					
-					SequencePropertyBean specVersionBean = sequencePropertyMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
-					if (specVersionBean==null)
-						throw new SandeshaException ("Spec version bean is not set");
-					String specVersion = specVersionBean.getValue();
-					
-					if (SpecSpecificConstants.isLastMessageIndicatorRequired(specVersion))
-						sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
-					
-					// saving the last message no.
-					SequencePropertyBean lastOutMsgBean = new SequencePropertyBean(
-							internalSequenceId,
-							Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE,
-							new Long(messageNumber).toString());
-					sequencePropertyMgr.insert(lastOutMsgBean);
-				}
-			}
-		}
-
-		AckRequested ackRequested = null;
-
-		boolean addAckRequested = false;
-		//if (!lastMessage)
-		addAckRequested = true;   //TODO decide the policy to add the ackRequested tag
-
-		// setting the Sequnece id.
-		// Set send = true/false depending on the availability of the out
-		// sequence id.
-		String identifierStr = null;
-		if (outSequenceBean == null || outSequenceBean.getValue() == null) {
-			identifierStr = Sandesha2Constants.TEMP_SEQUENCE_ID;
-
-		} else {
-			identifierStr = (String) outSequenceBean.getValue();
-		}
-
-		Identifier id1 = new Identifier(factory,rmNamespaceValue);
-		id1.setIndentifer(identifierStr);
-		sequence.setIdentifier(id1);
-		rmMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE,sequence);
-
-		if (addAckRequested) {
-			ackRequested = new AckRequested(factory,rmNamespaceValue);
-			Identifier id2 = new Identifier(factory,rmNamespaceValue);
-			id2.setIndentifer(identifierStr);
-			ackRequested.setIdentifier(id2);
-			rmMsg.setMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST,
-					ackRequested);
-		}
-
-		try {
-			rmMsg.addSOAPEnvelope();
-		} catch (AxisFault e1) {
-			throw new SandeshaException(e1.getMessage());
-		}
-
-		//Retransmitter bean entry for the application message
-		SenderBean appMsgEntry = new SenderBean();
-		String storageKey = SandeshaUtil.getUUID();
-		
-		appMsgEntry.setMessageContextRefKey(storageKey);
-
-		appMsgEntry.setTimeToSend(System.currentTimeMillis());
-		appMsgEntry.setMessageID(rmMsg.getMessageId());
-		appMsgEntry.setMessageNumber(messageNumber);
-		appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
-		if (outSequenceBean == null || outSequenceBean.getValue() == null) {
-			appMsgEntry.setSend(false);
-		} else {
-			appMsgEntry.setSend(true);
-			// Send will be set to true at the sender.
-			msg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,
-					Sandesha2Constants.VALUE_TRUE);
-		}
-
-		appMsgEntry.setInternalSequenceID(internalSequenceId);
-		
-		storageManager.storeMessageContext(storageKey,msg);
-		
-		retransmitterMgr.insert(appMsgEntry);
-
-		msg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
-				Sandesha2Constants.VALUE_FALSE);
-
-		// changing the sender. This will set send to true.
-		TransportSender sender = msg.getTransportOut().getSender();
-
-		if (sender != null) {
-			Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc ();
-			msg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, storageKey);
-			msg.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,
-					msg.getTransportOut());
-			msg.setTransportOut(sandesha2TransportOutDesc);
-			
-		}
-		
-
-		//increasing the current handler index, so that the message will not be going throught the SandeshaOutHandler again.
-		msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex()+1);
-		
-		//sending the message through, other handlers and the Sandesha2TransportSender so that it get dumped to the storage.
-		AxisEngine engine = new AxisEngine (msg.getConfigurationContext());
-		try {
-			engine.resumeSend(msg);
-		} catch (AxisFault e) {
-			throw new SandeshaException (e);
-		}
-		
-	}	
+		if (msgProcessor!=null)
+			msgProcessor.processOutMessage(rmMsgCtx);
 	
-	private long getPreviousMsgNo(ConfigurationContext context,
-			String internalSequenceId) throws SandeshaException {
-
-		StorageManager storageManager = SandeshaUtil
-				.getSandeshaStorageManager(context);
-
-		SequencePropertyBeanMgr seqPropMgr = storageManager
-				.getSequencePropretyBeanMgr();
-		SequencePropertyBean nextMsgNoBean = seqPropMgr.retrieve(
-				internalSequenceId,
-				Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
-		
-		long nextMsgNo = -1;
-		if (nextMsgNoBean != null) {
-			Long nextMsgNoLng = new Long(nextMsgNoBean.getValue());
-			nextMsgNo = nextMsgNoLng.longValue();
-		} 
-		
-		return nextMsgNo;
 	}
-	
-	private void setNextMsgNo(ConfigurationContext context,
-			String internalSequenceId, long msgNo) throws SandeshaException {
-
-		if (msgNo<=0) {
-			String message = "Message number '" + msgNo + "' is invalid. Has to be larger than zero.";
-			throw new SandeshaException (message);
-		}
-		
-		StorageManager storageManager = SandeshaUtil
-				.getSandeshaStorageManager(context);
-
-		SequencePropertyBeanMgr seqPropMgr = storageManager
-				.getSequencePropretyBeanMgr();
-		SequencePropertyBean nextMsgNoBean = seqPropMgr.retrieve(
-				internalSequenceId,
-				Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
-
-		boolean update = true;
-		if (nextMsgNoBean == null) {
-			update = false;
-			nextMsgNoBean = new SequencePropertyBean();
-			nextMsgNoBean.setSequenceID(internalSequenceId);
-			nextMsgNoBean.setName(Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
-		}
 
-		nextMsgNoBean.setValue(new Long(msgNo).toString());
-		if (update)
-			seqPropMgr.update(nextMsgNoBean);
-		else
-			seqPropMgr.insert(nextMsgNoBean);
+	
 
-	}
 
 	public QName getName() {
 		return new QName(Sandesha2Constants.OUT_HANDLER_NAME);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Thu Mar 16 21:01:58 2006
@@ -62,7 +62,7 @@
 
 	private Log log = LogFactory.getLog(getClass());
 	
-	public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+	public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
 		
 		
 		AckRequested ackRequested = (AckRequested) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST);
@@ -263,6 +263,10 @@
 			
 			msgContext.pause(); 
 		}
+	}
+	
+	public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+		
 	}
 	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Thu Mar 16 21:01:58 2006
@@ -32,10 +32,12 @@
 import org.apache.axis2.transport.TransportSender;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.AcknowledgementManager;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.SpecSpecificConstants;
+import org.apache.sandesha2.TerminateManager;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -62,7 +64,7 @@
 
 	Log log = LogFactory.getLog(getClass());
 	
-	public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+	public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
 
 		SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) rmMsgCtx
 				.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
@@ -73,6 +75,7 @@
 		}
 		
 		MessageContext msgCtx = rmMsgCtx.getMessageContext();
+		ConfigurationContext configCtx = msgCtx.getConfigurationContext();
 		
 		AbstractContext context = rmMsgCtx.getContext();
 		if (context == null) {
@@ -231,41 +234,48 @@
 		seqPropMgr.update(allCompletedMsgsBean);
 		
 		//If all messages up to last message have been acknowledged. Add terminate Sequence message.
-		SequencePropertyBean lastOutMsgBean = seqPropMgr.retrieve(
-				internalSequenceId, Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE);
-		if (lastOutMsgBean != null) {
-			Long lastOutMsgNoLng = new Long (lastOutMsgBean.getValue());
-			if (lastOutMsgNoLng == null) {
-				String message = "Invalid object set for the Last Out Message";
-				log.debug(message);
-				throw new SandeshaException(message);
+//		SequencePropertyBean lastOutMsgBean = seqPropMgr.retrieve(
+//				internalSequenceId, Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE);
+//		if (lastOutMsgBean != null) {
+//			Long lastOutMsgNoLng = new Long (lastOutMsgBean.getValue());
+//			if (lastOutMsgNoLng == null) {
+//				String message = "Invalid object set for the Last Out Message";
+//				log.debug(message);
+//				throw new SandeshaException(message);
+//			}
+//			
+//			long lastOutMessageNo = lastOutMsgNoLng.longValue();
+//			if (lastOutMessageNo <= 0) {
+//				String message = "Invalid value set for the last out message";
+//				log.debug(message);
+//				throw new SandeshaException(message);
+//			}
+
+		
+		
+		//commiting transaction
+		ackTransaction.commit();
+		
+		String lastOutMsgNoStr = SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO,configCtx);
+		if (lastOutMsgNoStr!=null ) {
+			long highestOutMsgNo = 0;
+			if (lastOutMsgNoStr!=null) {
+				highestOutMsgNo = Long.parseLong(lastOutMsgNoStr);
 			}
 			
-			long lastOutMessageNo = lastOutMsgNoLng.longValue();
-			if (lastOutMessageNo <= 0) {
-				String message = "Invalid value set for the last out message";
-				log.debug(message);
-				throw new SandeshaException(message);
-			}
-
-
-			//commiting transaction
-			ackTransaction.commit();
-
-			boolean complete = SandeshaUtil.verifySequenceCompletion(
-					sequenceAck.getAcknowledgementRanges().iterator(),
-					lastOutMessageNo);
+			if (highestOutMsgNo>0) {
+				boolean complete = AcknowledgementManager.verifySequenceCompletion (
+				sequenceAck.getAcknowledgementRanges().iterator(),highestOutMsgNo);
 			
-			if (complete) {
-				addTerminateSequenceMessage(rmMsgCtx, outSequenceId,internalSequenceId);
+				if (complete) 
+					TerminateManager.addTerminateSequenceMessage(rmMsgCtx, outSequenceId,internalSequenceId);
 			}
 		}
-	
+		
 		//stopping the progress of the message further.
 		rmMsgCtx.pause();	
-		
-
 	}
+	
 
 	private SenderBean getRetransmitterEntry(Collection collection,
 			long msgNo) {
@@ -279,121 +289,7 @@
 		return null;
 	}
 
-	public void addTerminateSequenceMessage(RMMsgContext incomingAckRMMsg,
-			String outSequenceId, String internalSequenceId)
-			throws SandeshaException {
-
-		
-		ConfigurationContext configurationContext = incomingAckRMMsg.getMessageContext().getConfigurationContext();
-		StorageManager storageManager = SandeshaUtil
-				.getSandeshaStorageManager(configurationContext);
-
-		Transaction addTerminateSeqTransaction = storageManager.getTransaction();
-		
-		SequencePropertyBeanMgr seqPropMgr = storageManager
-				.getSequencePropretyBeanMgr();
-
-		SequencePropertyBean terminated = seqPropMgr.retrieve(outSequenceId,
-				Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
-
-		if (terminated != null && terminated.getValue() != null
-				&& "true".equals(terminated.getValue())) {
-			String message = "Terminate was added previously.";
-			log.info(message);
-			return;
-		}
-
-		RMMsgContext terminateRMMessage = RMMsgCreator
-				.createTerminateSequenceMessage(incomingAckRMMsg, outSequenceId,internalSequenceId);
-		terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
-		terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
-		
-		SequencePropertyBean toBean = seqPropMgr.retrieve(internalSequenceId,
-				Sandesha2Constants.SequenceProperties.TO_EPR);
-
-		EndpointReference toEPR = new EndpointReference ( toBean.getValue());
-		if (toEPR == null) {
-			String message = "To EPR has an invalid value";
-			throw new SandeshaException(message);
-		}
-
-		terminateRMMessage.setTo(new EndpointReference(toEPR.getAddress()));
-		terminateRMMessage.setFrom(new EndpointReference(
-				Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
-		terminateRMMessage.setFaultTo(new EndpointReference(
-				Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
-		
-		String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,configurationContext);
-		if (rmVersion==null)
-			throw new SandeshaException ("Cant find the rmVersion of the given message");
-		terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
-		terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
-
-		SequencePropertyBean transportToBean = seqPropMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
-		if (transportToBean!=null) {
-			terminateRMMessage.setProperty(MessageContextConstants.TRANSPORT_URL,transportToBean.getValue());
-		}
-		
-		try {
-			terminateRMMessage.addSOAPEnvelope();
-		} catch (AxisFault e) {
-			throw new SandeshaException(e.getMessage());
-		}
-
-		String key = SandeshaUtil.getUUID();
-		
-		SenderBean terminateBean = new SenderBean();
-		terminateBean.setMessageContextRefKey(key);
-
-		
-		storageManager.storeMessageContext(key,terminateRMMessage.getMessageContext());
-
-		
-		//Set a retransmitter lastSentTime so that terminate will be send with
-		// some delay.
-		//Otherwise this get send before return of the current request (ack).
-		//TODO: refine the terminate delay.
-		terminateBean.setTimeToSend(System.currentTimeMillis()
-				+ Sandesha2Constants.TERMINATE_DELAY);
 
-		terminateBean.setMessageID(terminateRMMessage.getMessageId());
-		
-		//this will be set to true at the sender.
-		terminateBean.setSend(true);
-		
-		terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
-				Sandesha2Constants.VALUE_FALSE);
-		
-		terminateBean.setReSend(false);
-
-		SenderBeanMgr retramsmitterMgr = storageManager
-				.getRetransmitterBeanMgr();
-
-		retramsmitterMgr.insert(terminateBean);
-		
-		SequencePropertyBean terminateAdded = new SequencePropertyBean();
-		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
-		terminateAdded.setSequenceID(outSequenceId);
-		terminateAdded.setValue("true");
-
-		seqPropMgr.insert(terminateAdded);
-		
-		//This should be dumped to the storage by the sender
-		TransportOutDescription transportOut = terminateRMMessage.getMessageContext().getTransportOut();
-		terminateRMMessage.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
-		terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
-		terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
-		terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
-		addTerminateSeqTransaction.commit();
-		
-	    AxisEngine engine = new AxisEngine (incomingAckRMMsg.getMessageContext().getConfigurationContext());
-	    try {
-			engine.send(terminateRMMessage.getMessageContext());
-		} catch (AxisFault e) {
-			throw new SandeshaException (e.getMessage());
-		}
-	    
-	}
 	
 	private static long getNoOfMessagesAcked (Iterator ackRangeIterator) {
 		long noOfMsgs = 0;
@@ -408,5 +304,9 @@
 		}
 		
 		return noOfMsgs;
+	}
+	
+	public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+		
 	}
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org