You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2006/06/15 07:51:24 UTC

svn commit: r414476 [10/15] - in /webservices/sandesha/trunk: ./ c/ config/ interop/ java/ java/config/ java/interop/ java/interop/conf/ java/interop/src/ java/interop/src/org/ java/interop/src/org/apache/ java/interop/src/org/apache/sandesha2/ java/in...

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,495 @@
+/*
+ * Created on Sep 5, 2005
+ *
+ * TODO To change the template for this generated file go to
+ * Window - Preferences - Java - Code Style - Code Templates
+ */
+package org.apache.sandesha2.util;
+
+import java.util.Collection;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.engine.ListenerManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.wsrm.CreateSequence;
+
+/**
+ * This is used to set up a new sequence, both at the sending side and the receiving side.
+ * 
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class SequenceManager {
+
+	private static Log log = LogFactory.getLog(SequenceManager.class);
+	
+	public static String setupNewSequence(RMMsgContext createSequenceMsg,StorageManager storageManager)
+			throws AxisFault {		
+		
+		String sequenceId = SandeshaUtil.getUUID();
+
+		EndpointReference to = createSequenceMsg.getTo();
+		if (to == null) {
+			String message = "To is null";
+			log.debug(message);
+			throw new AxisFault(message);
+		}
+
+		EndpointReference replyTo = createSequenceMsg.getReplyTo();
+
+		CreateSequence createSequence = (CreateSequence) createSequenceMsg
+				.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
+		if (createSequence == null) {
+			String message = "Create Sequence Part is null";
+			log.debug(message);
+			throw new AxisFault(message);
+		}
+
+		EndpointReference acksTo = createSequence.getAcksTo().getAddress()
+				.getEpr();
+
+		if (acksTo == null) {
+			String message = "AcksTo is null";
+			log.debug(message);
+			throw new AxisFault(message);
+		}
+
+		ConfigurationContext configurationContext = createSequenceMsg.getMessageContext()
+									.getConfigurationContext();
+
+
+		SequencePropertyBeanMgr seqPropMgr = storageManager
+				.getSequencePropretyBeanMgr();
+
+		SequencePropertyBean receivedMsgBean = new SequencePropertyBean(
+				sequenceId, Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES, "");
+		
+		
+		//setting the addressing version
+		String addressingNamespaceValue = createSequenceMsg.getAddressingNamespaceValue();
+		SequencePropertyBean addressingNamespaceBean = new SequencePropertyBean (
+				sequenceId,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,addressingNamespaceValue);
+		seqPropMgr.insert(addressingNamespaceBean);
+		
+		String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceValue);
+		
+		//If no replyTo value. Send responses as sync.
+		SequencePropertyBean toBean = null;
+		if (replyTo!=null) {
+			toBean = new SequencePropertyBean(sequenceId,
+				Sandesha2Constants.SequenceProperties.TO_EPR, replyTo.getAddress());
+		}else {
+			toBean = new SequencePropertyBean(sequenceId,
+					Sandesha2Constants.SequenceProperties.TO_EPR, anonymousURI);
+		}
+		
+		SequencePropertyBean replyToBean = new SequencePropertyBean(sequenceId,
+				Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, to.getAddress());
+		SequencePropertyBean acksToBean = new SequencePropertyBean(sequenceId,
+				Sandesha2Constants.SequenceProperties.ACKS_TO_EPR, acksTo.getAddress());
+
+		
+		seqPropMgr.insert(receivedMsgBean);
+		seqPropMgr.insert(replyToBean);
+		seqPropMgr.insert(acksToBean);
+		
+		if (toBean!=null)
+			seqPropMgr.insert(toBean);
+
+		NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
+		nextMsgMgr.insert(new NextMsgBean(sequenceId, 1)); // 1 will be the next
+		
+		// message to invoke. This will apply for only in-order invocations.
+
+		SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceId);
+		
+		//stting the RM SPEC version for this sequence.
+		String createSequenceMsgAction = createSequenceMsg.getWSAAction();
+		if (createSequenceMsgAction==null)
+		    throw new SandeshaException ("Create sequence message does not have the WSA:Action value");
+		
+		String messageRMNamespace = createSequence.getNamespaceValue();
+		
+		String specVersion = null;
+		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(messageRMNamespace)) {
+			specVersion = Sandesha2Constants.SPEC_VERSIONS.v1_0;
+		}else if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(messageRMNamespace)) {
+			specVersion = Sandesha2Constants.SPEC_VERSIONS.v1_1;
+		} else {
+			throw new SandeshaException ("Create sequence message does not has a valid RM namespace value. Cant decide the RM version");
+		}
+		
+		SequencePropertyBean specVerionBean = new SequencePropertyBean ();
+		specVerionBean.setSequenceID(sequenceId);
+		specVerionBean.setName(Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
+		specVerionBean.setValue(specVersion);
+		
+		seqPropMgr.insert(specVerionBean);
+		
+		//TODO get the SOAP version from the create seq message.
+		
+		return sequenceId;
+	}
+
+	public void removeSequence(String sequence) {
+
+	}
+
+	public static void setupNewClientSequence(
+			MessageContext firstAplicationMsgCtx, String internalSequenceId, String specVersion,StorageManager storageManager)
+			throws SandeshaException {
+		
+		ConfigurationContext configurationContext = firstAplicationMsgCtx
+										.getConfigurationContext();
+
+		SequencePropertyBeanMgr seqPropMgr = storageManager
+				.getSequencePropretyBeanMgr();
+
+		//setting the addressing version
+		String addressingNamespace = (String) firstAplicationMsgCtx.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
+		
+		if (addressingNamespace==null) {
+			OperationContext opCtx = firstAplicationMsgCtx.getOperationContext();
+			if (opCtx!=null) {
+				try {
+					MessageContext requestMsg = opCtx.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+					if (requestMsg!=null)
+						addressingNamespace = (String) requestMsg.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
+				} catch (AxisFault e) {
+					throw new SandeshaException (e);
+				}
+			}
+		}
+		
+		if (addressingNamespace==null)
+			addressingNamespace = AddressingConstants.Final.WSA_NAMESPACE;   //defaults to Final. Make sure this is synchronized with addressing.
+		
+		SequencePropertyBean addressingNamespaceBean = new SequencePropertyBean (
+				internalSequenceId,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,addressingNamespace);
+		seqPropMgr.insert(addressingNamespaceBean);
+		
+		String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace);
+		
+		EndpointReference toEPR = firstAplicationMsgCtx.getTo();
+		String acksTo = (String) firstAplicationMsgCtx
+				.getProperty(SandeshaClientConstants.AcksTo);
+
+		if (toEPR == null) {
+			String message = "WS-Addressing To is null";
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		SequencePropertyBean toBean = new SequencePropertyBean(internalSequenceId,
+				Sandesha2Constants.SequenceProperties.TO_EPR, toEPR.getAddress());
+		SequencePropertyBean replyToBean = null;
+		SequencePropertyBean acksToBean = null;
+		
+		if (firstAplicationMsgCtx.isServerSide()) {
+			//setting replyTo value, if this is the server side.
+			OperationContext opContext = firstAplicationMsgCtx.getOperationContext();
+			try {
+				MessageContext requestMessage = opContext.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+				if (requestMessage==null) {
+					String message = "Cannot find the request message from the operation context";
+					log.error(message);
+					throw new SandeshaException (message);
+				}
+				
+				EndpointReference replyToEPR = requestMessage.getTo();    //'replyTo' of the response msg is the 'to' value of the req msg.
+				if (replyToEPR!=null) {
+					replyToBean = new SequencePropertyBean (internalSequenceId,Sandesha2Constants.SequenceProperties.REPLY_TO_EPR,replyToEPR.getAddress());
+					acksToBean = new SequencePropertyBean (internalSequenceId,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,replyToEPR.getAddress());		
+				} else {
+					String message = "To EPR is not present in the request message. Need this information to set acksTo & replyTo value of reply messages";
+					log.error(message);
+					throw new SandeshaException (message);
+				}
+			} catch (AxisFault e) {
+				String message = "Cannot get request message from the operation context";
+				log.error(message);
+				log.error(e.getStackTrace());
+				throw new SandeshaException (message);
+			}
+		}
+		//Default value for acksTo is anonymous  (this happens only for the client side)
+		if (acksTo==null) {
+			acksTo = anonymousURI;
+		}
+		
+	    acksToBean = new SequencePropertyBean(
+				internalSequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,
+				acksTo);
+	    
+		//start the in listner for the client side, if acksTo is not anonymous.
+		if (!firstAplicationMsgCtx.isServerSide()  && !anonymousURI.equals(acksTo)) {
+		    
+			String transportInProtocol = firstAplicationMsgCtx.getOptions().getTransportInProtocol();
+		    if (transportInProtocol==null) {
+		    	throw new SandeshaException ("You must mention the transport in protocol for getting async acknowledgement messages");
+		    }
+		   
+            try {
+				ListenerManager listenerManager =
+				    firstAplicationMsgCtx.getConfigurationContext().getListenerManager();
+				TransportInDescription transportIn = firstAplicationMsgCtx.getConfigurationContext().getAxisConfiguration().getTransportIn(new QName(transportInProtocol));
+				//if acksTo is not anonymous start the in-transport
+				if (!listenerManager.isListenerRunning(transportIn.getName().getLocalPart())) {
+					listenerManager.addListener(transportIn, false);
+				}
+			} catch (AxisFault e) {
+				throw new SandeshaException ("Could not stast the transport listner",e);
+			}
+			
+			
+		}
+		
+		SequencePropertyBean msgsBean = new SequencePropertyBean();
+		msgsBean.setSequenceID(internalSequenceId);
+		msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+		msgsBean.setValue("");
+		
+		seqPropMgr.insert(msgsBean);
+		
+		seqPropMgr.insert(toBean);
+		if (acksToBean!=null)
+			seqPropMgr.insert(acksToBean);
+		if (replyToBean!=null)
+			seqPropMgr.insert(replyToBean);
+		
+		//saving transportTo value;
+		String transportTo = (String) firstAplicationMsgCtx.getProperty(MessageContextConstants.TRANSPORT_URL);
+		if (transportTo!=null) {
+			SequencePropertyBean transportToBean = new SequencePropertyBean ();
+			transportToBean.setSequenceID(internalSequenceId);
+			transportToBean.setName(Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+			transportToBean.setValue(transportTo);
+			
+			seqPropMgr.insert(transportToBean);
+		}
+
+
+		//setting the spec version for the client side.
+		SequencePropertyBean specVerionBean = new SequencePropertyBean ();
+		specVerionBean.setSequenceID(internalSequenceId);
+		specVerionBean.setName(Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
+		specVerionBean.setValue(specVersion);
+		seqPropMgr.insert(specVerionBean);
+		
+		//updating the last activated time.
+		updateLastActivatedTime(internalSequenceId,storageManager);
+		
+		SandeshaUtil.startSenderForTheSequence(configurationContext,internalSequenceId);
+		
+		
+		
+		updateClientSideListnerIfNeeded (firstAplicationMsgCtx,anonymousURI);
+		
+	}
+	
+	private static void updateClientSideListnerIfNeeded (MessageContext messageContext, String addressingAnonymousURI) throws SandeshaException {
+		if (messageContext.isServerSide())
+			return;   //listners are updated only for the client side.
+		
+		String transportInProtocol = messageContext.getOptions().getTransportInProtocol();
+		
+		String acksTo = (String) messageContext.getProperty(SandeshaClientConstants.AcksTo);
+		String mep = messageContext.getAxisOperation().getMessageExchangePattern();
+		
+		boolean startListnerForAsyncAcks = false;
+		boolean startListnerForAsyncControlMsgs = false;   //For async createSerRes & terminateSeq.
+		
+		if (acksTo!=null && !addressingAnonymousURI.equals(acksTo)) {
+			//starting listner for async acks.
+			startListnerForAsyncAcks = true;
+		}
+		
+		if (mep!=null && !AxisOperation.MEP_URI_OUT_ONLY.equals(mep)) {
+			//starting listner for the async createSeqResponse & terminateSer messages.
+			startListnerForAsyncControlMsgs = true;
+		}
+		
+		try {
+			if ((startListnerForAsyncAcks || startListnerForAsyncControlMsgs) && transportInProtocol==null)
+				throw new SandeshaException ("Cant start the listner since the TransportInProtocol is not set.");		
+			
+		} catch (AxisFault e) {
+			String message = "Cant start the listner for incoming messages";
+			log.error(e.getStackTrace());
+			throw new SandeshaException (message,e);
+		}
+	
+	}
+	
+	/**
+	 * Takes the internalSeqID as the param. Not the sequenceID.
+	 * @param internalSequenceID
+	 * @param configContext
+	 * @throws SandeshaException
+	 */
+	public static void updateLastActivatedTime (String propertyKey, StorageManager storageManager) throws SandeshaException {
+		//Transaction lastActivatedTransaction = storageManager.getTransaction();
+		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		SequencePropertyBean lastActivatedBean = sequencePropertyBeanMgr.retrieve(propertyKey, Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+		
+		boolean added = false;
+		
+		if (lastActivatedBean==null) {
+			added = true;
+			lastActivatedBean = new SequencePropertyBean ();
+			lastActivatedBean.setSequenceID(propertyKey);
+			lastActivatedBean.setName(Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+		}
+		
+		long currentTime = System.currentTimeMillis();
+		lastActivatedBean.setValue(Long.toString(currentTime));
+		
+		if (added)
+			sequencePropertyBeanMgr.insert(lastActivatedBean);
+		else
+			sequencePropertyBeanMgr.update(lastActivatedBean);
+		
+	}
+	
+	
+	public static long getLastActivatedTime (String propertyKey, StorageManager storageManager) throws SandeshaException {
+		
+		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		SequencePropertyBean lastActivatedBean = seqPropBeanMgr.retrieve(propertyKey,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+		
+		long lastActivatedTime = -1;
+		
+		if (lastActivatedBean!=null) {
+			lastActivatedTime = Long.parseLong(lastActivatedBean.getValue());
+		}
+		
+		return lastActivatedTime;
+	}
+		
+	public static boolean hasSequenceTimedOut (String propertyKey, RMMsgContext rmMsgCtx,StorageManager storageManager) throws SandeshaException {
+		
+		//operation is the lowest level, Sandesha2 could be engaged.
+		SandeshaPropertyBean propertyBean = SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext().getAxisOperation());
+		
+		if (propertyBean.getInactiveTimeoutInterval()<=0)
+			return false;
+
+		boolean sequenceTimedOut = false;
+		
+		long lastActivatedTime = getLastActivatedTime(propertyKey,storageManager);
+		long timeNow = System.currentTimeMillis();
+		if (lastActivatedTime>0 && (lastActivatedTime+propertyBean.getInactiveTimeoutInterval()<timeNow))
+			sequenceTimedOut = true;
+		
+		return sequenceTimedOut;
+	}
+	
+	public static long getOutGoingSequenceAckedMessageCount (String internalSequenceID,StorageManager storageManager) throws SandeshaException {
+///		Transaction transaction = storageManager.getTransaction();
+		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
+		findSeqIDBean.setValue(internalSequenceID);
+		findSeqIDBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+		Collection seqIDBeans = seqPropBeanMgr.find(findSeqIDBean);
+		
+		if (seqIDBeans.size()==0) {
+			String message = "A sequence with give data has not been created";
+			log.debug(message);
+			throw new SandeshaException (message);
+		}
+		
+		if (seqIDBeans.size()>1) {
+			String message = "Sequence data is not unique. Cant generate report";
+			log.debug(message);
+			throw new SandeshaException (message);
+		}
+		
+		SequencePropertyBean seqIDBean = (SequencePropertyBean) seqIDBeans.iterator().next();
+		String sequenceID = seqIDBean.getSequenceID();
+
+		SequencePropertyBean ackedMsgBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED);
+		if (ackedMsgBean==null)
+			return 0; //No acknowledgement has been received yet.
+		
+		long noOfMessagesAcked = Long.parseLong(ackedMsgBean.getValue());
+///		transaction.commit();
+		
+		return noOfMessagesAcked;
+	}
+	
+	public static boolean isOutGoingSequenceCompleted (String internalSequenceID,StorageManager storageManager) throws SandeshaException {
+///		Transaction transaction = storageManager.getTransaction();
+		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
+		findSeqIDBean.setValue(internalSequenceID);
+		findSeqIDBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+		Collection seqIDBeans = seqPropBeanMgr.find(findSeqIDBean);
+		
+		if (seqIDBeans.size()==0) {
+			String message = "A sequence with give data has not been created";
+			log.debug(message);
+			throw new SandeshaException (message);
+		}
+		
+		if (seqIDBeans.size()>1) {
+			String message = "Sequence data is not unique. Cant generate report";
+			log.debug(message);
+			throw new SandeshaException (message);
+		}
+		
+		SequencePropertyBean seqIDBean = (SequencePropertyBean) seqIDBeans.iterator().next();
+		String sequenceID = seqIDBean.getSequenceID();
+		
+		SequencePropertyBean terminateAddedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+		if (terminateAddedBean==null)
+			return false;
+		
+		if ("true".equals(terminateAddedBean.getValue()))
+			return true;
+
+///		transaction.commit();
+		return false;
+	}
+	
+	public static boolean isIncomingSequenceCompleted (String sequenceID, StorageManager storageManager) throws SandeshaException {
+		
+///		Transaction transaction = storageManager.getTransaction();
+		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		SequencePropertyBean terminateReceivedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
+		boolean complete = false;
+		
+		if (terminateReceivedBean!=null && "true".equals(terminateReceivedBean.getValue()))
+			complete = true;
+		
+///		transaction.commit();
+		return complete;
+	}
+	
+	
+}
\ No newline at end of file

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,218 @@
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+
+public class SpecSpecificConstants {
+
+	private static String unknownSpecErrorMessage = "Unknown specification version";
+	
+	public static String getSpecVersionString (String namespaceValue) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceValue))
+			return Sandesha2Constants.SPEC_VERSIONS.v1_0;
+		else if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(namespaceValue))
+			return Sandesha2Constants.SPEC_VERSIONS.v1_1;
+		else
+			throw new SandeshaException ("Unknows rm namespace value");
+	}
+	
+	public static String getRMNamespaceValue (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_02.NS_URI;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.NS_URI;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getCreateSequenceAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CREATE_SEQUENCE;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getCreateSequenceResponseAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE_RESPONSE;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CREATE_SEQUENCE_RESPONSE;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getTerminateSequenceAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_02.Actions.ACTION_TERMINATE_SEQUENCE;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_TERMINATE_SEQUENCE;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getTerminateSequenceResponseAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_TERMINATE_SEQUENCE_RESPONSE;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getCloseSequenceAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			throw new SandeshaException ("This rm spec version does not define a sequenceClose action");
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CLOSE_SEQUENCE;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+
+	public static String getCloseSequenceResponseAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			throw new SandeshaException ("This rm spec version does not define a sequenceClose action");
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CLOSE_SEQUENCE_RESPONSE;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getAckRequestAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			throw new SandeshaException ("this spec version does not define a ackRequest action");
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_ACK_REQUEST;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getSequenceAcknowledgementAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_02.Actions.ACTION_SEQUENCE_ACKNOWLEDGEMENT;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_SEQUENCE_ACKNOWLEDGEMENT;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getCreateSequenceSOAPAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_02.Actions.SOAP_ACTION_CREATE_SEQUENCE;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_CREATE_SEQUENCE;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getCreateSequenceResponseSOAPAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_02.Actions.SOAP_ACTION_CREATE_SEQUENCE_RESPONSE;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_CREATE_SEQUENCE_RESPONSE;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getTerminateSequenceSOAPAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_02.Actions.SOAP_ACTION_TERMINATE_SEQUENCE;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_TERMINATE_SEQUENCE;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getTerminateSequenceResponseSOAPAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_TERMINATE_SEQUENCE_RESPONSE;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getAckRequestSOAPAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			throw new SandeshaException ("this spec version does not define a ackRequest SOAP action");
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_ACK_REQUEST;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getSequenceAcknowledgementSOAPAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_02.Actions.SOAP_ACTION_SEQUENCE_ACKNOWLEDGEMENT;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_SEQUENCE_ACKNOWLEDGEMENT;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static boolean isTerminateSequenceResponseRequired (String specVersion)  throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return false;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return true;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static boolean isLastMessageIndicatorRequired (String specVersion)  throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return true;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return false;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static boolean isAckFinalAllowed (String specVersion)  throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return false;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return true;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static boolean isAckNoneAllowed (String specVersion)  throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return false;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return true;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static boolean isSequenceClosingAllowed (String specVersion)  throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
+			return false;
+		else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
+			return true;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getDefaultSpecVersion () {
+		return Sandesha2Constants.SPEC_VERSIONS.v1_0;
+	}
+	
+	public static String getAddressingAnonymousURI (String addressingNSURI) throws SandeshaException {
+		if (AddressingConstants.Submission.WSA_NAMESPACE.equals(addressingNSURI))
+			return AddressingConstants.Submission.WSA_ANONYMOUS_URL;
+		else if (AddressingConstants.Final.WSA_NAMESPACE.equals(addressingNSURI))
+			return AddressingConstants.Final.WSA_ANONYMOUS_URL;
+		else
+			throw new SandeshaException ("Unknown addressing version");
+	}
+	
+	public static String getAddressingFaultAction (String addressingNSURI) throws SandeshaException {
+		if (AddressingConstants.Submission.WSA_NAMESPACE.equals(addressingNSURI))
+			return "http://schemas.xmlsoap.org/ws/2004/08/addressing/fault";  //this is not available in addressing constants )-:
+		else if (AddressingConstants.Final.WSA_NAMESPACE.equals(addressingNSURI))
+			return AddressingConstants.Final.WSA_FAULT_ACTION;
+		else
+			throw new SandeshaException ("Unknown addressing version");
+	}
+	
+}

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,441 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
+
+/**
+ * Contains logic to remove all the storad data of a sequence.
+ * Methods of this are called by sending side and the receiving side when appropriate
+ * 
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class TerminateManager {
+
+	private static Log log = LogFactory.getLog(TerminateManager.class);
+	
+	private static String CLEANED_ON_TERMINATE_MSG = "CleanedOnTerminateMsg";
+	private static String CLEANED_AFTER_INVOCATION = "CleanedAfterInvocation";
+	
+	public static HashMap receivingSideCleanMap = new HashMap ();
+	/**
+	 * Called by the receiving side to remove data related to a sequence.
+	 * e.g. After sending the TerminateSequence message. Calling this methods will complete all
+	 * the data if InOrder invocation is not sequired.
+	 * 
+	 * @param configContext
+	 * @param sequenceID
+	 * @throws SandeshaException
+	 */
+	public static void cleanReceivingSideOnTerminateMessage (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException {
+		//clean senderMap
+		
+		//Currently in-order invocation is done for default values.
+		boolean inOrderInvocation = SandeshaUtil.getDefaultPropertyBean(configContext.getAxisConfiguration()).isInOrder();		
+		
+		if(!inOrderInvocation) { 
+			//there is no invoking by Sandesha2. So clean invocations storages.
+			cleanReceivingSideAfterInvocation(configContext,sequenceID,storageManager);
+		}
+
+		String cleanStatus = (String) receivingSideCleanMap.get(sequenceID);
+		if (cleanStatus!=null && CLEANED_AFTER_INVOCATION.equals(cleanStatus))
+			completeTerminationOfReceivingSide(configContext,sequenceID,storageManager);
+		else {
+			receivingSideCleanMap.put(sequenceID,CLEANED_ON_TERMINATE_MSG);
+		}
+	}
+	
+	/**
+	 * When InOrder invocation is anabled this had to be called to clean the data left by the 
+	 * above method. This had to be called after the Invocation of the Last Message.
+	 * 
+	 * @param configContext
+	 * @param sequenceID
+	 * @throws SandeshaException
+	 */
+	public static void cleanReceivingSideAfterInvocation (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException {
+		InvokerBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr();
+				
+		//removing storageMap entries
+		InvokerBean findStorageMapBean = new InvokerBean ();
+		findStorageMapBean.setSequenceID(sequenceID);
+		findStorageMapBean.setInvoked(true);
+		Collection collection = storageMapBeanMgr.find(findStorageMapBean);
+		Iterator iterator = collection.iterator();
+		while (iterator.hasNext()) {
+			InvokerBean storageMapBean = (InvokerBean) iterator.next();
+			storageMapBeanMgr.delete(storageMapBean.getMessageContextRefKey());
+			
+			//removing the respective message context from the message store. If this is an in-only message.
+			//In-out message will be deleted when a ack is retrieved for the out message.
+			String messageStoreKey = storageMapBean.getMessageContextRefKey();
+			storageManager.removeMessageContext(messageStoreKey);
+			
+		}
+		
+		String cleanStatus = (String) receivingSideCleanMap.get(sequenceID);
+		if (cleanStatus!=null && CLEANED_ON_TERMINATE_MSG.equals(cleanStatus))
+			completeTerminationOfReceivingSide(configContext,sequenceID,storageManager);
+		else {
+			receivingSideCleanMap.put(sequenceID,CLEANED_AFTER_INVOCATION);
+		}
+	}
+	
+	/**
+	 * This has to be called by the lastly invocated one of the above two methods.
+	 *
+	 */
+	private static void completeTerminationOfReceivingSide (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException {
+		InvokerBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr();
+		NextMsgBeanMgr nextMsgBeanMgr = storageManager.getNextMsgBeanMgr();
+		
+		//removing nextMsgMgr entries
+		NextMsgBean findNextMsgBean = new NextMsgBean ();
+		findNextMsgBean.setSequenceID(sequenceID);
+		Collection collection = nextMsgBeanMgr.find(findNextMsgBean);
+		Iterator iterator = collection.iterator();
+		while (iterator.hasNext()) {
+			NextMsgBean nextMsgBean = (NextMsgBean) iterator.next();
+//			nextMsgBeanMgr.delete(nextMsgBean.getSequenceID());
+		}
+		
+		//removing the HighestInMessage entry.
+		String highestInMessageKey = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,storageManager);
+		if (highestInMessageKey!=null) {
+			storageManager.removeMessageContext(highestInMessageKey);
+		}
+		
+		removeReceivingSideProperties(configContext,sequenceID,storageManager);
+	}
+
+	private static void removeReceivingSideProperties (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException {
+		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		SequencePropertyBean allSequenceBean = sequencePropertyBeanMgr.retrieve(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+		
+		if (allSequenceBean!=null) {
+			log.debug("AllSequence bean is null");
+			
+			ArrayList allSequenceList = SandeshaUtil.getArrayListFromString(allSequenceBean.getValue());
+			allSequenceList.remove(sequenceID);
+		
+			//updating 
+			allSequenceBean.setValue(allSequenceList.toString());
+			sequencePropertyBeanMgr.update(allSequenceBean);
+		}
+	}
+	
+	private static boolean isRequiredForResponseSide (String name) {
+		if (name==null && name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO))
+			return false;
+		
+		if (name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO))
+			return false;
+		
+		return false;
+	}
+	
+	
+	/**
+	 * This is called by the sending side to clean data related to a sequence.
+	 * e.g. after sending the TerminateSequence message.
+	 * 
+	 * @param configContext
+	 * @param sequenceID
+	 * @throws SandeshaException
+	 */
+	public static void terminateSendingSide (ConfigurationContext configContext, String internalSequenceID,boolean serverSide,StorageManager storageManager) throws SandeshaException {
+		
+		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		SequencePropertyBean seqTerminatedBean = new SequencePropertyBean (internalSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,Sandesha2Constants.VALUE_TRUE);
+		seqPropMgr.insert(seqTerminatedBean);
+		
+		cleanSendingSideData(configContext,internalSequenceID,serverSide,storageManager);
+	}
+	
+	private static void doUpdatesIfNeeded (String sequenceID, SequencePropertyBean propertyBean, SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
+		
+		boolean addEntryWithSequenceID = false;
+		
+		if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES)) {
+			addEntryWithSequenceID = true;
+		}
+		
+		if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED)) {
+			addEntryWithSequenceID = true;
+		}
+		
+		if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED)) {
+			addEntryWithSequenceID = true;
+		}
+		
+		if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT)) {
+			addEntryWithSequenceID = true;
+		}
+		
+		if (addEntryWithSequenceID && sequenceID!=null) {
+			//this value cannot be completely deleted since this data will be needed by SequenceReports
+			//so saving it with the sequenceID value being the out sequenceID.
+			
+			SequencePropertyBean newBean = new SequencePropertyBean ();
+			newBean.setSequenceID(sequenceID);
+			newBean.setName(propertyBean.getName());
+			newBean.setValue(propertyBean.getValue());
+
+			seqPropMgr.insert(newBean);
+			//TODO amazingly this property does not seem to get deleted without following - in the hibernate impl 
+			//(even though the lines efter current methodcall do this).
+			seqPropMgr.delete (propertyBean.getSequenceID(),propertyBean.getName());
+		}	
+	}
+	
+	private static boolean isProportyDeletable (String name) {
+		boolean deleatable = true;
+				
+		if (Sandesha2Constants.SequenceProperties.TERMINATE_ADDED.equals(name))
+			deleatable = false;
+		
+		if (Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED.equals(name))
+			deleatable = false;
+		
+		if (Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID.equals(name))
+			deleatable = false;
+		
+//		if (Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION.equals(name))
+//			deleatable = false;
+		
+		if (Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED.equals(name))
+			deleatable = false;
+		
+		if (Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED.equals(name))
+			deleatable = false;
+		
+		if (Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT.equals(name))
+			deleatable = false;
+		
+		return deleatable;
+	}
+	
+	public static void timeOutSendingSideSequence (ConfigurationContext context,String internalSequenceID, boolean serverside,StorageManager storageManager) throws SandeshaException {
+		
+		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+		SequencePropertyBean seqTerminatedBean = new SequencePropertyBean (internalSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT,Sandesha2Constants.VALUE_TRUE);
+		seqPropMgr.insert(seqTerminatedBean);
+		
+		cleanSendingSideData(context,internalSequenceID,serverside,storageManager);
+	}
+	
+	private static void cleanSendingSideData (ConfigurationContext configContext,String internalSequenceID, boolean serverSide,StorageManager storageManager) throws SandeshaException {
+		
+		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
+		CreateSeqBeanMgr createSeqBeanMgr = storageManager.getCreateSeqBeanMgr();
+		
+		String outSequenceID = SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID,storageManager);
+		
+		if (!serverSide) {
+			boolean stopListnerForAsyncAcks = false;
+			SequencePropertyBean acksToBean = sequencePropertyBeanMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+				
+			String addressingNamespace = SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,storageManager);
+			String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace);
+				
+			if (acksToBean!=null) {
+				String acksTo = acksToBean.getValue();
+				if (acksTo!=null && !anonymousURI.equals(acksTo)) {
+					stopListnerForAsyncAcks = true;
+				}
+			}
+		}
+		
+		//removing retransmitterMgr entries and corresponding message contexts.
+		Collection collection = retransmitterBeanMgr.find(internalSequenceID);
+		Iterator iterator = collection.iterator();
+		while (iterator.hasNext()) {
+			SenderBean retransmitterBean = (SenderBean) iterator.next();
+			retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+			
+			String messageStoreKey = retransmitterBean.getMessageContextRefKey();
+			storageManager.removeMessageContext(messageStoreKey);
+		}
+		
+		//removing the createSeqMgrEntry
+		CreateSeqBean findCreateSequenceBean = new CreateSeqBean ();
+		findCreateSequenceBean.setInternalSequenceID(internalSequenceID);
+		collection = createSeqBeanMgr.find(findCreateSequenceBean);
+		iterator = collection.iterator();
+		while (iterator.hasNext()) {
+			CreateSeqBean createSeqBean = (CreateSeqBean) iterator.next();
+			createSeqBeanMgr.delete(createSeqBean.getCreateSeqMsgID());
+		}
+		
+		//removing sequence properties
+		SequencePropertyBean findSequencePropertyBean1 = new SequencePropertyBean ();
+		findSequencePropertyBean1.setSequenceID(internalSequenceID);
+		collection = sequencePropertyBeanMgr.find(findSequencePropertyBean1);
+		iterator = collection.iterator();
+		while (iterator.hasNext()) {
+			SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
+			doUpdatesIfNeeded (outSequenceID,sequencePropertyBean,sequencePropertyBeanMgr);
+			
+			//TODO all properties which hv the temm:Seq:id as the key should be deletable.
+			if (isProportyDeletable(sequencePropertyBean.getName())) {
+				sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
+			}
+		}
+	}
+	
+	public static void addTerminateSequenceMessage(RMMsgContext referenceMessage,
+			String outSequenceId, String internalSequenceId,StorageManager storageManager)
+			throws SandeshaException {
+
+		ConfigurationContext configurationContext = referenceMessage.getMessageContext().getConfigurationContext();
+
+///		Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+		
+		SequencePropertyBeanMgr seqPropMgr = storageManager
+				.getSequencePropretyBeanMgr();
+
+		SequencePropertyBean terminated = seqPropMgr.retrieve(outSequenceId,
+				Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+
+		if (terminated != null && terminated.getValue() != null
+				&& "true".equals(terminated.getValue())) {
+			String message = "Terminate was added previously.";
+			log.debug (message);
+		}
+
+		RMMsgContext terminateRMMessage = RMMsgCreator
+				.createTerminateSequenceMessage(referenceMessage, outSequenceId,internalSequenceId,storageManager);
+		terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
+		terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+		
+		SequencePropertyBean toBean = seqPropMgr.retrieve(internalSequenceId,
+				Sandesha2Constants.SequenceProperties.TO_EPR);
+
+		EndpointReference toEPR = new EndpointReference ( toBean.getValue());
+		if (toEPR == null) {
+			String message = "To EPR has an invalid value";
+			throw new SandeshaException(message);
+		}
+
+		terminateRMMessage.setTo(new EndpointReference(toEPR.getAddress()));
+		
+		String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,storageManager);
+		String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
+		
+		String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,storageManager);
+		if (rmVersion==null)
+			throw new SandeshaException ("Cant find the rmVersion of the given message");
+		terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+		terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+
+		SequencePropertyBean transportToBean = seqPropMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+		if (transportToBean!=null) {
+			terminateRMMessage.setProperty(MessageContextConstants.TRANSPORT_URL,transportToBean.getValue());
+		}
+		
+		try {
+			terminateRMMessage.addSOAPEnvelope();
+		} catch (AxisFault e) {
+			throw new SandeshaException(e.getMessage());
+		}
+
+		String key = SandeshaUtil.getUUID();
+		
+		SenderBean terminateBean = new SenderBean();
+		terminateBean.setMessageContextRefKey(key);
+
+		
+		storageManager.storeMessageContext(key,terminateRMMessage.getMessageContext());
+		
+		//Set a retransmitter lastSentTime so that terminate will be send with
+		// some delay.
+		//Otherwise this get send before return of the current request (ack).
+		//TODO: refine the terminate delay.
+		terminateBean.setTimeToSend(System.currentTimeMillis()
+				+ Sandesha2Constants.TERMINATE_DELAY);
+
+		terminateBean.setMessageID(terminateRMMessage.getMessageId());
+		
+		//this will be set to true at the sender.
+		terminateBean.setSend(true);
+		
+		terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+				Sandesha2Constants.VALUE_FALSE);
+		
+		terminateBean.setReSend(false);
+
+		SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
+
+		retramsmitterMgr.insert(terminateBean);
+		
+		SequencePropertyBean terminateAdded = new SequencePropertyBean();
+		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+		terminateAdded.setSequenceID(outSequenceId);
+		terminateAdded.setValue("true");
+
+		seqPropMgr.insert(terminateAdded);
+		
+		//This should be dumped to the storage by the sender
+		TransportOutDescription transportOut = terminateRMMessage.getMessageContext().getTransportOut();
+		terminateRMMessage.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
+		terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+		terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+		terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
+///		addTerminateSeqTransaction.commit();
+		
+	    AxisEngine engine = new AxisEngine (configurationContext);
+	    try {
+			engine.send(terminateRMMessage.getMessageContext());
+		} catch (AxisFault e) {
+			throw new SandeshaException (e.getMessage());
+		}
+	}
+	
+}

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InOrderInvoker.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InOrderInvoker.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,264 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * This is used when InOrder invocation is required. This is a seperated Thread that keep running
+ * all the time. At each iteration it checks the InvokerTable to find weather there are any messages to
+ * me invoked.
+ * 
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class InOrderInvoker extends Thread {
+	
+	private boolean runInvoker = false;
+	private ArrayList workingSequences = new ArrayList();
+	private ConfigurationContext context = null;
+	private static final Log log = LogFactory.getLog(InOrderInvoker.class);
+	
+	public synchronized void stopInvokerForTheSequence(String sequenceID) {
+		workingSequences.remove(sequenceID);
+		if (workingSequences.size()==0) {
+			runInvoker = false;
+		}
+	}
+	
+	public synchronized void stopInvoking () {
+		runInvoker = false;
+	}
+
+	public synchronized boolean isInvokerStarted() {
+		return runInvoker;
+	}
+
+	public void setConfugurationContext(ConfigurationContext context) {
+		this.context = context;
+	}
+
+	public synchronized void runInvokerForTheSequence(ConfigurationContext context, String sequenceID) {
+		
+		if (!workingSequences.contains(sequenceID))
+			workingSequences.add(sequenceID);
+
+		if (!isInvokerStarted()) {
+			this.context = context;
+			runInvoker = true;     //so that isSenderStarted()=true.
+			super.start();
+		}
+	}
+
+	public void run() {
+
+		while (isInvokerStarted()) {
+
+			try {
+				Thread.sleep(1000);
+			} catch (InterruptedException ex) {
+				log.debug("Invoker was Inturrepted....");
+				log.debug(ex.getMessage());
+			}
+
+			Transaction transaction = null;
+			boolean rolebacked = false;
+			
+			try {
+				StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
+				NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
+
+				InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
+
+				SequencePropertyBeanMgr sequencePropMgr = storageManager
+						.getSequencePropretyBeanMgr();
+
+				transaction = storageManager.getTransaction();
+				
+				//Getting the incomingSequenceIdList
+				SequencePropertyBean allSequencesBean = (SequencePropertyBean) sequencePropMgr
+						.retrieve(
+								Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+								Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+				
+				if (allSequencesBean == null) {
+					continue;
+				}
+				ArrayList allSequencesList = SandeshaUtil.getArrayListFromString (allSequencesBean.getValue());
+				
+				Iterator allSequencesItr = allSequencesList.iterator();
+				
+				currentIteration: while (allSequencesItr.hasNext()) {
+					String sequenceId = (String) allSequencesItr.next();
+					
+					//commiting the old transaction
+					transaction.commit();
+					
+					//starting a new transaction for the new iteration.
+					transaction = storageManager.getTransaction();
+					
+					NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
+					if (nextMsgBean == null) {
+						String message = "Next message not set correctly. Removing invalid entry.";
+						log.debug(message);
+						allSequencesItr.remove();
+						
+						//cleaning the invalid data of the all sequences.
+						allSequencesBean.setValue(allSequencesList.toString());
+						sequencePropMgr.update(allSequencesBean);	
+						continue;
+					}
+
+					long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
+					if (nextMsgno <= 0) { 
+						String message = "Invalid message number as the Next Message Number.";
+						throw new SandeshaException(message);
+					}
+
+					Iterator stMapIt = storageMapMgr.find(
+							new InvokerBean(null, nextMsgno, sequenceId))
+							.iterator();
+					
+					boolean invoked = false;
+					
+					while (stMapIt.hasNext()) {
+
+						InvokerBean stMapBean = (InvokerBean) stMapIt.next();
+						String key = stMapBean.getMessageContextRefKey();
+
+						MessageContext msgToInvoke = storageManager.retrieveMessageContext(key,context);
+						RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke);
+
+						//have to commit the transaction before invoking. This may get changed when WS-AT is available.
+						transaction.commit();
+						
+						try {
+							//Invoking the message.														
+							msgToInvoke.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
+		
+							boolean postFailureInvocation = false;
+							
+							//StorageManagers should st following property to true, to indicate that the message received comes after a failure.
+							String postFaulureProperty = (String) msgToInvoke.getProperty(Sandesha2Constants.POST_FAILURE_MESSAGE);
+							if (postFaulureProperty!=null && Sandesha2Constants.VALUE_TRUE.equals(postFaulureProperty))
+								postFailureInvocation = true;  
+							
+							AxisEngine engine = new AxisEngine (context);
+							if (postFailureInvocation) {
+								makeMessageReadyForReinjection (msgToInvoke);
+								engine.receive(msgToInvoke);
+							} else {
+								engine.resume(msgToInvoke);
+							}
+							
+							invoked = true;
+
+						} catch (Exception e) {
+							throw new SandeshaException(e);
+						} finally {
+							transaction = storageManager.getTransaction();
+						}
+						
+						//Service will be invoked only once. I.e. even if an exception get thrown in invocation
+						//the service will not be invoked again. 
+						storageMapMgr.delete(key);
+						
+						//removing the corresponding message context as well.
+						MessageContext msgCtx = storageManager.retrieveMessageContext(key,context);
+						if (msgCtx!=null) {
+							storageManager.removeMessageContext(key);
+						}
+						
+						
+						//undating the next msg to invoke
+
+						if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+							Sequence sequence = (Sequence) rmMsg
+									.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+							if (sequence.getLastMessage() != null) {
+								TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId,storageManager);
+								//exit from current iteration. (since an entry was removed)
+								break currentIteration;
+							}
+						}
+					}
+
+					if (invoked) {
+						nextMsgno++;
+						nextMsgBean.setNextMsgNoToProcess(nextMsgno);
+						nextMsgMgr.update(nextMsgBean);
+					}	
+				}
+				
+			} catch (Exception e) {				
+				if (transaction!=null) {
+					try {
+						transaction.rollback();
+						rolebacked = true;
+					} catch (Exception e1) {
+						String message = "Exception thrown when trying to roleback the transaction.";
+						log.debug(message,e1);
+					}
+				}
+				String message = "Sandesha2 got an exception when trying to invoke the message";
+				log.debug(message,e);
+			} finally { 
+				if (!rolebacked && transaction!=null) {
+					try {
+						transaction.commit();
+					} catch (Exception e) {
+						String message = "Exception thrown when trying to commit the transaction.";
+						log.debug(message,e);
+					}
+				}
+			}
+		}
+	}
+	
+	private void makeMessageReadyForReinjection (MessageContext messageContext) {
+		messageContext.setProperty(AddressingConstants.WS_ADDRESSING_VERSION,null);
+		messageContext.getOptions().setMessageId(null);
+		messageContext.getOptions().setTo(null);
+		messageContext.getOptions().setAction(null);
+		messageContext.setProperty(Sandesha2Constants.REINJECTED_MESSAGE,Sandesha2Constants.VALUE_TRUE);
+	}
+}
\ No newline at end of file

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,387 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFault;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+
+/**
+ * This is responsible for sending and re-sending messages of Sandesha2. This
+ * represent a thread that keep running all the time. This keep looking at the
+ * Sender table to find out any entries that should be sent.
+ * 
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class Sender extends Thread {
+
+	private boolean runSender = false;
+	private ArrayList workingSequences = new ArrayList();
+	private ConfigurationContext context = null;
+	private static final Log log = LogFactory.getLog(Sender.class);
+
+	public synchronized void stopSenderForTheSequence(String sequenceID) {
+		workingSequences.remove(sequenceID);
+		if (workingSequences.size() == 0) {
+			 runSender = false;
+		}
+	}
+	
+	public synchronized void stopSending () {
+		runSender = false;
+	}
+
+	public synchronized boolean isSenderStarted() {
+		return runSender;
+	}
+
+	public void run() {
+
+		StorageManager storageManager = null;
+
+		try {
+			storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
+		} catch (SandeshaException e2) {
+			// TODO Auto-generated catch block
+			log.debug("ERROR: Could not start sender");
+			e2.printStackTrace();
+			return;
+		}
+
+		while (isSenderStarted()) {
+
+			try {
+				Thread.sleep(Sandesha2Constants.SENDER_SLEEP_TIME);
+			} catch (InterruptedException e1) {
+				// e1.printStackTrace();
+				log.debug("Sender was interupted...");
+				log.debug(e1.getMessage());
+				log.debug("End printing Interrupt...");
+			}
+			
+			Transaction transaction = null;
+			boolean rolebacked = false;
+			
+			try {
+				if (context == null) {
+					String message = "Can't continue the Sender. Context is null";
+					log.debug(message);
+					throw new SandeshaException(message);
+				}
+				
+				transaction = storageManager.getTransaction();
+				
+				SenderBeanMgr mgr = storageManager.getRetransmitterBeanMgr();
+				SenderBean senderBean = mgr.getNextMsgToSend();
+				if (senderBean==null) {
+					continue;
+			    }
+				
+				String key = (String) senderBean.getMessageContextRefKey();
+				MessageContext msgCtx = storageManager.retrieveMessageContext(key, context);
+				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
+				
+				MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
+				boolean continueSending = retransmitterAdjuster.adjustRetransmittion(senderBean, context,storageManager);
+				if (!continueSending) {
+					continue;
+				}
+
+				if (msgCtx == null) {
+					String message = "Message context is not present in the storage";
+				}
+
+				// sender will not send the message if following property is
+				// set and not true.
+				// But it will set if it is not set (null)
+
+				// This is used to make sure that the mesage get passed the Sandesha2TransportSender.
+
+				String qualifiedForSending = (String) msgCtx.getProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING);
+				if (qualifiedForSending != null && !qualifiedForSending.equals(Sandesha2Constants.VALUE_TRUE)) {
+					continue;
+				}
+
+				if (msgCtx == null) {
+					log.debug("ERROR: Sender has an Unavailable Message entry");
+					break;
+				}
+
+				RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+				
+				//operation is the lowest level Sandesha2 should be attached
+				ArrayList msgsNotToSend = SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation()).getMsgTypesToDrop();
+
+				if (msgsNotToSend != null && msgsNotToSend.contains(new Integer(rmMsgCtx.getMessageType()))) {
+					continue;
+				}
+				
+				updateMessage(msgCtx);
+
+				int messageType = rmMsgCtx.getMessageType();
+				if (messageType == Sandesha2Constants.MessageTypes.APPLICATION) {
+					Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+					String sequenceID = sequence.getIdentifier().getIdentifier();
+				}
+				
+				//checking weather this message can carry piggybacked acks
+				if (isAckPiggybackableMsgType(messageType) && !isAckAlreadyPiggybacked(rmMsgCtx)) {
+					// piggybacking if an ack if available for the same sequence.
+					//TODO do piggybacking based on wsa:To
+					AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx,storageManager);
+				}
+				
+				//sending the message
+				TransportOutDescription transportOutDescription = msgCtx.getTransportOut();
+				TransportSender transportSender = transportOutDescription.getSender();
+					
+				boolean successfullySent = false;
+				if (transportSender != null) {
+					
+					//have to commit the transaction before sending. This may get changed when WS-AT is available.
+					transaction.commit();
+					msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_FALSE);
+					try {
+						
+						//had to fully build the SOAP envelope to support retransmissions.
+						//Otherwise a 'parserAlreadyAccessed' exception could get thrown in retransmissions.
+						//But this has a performance reduction.
+						msgCtx.getEnvelope().build();
+						
+						//TODO change this to cater for security.
+						transportSender.invoke(msgCtx);
+						successfullySent = true;
+					} catch (Exception e) {
+						// TODO Auto-generated catch block
+					    String message = "Sandesha2 got an exception when trying to send the message";
+						log.debug(message,e);
+					} finally {
+						transaction =  storageManager.getTransaction();
+						msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
+					}
+				}
+
+				// update or delete only if the object is still present.
+				SenderBean bean1 = mgr.retrieve(senderBean.getMessageID());
+				if (bean1 != null) {
+					if (senderBean.isReSend()) {
+						bean1.setSentCount(senderBean.getSentCount());
+						bean1.setTimeToSend(senderBean.getTimeToSend());
+						mgr.update(bean1);
+					} else {
+						mgr.delete(bean1.getMessageID());
+						
+						//removing the message from the storage.
+						String messageStoredKey = bean1.getMessageContextRefKey();
+						storageManager.removeMessageContext(messageStoredKey);
+					}
+				}
+				
+				if (successfullySent) {
+					if (!msgCtx.isServerSide())
+						checkForSyncResponses(msgCtx);
+				}
+				
+				if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+					// terminate sending side.
+					TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+					String sequenceID = terminateSequence.getIdentifier().getIdentifier();
+					ConfigurationContext configContext = msgCtx.getConfigurationContext();
+					
+					String internalSequenceID = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,storageManager);
+					TerminateManager.terminateSendingSide(configContext,internalSequenceID, msgCtx.isServerSide(),storageManager);
+				}
+
+				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_FALSE);
+				
+			} catch (Exception e) {
+				
+				// TODO : when this is the client side throw the exception to
+				// the client when necessary.
+				
+				if (transaction!=null) {
+					try {
+						transaction.rollback();
+						rolebacked = true;
+					} catch (Exception e1) {
+						String message = "Exception thrown when trying to roleback the transaction.";
+						log.debug(message,e1);
+					}
+				}
+				
+				String message = "An Exception was throws in sending";
+				log.debug(message,e);
+			} finally {
+				if (transaction!=null && !rolebacked) {
+					try {
+						transaction.commit();
+					} catch (Exception e) {
+						String message = "Exception thrown when trying to commit the transaction.";
+						log.debug(message,e);
+					}
+				}
+			}
+		}
+	}
+
+	public synchronized void runSenderForTheSequence(
+			ConfigurationContext context, String sequenceID) {
+
+		if (sequenceID != null && !workingSequences.contains(sequenceID))
+			workingSequences.add(sequenceID);
+
+		if (!isSenderStarted()) {
+			this.context = context;
+			runSender = true; // so that isSenderStarted()=true.
+			super.start();
+		}
+	}
+
+	private void updateMessage(MessageContext msgCtx1) throws SandeshaException {
+		//do updates if required.
+	}
+
+	private void checkForSyncResponses(MessageContext msgCtx)
+			throws SandeshaException {
+
+		try {
+
+			boolean responsePresent = (msgCtx
+					.getProperty(MessageContext.TRANSPORT_IN) != null);
+			if (!responsePresent)
+				return;
+			
+			// create the responseMessageContext
+
+			MessageContext responseMessageContext = new MessageContext();
+			responseMessageContext.setServerSide(false);
+			responseMessageContext.setConfigurationContext(msgCtx
+					.getConfigurationContext());
+			responseMessageContext.setTransportIn(msgCtx.getTransportIn());
+			responseMessageContext.setTransportOut(msgCtx.getTransportOut());
+
+			responseMessageContext.setProperty(MessageContext.TRANSPORT_IN,
+					msgCtx.getProperty(MessageContext.TRANSPORT_IN));
+			responseMessageContext.setServiceContext(msgCtx.getServiceContext());
+			responseMessageContext.setServiceGroupContext(msgCtx.getServiceGroupContext());
+			
+			//copying required properties from op. context to the response msg ctx.
+			OperationContext requestMsgOpCtx = msgCtx.getOperationContext();
+			if (requestMsgOpCtx!=null) {
+			     if (responseMessageContext.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE)==null) {
+			    	 responseMessageContext.setProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE,
+			    			 requestMsgOpCtx.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE));
+			     }
+			     
+			     if (responseMessageContext.getProperty(HTTPConstants.CHAR_SET_ENCODING)==null) {
+			    	 responseMessageContext.setProperty(HTTPConstants.CHAR_SET_ENCODING,
+			    			 requestMsgOpCtx.getProperty(HTTPConstants.CHAR_SET_ENCODING));
+			     }
+			}
+
+			// If request is REST we assume the responseMessageContext is REST,
+			// so set the variable
+
+		    responseMessageContext.setDoingREST(msgCtx.isDoingREST());
+
+			SOAPEnvelope resenvelope = null;
+			try {
+				resenvelope = TransportUtils.createSOAPMessage(msgCtx, 
+						msgCtx.getEnvelope().getNamespace().getName());
+				
+			} catch (AxisFault e) {
+				// TODO Auto-generated catch block
+				log.debug("Valid SOAP envelope not found");
+				log.debug(e.getStackTrace().toString());
+			}
+			
+			//if the request msg ctx is withina a transaction, processing if the response should also happen
+			//withing the same transaction
+			responseMessageContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION
+					,msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
+			
+			if (resenvelope != null) {
+				responseMessageContext.setEnvelope(resenvelope);
+				AxisEngine engine = new AxisEngine(msgCtx
+						.getConfigurationContext());
+				
+				if (isFaultEnvelope(resenvelope)) {
+					engine.receiveFault(responseMessageContext);
+				}else {
+					engine.receive(responseMessageContext);
+				}
+			} 
+			
+		} catch (Exception e) {
+			String message = "No valid Sync response...";
+			log.debug(message);
+			throw new SandeshaException(message, e);
+		}
+	}
+	
+	private boolean isAckPiggybackableMsgType(int messageType) {
+		boolean piggybackable = true;
+		
+		if (messageType==Sandesha2Constants.MessageTypes.ACK) 
+			piggybackable = false;
+		
+		return piggybackable;	
+	}
+	
+	private boolean isAckAlreadyPiggybacked (RMMsgContext rmMessageContext) {
+		if (rmMessageContext.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT)!=null)
+			return true;
+		
+		return false;
+	}
+	
+	private boolean isFaultEnvelope (SOAPEnvelope envelope) throws SandeshaException {		
+		SOAPFault fault = envelope.getBody().getFault();
+		if (fault!=null)
+			return true;
+		else
+			return false;
+	}
+	
+}
\ No newline at end of file

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/Accept.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/Accept.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/Accept.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/Accept.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,110 @@
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.sandesha2.wsrm;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ * @author Sanka Samaranayaka <ss...@gmail.com>
+ * @author Saminda Abeyruwan  <sa...@opensource.lk>
+ */
+
+public class Accept implements IOMRMElement {
+
+	private AcksTo acksTo;
+	
+	private OMFactory defaultFactory;
+	
+	private String rmNamespaceValue;
+	
+	private String addressingNamespaceValue;
+
+
+	public Accept(OMFactory factory, String rmNamespaceValue, String addressingNamespaceValue) throws SandeshaException {
+		if (!isNamespaceSupported(rmNamespaceValue))
+			throw new SandeshaException ("Unsupported namespace");
+		
+		this.defaultFactory = factory;
+		this.addressingNamespaceValue = addressingNamespaceValue;
+		this.rmNamespaceValue = rmNamespaceValue;
+	}
+
+	public String getNamespaceValue(){
+		return rmNamespaceValue;
+	}
+
+	public Object fromOMElement(OMElement element) throws OMException,SandeshaException {
+
+		OMFactory factory = element.getOMFactory();
+		if (factory==null)
+			factory = defaultFactory;
+		
+		OMElement acceptPart = element.getFirstChildWithName(new QName(
+				rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.ACCEPT));
+		if (acceptPart == null)
+			throw new OMException("Passed element does not contain an Accept part");
+
+		acksTo = new AcksTo(defaultFactory,rmNamespaceValue,addressingNamespaceValue);
+		acksTo.fromOMElement(acceptPart);
+
+		return this;
+	}
+
+	public OMElement toOMElement(OMElement element) throws OMException {
+
+		OMFactory factory = element.getOMFactory();
+		if (factory==null)
+			factory = defaultFactory;
+		
+		if (acksTo == null)
+			throw new OMException("Cant add Accept part since AcksTo object is null");
+
+		OMNamespace rmNamespace = factory.createOMNamespace(rmNamespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+		OMElement acceptElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.ACCEPT, rmNamespace);
+		
+		acksTo.toOMElement(acceptElement);
+		element.addChild(acceptElement);
+
+		return element;
+	}
+
+	public void setAcksTo(AcksTo acksTo) {
+		this.acksTo = acksTo;
+	}
+
+	public AcksTo getAcksTo() {
+		return acksTo;
+	}
+	
+	public boolean isNamespaceSupported (String namespaceName) {
+		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
+			return true;
+		
+		if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(namespaceName))
+			return true;
+		
+		return false;
+	}
+}
\ No newline at end of file

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckFinal.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckFinal.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckFinal.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckFinal.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.wsrm;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ * 
+ * This represent the wsrm:final element that may be present withing a sequence acknowledgement.
+ */
+public class AckFinal implements IOMRMElement {
+
+	private OMFactory defaultfactory;
+	
+	private String namespaceValue = null;
+	
+	public AckFinal(OMFactory factory,String namespaceValue) throws SandeshaException {
+		if (!isNamespaceSupported(namespaceValue))
+			throw new SandeshaException ("Unsupported namespace");
+		
+		this.defaultfactory = factory;
+		this.namespaceValue = namespaceValue;
+	}
+
+	public String getNamespaceValue(){
+		return namespaceValue;
+	}
+
+	public Object fromOMElement(OMElement element) throws OMException {
+		
+		OMFactory factory = element.getOMFactory();
+		if (factory==null)
+			factory = defaultfactory;
+		
+		OMElement finalPart = element.getFirstChildWithName(new QName(
+				namespaceValue, Sandesha2Constants.WSRM_COMMON.FINAL));
+		if (finalPart == null)
+			throw new OMException("The passed element does not contain a 'Final' part");
+
+		return this;
+	}
+
+	public OMElement toOMElement(OMElement sequenceAckElement) throws OMException {
+		//soapheaderblock element will be given
+
+		OMFactory factory = sequenceAckElement.getOMFactory();
+		if (factory==null)
+			factory = defaultfactory;
+		
+		OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+		OMElement finalElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.FINAL, rmNamespace);
+		sequenceAckElement.addChild(finalElement);
+
+		return sequenceAckElement;
+	}
+
+	
+	//this element is only supported in 2005_05_10 spec.
+	public boolean isNamespaceSupported (String namespaceName) {
+		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
+			return false;
+		
+		if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(namespaceName))
+			return true;
+		
+		return false;
+	}
+}

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckNone.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckNone.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckNone.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckNone.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,88 @@
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.wsrm;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ * 
+ * This represent the wsrm:none element that may be present withing a sequence acknowledgement.
+ */
+public class AckNone implements IOMRMElement {
+
+	private OMNamespace rmNamespace = null;
+	
+	private OMFactory defaultFactory;
+	
+	private String namespaceValue = null;
+	
+	public AckNone(OMFactory factory,String namespaceValue) throws SandeshaException {
+		if (!isNamespaceSupported(namespaceValue))
+			throw new SandeshaException ("Unsupported namespace");
+		
+		this.defaultFactory = factory;
+		this.namespaceValue = namespaceValue;
+	}
+
+	public String getNamespaceValue(){
+		return namespaceValue;
+	}
+
+	public Object fromOMElement(OMElement element) throws OMException {
+	    
+		OMElement nonePart = element.getFirstChildWithName(new QName(
+				namespaceValue, Sandesha2Constants.WSRM_COMMON.NONE));
+		if (nonePart == null)
+			throw new OMException("The passed element does not contain a 'None' part");
+
+		return this;
+	}
+
+	public OMElement toOMElement(OMElement sequenceAckElement) throws OMException {
+		
+		OMFactory factory = sequenceAckElement.getOMFactory();
+	    if (factory==null)
+	    	factory = defaultFactory;
+	    
+	    OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+	    OMElement noneElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.NONE, rmNamespace);
+
+		sequenceAckElement.addChild(noneElement);
+		return sequenceAckElement;
+	}
+	
+	//this element is only supported in 2005_05_10 spec.
+	public boolean isNamespaceSupported (String namespaceName) {
+		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
+			return false;
+		
+		if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(namespaceName))
+			return true;
+		
+		return false;
+	}
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org