You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/03/07 03:39:04 UTC

svn commit: r383751 [1/2] - in /webservices/sandesha/trunk/src/org/apache/sandesha2: ./ client/ handlers/ msgprocessors/ util/ workers/ wsrm/

Author: chamikara
Date: Mon Mar  6 18:39:01 2006
New Revision: 383751

URL: http://svn.apache.org/viewcvs?rev=383751&view=rev
Log:
Add the AckFinal, AckNone, CloseSequence, CloseSequenceResponse elements as required by the new spec. Scenario 1.3 is working.

Corrected the fault handlong logic.
Bug fixes.

Added:
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/AckFinal.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/AckNone.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/CloseSequence.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/CloseSequenceResponse.java
Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/SpecSpecificConstants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/MsgInitializer.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/IOMRMPart.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/RMElements.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java Mon Mar  6 18:39:01 2006
@@ -21,19 +21,35 @@
 import java.util.Collection;
 import java.util.Iterator;
 
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisOperationFactory;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisEngine;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
 import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.PropertyManager;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaPropertyBean;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.wsrm.Sequence;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.ws.commons.soap.SOAPEnvelope;
+import org.apache.ws.commons.soap.SOAPFactory;
 
 /**
  * Contains logic for managing acknowledgements.
@@ -175,11 +191,209 @@
 		return completedMsgList;
 	}
 	
-	public static void sendSyncAck () {
+	public static RMMsgContext generateAckMessage (RMMsgContext referenceRMMessage, String sequenceID)throws SandeshaException {
+		
+		MessageContext referenceMsg = referenceRMMessage.getMessageContext();
+		
+		ConfigurationContext configurationContext = referenceRMMessage.getMessageContext().getConfigurationContext();
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		//Setting the ack depending on AcksTo.
+		SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceID,
+				Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+
+		EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
+		String acksToStr = acksTo.getAddress();
+
+		if (acksToStr == null)
+			throw new SandeshaException(
+					"acksToStr Seqeunce property is not set correctly");
+		
+		AxisOperation ackOperation = null;
+
+		try {
+			ackOperation = AxisOperationFactory.getOperationDescription(AxisOperationFactory.MEP_URI_IN_ONLY);
+		} catch (AxisFault e) {
+			throw new SandeshaException("Could not create the Operation");
+		}
+
+		AxisOperation rmMsgOperation = referenceRMMessage.getMessageContext()
+				.getAxisOperation();
+		if (rmMsgOperation != null) {
+			ArrayList outFlow = rmMsgOperation.getPhasesOutFlow();
+			if (outFlow != null) {
+				ackOperation.setPhasesOutFlow(outFlow);
+				ackOperation.setPhasesOutFaultFlow(outFlow);
+			}
+		}
+
+		MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(
+				referenceRMMessage, ackOperation);
+		
+		ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+		
+		RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+		ackRMMsgCtx.setRMNamespaceValue(referenceRMMessage.getRMNamespaceValue());
+		
+		ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
+		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+				.getSOAPVersion(referenceMsg.getEnvelope()));
+		
+		//Setting new envelope
+		SOAPEnvelope envelope = factory.getDefaultEnvelope();
+		try {
+			ackMsgCtx.setEnvelope(envelope);
+		} catch (AxisFault e3) {
+			throw new SandeshaException(e3.getMessage());
+		}
+
+		ackMsgCtx.setTo(acksTo);
+		ackMsgCtx.setReplyTo(referenceMsg.getTo());
+		
+		//adding the SequenceAcknowledgement part.
+		RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceID);
+
+		if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo.getAddress())) {
+
+//			AxisEngine engine = new AxisEngine(ackRMMsgCtx.getMessageContext()
+//					.getConfigurationContext());
+
+			//setting CONTEXT_WRITTEN since acksto is anonymous
+			if (referenceRMMessage.getMessageContext().getOperationContext() == null) {
+				//operation context will be null when doing in a GLOBAL
+				// handler.
+				try {
+					AxisOperation op = AxisOperationFactory
+							.getAxisOperation(AxisOperationFactory.MEP_CONSTANT_IN_OUT);
+					OperationContext opCtx = new OperationContext(op);
+					referenceRMMessage.getMessageContext().setAxisOperation(op);
+					referenceRMMessage.getMessageContext().setOperationContext(opCtx);
+				} catch (AxisFault e2) {
+					throw new SandeshaException(e2.getMessage());
+				}
+			}
+
+			referenceRMMessage.getMessageContext().getOperationContext().setProperty(
+					org.apache.axis2.Constants.RESPONSE_WRITTEN,
+					Constants.VALUE_TRUE);
+
+			referenceRMMessage.getMessageContext().setProperty(
+					Sandesha2Constants.ACK_WRITTEN, "true");
+//			try {
+//				engine.send(ackRMMsgCtx.getMessageContext());
+//			} catch (AxisFault e1) {
+//				throw new SandeshaException(e1.getMessage());
+//			}
+			return ackRMMsgCtx;
+			
+		} else {
+
+			Transaction asyncAckTransaction = storageManager.getTransaction();
+
+			SenderBeanMgr retransmitterBeanMgr = storageManager
+					.getRetransmitterBeanMgr();
+
+			String key = SandeshaUtil.getUUID();
+			
+			//dumping to the storage will be done be Sandesha2 Transport Sender
+			//storageManager.storeMessageContext(key,ackMsgCtx);
+			
+			SenderBean ackBean = new SenderBean();
+			ackBean.setMessageContextRefKey(key);
+			ackBean.setMessageID(ackMsgCtx.getMessageID());
+			ackBean.setReSend(false);
+			
+			//this will be set to true in the sender.
+			ackBean.setSend(true);
+			
+			ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+					Sandesha2Constants.VALUE_FALSE);
+			
+			ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+			
+			//the internalSequenceId value of the retransmitter Table for the
+			// messages related to an incoming
+			//sequence is the actual sequence ID
+
+//			RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
+//					.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
+		
+//			long ackInterval = PropertyManager.getInstance()
+//					.getAcknowledgementInterval();
+			
+			Parameter param = referenceMsg.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
+			
+			SandeshaPropertyBean propertyBean = null;
+			if (param!=null) {
+				propertyBean = (SandeshaPropertyBean)  param.getValue();
+			}else {
+				propertyBean = PropertyManager.getInstance().getPropertyBean();
+			}
+			
+			
+			long ackInterval = propertyBean.getAcknowledgementInaterval();
+			
+			//			if (policyBean != null) {
+//				ackInterval = policyBean.getAcknowledgementInaterval();
+//			}
+			
+			//Ack will be sent as stand alone, only after the retransmitter
+			// interval.
+			long timeToSend = System.currentTimeMillis() + ackInterval;
+
+			//removing old acks.
+			SenderBean findBean = new SenderBean();
+			findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+			
+			//this will be set to true in the sandesha2TransportSender.
+			findBean.setSend(true);
+			findBean.setReSend(false);
+			Collection coll = retransmitterBeanMgr.find(findBean);
+			Iterator it = coll.iterator();
+
+			if (it.hasNext()) {
+				SenderBean oldAckBean = (SenderBean) it.next();
+				timeToSend = oldAckBean.getTimeToSend();		//If there is an old ack. This ack will be sent in the old timeToSend.
+				retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+			}
+			
+			ackBean.setTimeToSend(timeToSend);
+
+			storageManager.storeMessageContext(key,ackMsgCtx);
+			
+			//inserting the new ack.
+			retransmitterBeanMgr.insert(ackBean);
+
+			asyncAckTransaction.commit();
+
+			//passing the message through sandesha2sender
+
+			ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
+			ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+			
+			ackMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+			
+			ackMsgCtx.setTransportOut(new Sandesha2TransportOutDesc ());
+			
+//			AxisEngine engine = new AxisEngine (configurationContext);
+//			try {
+//				engine.send(ackMsgCtx);
+//			} catch (AxisFault e) {
+//				throw new SandeshaException (e.getMessage());
+//			}
+			
+			RMMsgContext ackRMMessageCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+			
+			SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID);
+			
+			referenceMsg.pause(); 
+			
+			return ackRMMessageCtx;
+		}
+		
 		
-	}
-	
-	public static void sendAsyncAck () {
 		
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Mon Mar  6 18:39:01 2006
@@ -17,6 +17,8 @@
 
 package org.apache.sandesha2;
 
+import org.apache.axis2.addressing.AddressingConstants;
+
 
 /**
  * Contains all the Sandesha2Constants of Sandesha2.
@@ -31,8 +33,8 @@
 
 	
 	public interface SPEC_VERSIONS {
-		String WSRM = "Spec_2005_10";
-		String WSRX = "Spec_2005_02";
+		String WSRM = "Spec_2005_02";
+		String WSRX = "Spec_2005_10";
 	}
 	
 	public interface SPEC_2005_02 {
@@ -73,8 +75,14 @@
 
 			String ACTION_TERMINATE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/TerminateSequence";
 			
+			String ACTION_TERMINATE_SEQUENCE_RESPONSE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/TerminateSequenceResponse";
+			
+			String ACTION_ACK_REQUEST = "http://docs.oasis-open.org/ws-rx/wsrm/200602/AckRequested";
+			
 			String ACTION_CLOSE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CloseSequence";
 			
+			String ACTION_CLOSE_SEQUENCE_RESPONSE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CloseSequenceResponse";
+			
 			String SOAP_ACTION_CREATE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CreateSequence";
 
 			String SOAP_ACTION_CREATE_SEQUENCE_RESPONSE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CreateSequenceResponse";
@@ -83,6 +91,8 @@
 
 			String SOAP_ACTION_TERMINATE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/TerminateSequence";
 			
+			String SOAP_ACTION_ACK_REQUEST = "http://docs.oasis-open.org/ws-rx/wsrm/200510/AckRequested";
+			
 			String SOAP_ACTION_CLOSE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CloseSequence";
 		}
 	}
@@ -103,6 +113,10 @@
 
 		String TERMINATE_SEQUENCE = "TerminateSequence";
 
+		String CLOSE_SEQUENCE = "CloseSequence";
+		
+		String CLOSE_SEQUENCE_RESPONSE = "CloseSequenceResponse";
+		
 		String TERMINATE_SEQUENCE_RESPONSE = "TerminateSequenceResponse";
 		
 		String FAULT_CODE = "FaultCode";
@@ -132,12 +146,16 @@
 		String IDENTIFIER = "Identifier";
 
 		String ACCEPT = "Accept";
+		
+		String NONE = "None";
+		
+		String FINAL = "Final";
 	}
 
 	public interface WSA {
-		String NS_URI_ANONYMOUS = "http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous";
+		String NS_URI_ANONYMOUS = AddressingConstants.Final.WSA_ANONYMOUS_URL;  // "http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous";
 
-		String NS_URI_ADDRESSING = "http://schemas.xmlsoap.org/ws/2004/08/addressing";
+		String NS_URI_ADDRESSING = AddressingConstants.Final.WSA_NAMESPACE; //"http://schemas.xmlsoap.org/ws/2004/08/addressing";
 
 		String NS_PREFIX_ADDRESSING = "wsa";
 
@@ -163,11 +181,15 @@
 		
 		int CLOSE_SEQUENCE = 5;
 
-		int TERMINATE_SEQ = 6;
+		int CLOSE_SEQUENCE_RESPONSE = 6;
 		
-		int TERMINATE_SEQ_RESPONSE = 7;
+		int TERMINATE_SEQ = 7;
+		
+		int ACK_REQUEST = 8;
+		
+		int TERMINATE_SEQ_RESPONSE = 9;
 
-		int MAX_MESSAGE_TYPE = 7;
+		int MAX_MESSAGE_TYPE = 9;
 	}
 
 	public interface MessageParts {
@@ -184,12 +206,16 @@
 		int CREATE_SEQ_RESPONSE = 10;
 
 		int TERMINATE_SEQ = 11;
+		
+		int CLOSE_SEQUENCE = 12;
+		
+		int CLOSE_SEQUENCE_RESPONSE = 13;
 
-		int TERMINATE_SEQ_RESPONSE = 12;
+		int TERMINATE_SEQ_RESPONSE = 14;
 		
-		int ACK_REQUEST = 13;
+		int ACK_REQUEST = 15;
 
-		int MAX_MSG_PART_ID = 13;
+		int MAX_MSG_PART_ID = 15;
 	}
 
 	public interface SequenceProperties {
@@ -243,6 +269,15 @@
 		String TRANSPORT_TO = "TransportTo";
 		
 		String OUT_SEQ_ACKSTO = "OutSequenceAcksTo";
+		
+		String SEQUENCE_CLOSED = "SequenceClosed";
+		
+		String LAST_MESSAGE = "LastMessage";
+		
+
+		String REQUEST_SIDE_SEQUENCE_ID = "RequestSideSequenceID"; 		//used only at the server side
+		
+		String HIGHEST_MSG_NO = "HighestMessageNumber";
 	}
 
 	public interface SOAPVersion {
@@ -291,6 +326,8 @@
 		public interface Subcodes {
 
 			String SEQUENCE_TERMINATED = "wsrm:SequenceTerminated";
+			
+			String SEQUENCE_CLOSED = "wsrm:SequenceClosed";
 
 			String UNKNOWN_SEQUENCE = "wsrm:UnknownSequence";
 
@@ -301,6 +338,7 @@
 			String LAST_MESSAGE_NO_EXCEEDED = "wsrm:LastMessageNumberExceeded";
 
 			String CREATE_SEQUENCE_REFUSED = "wsrm:CreateSequenceRefused";
+			
 
 		}
 
@@ -313,7 +351,8 @@
 			public static final int INVALID_ACKNOWLEDGEMENT = 3;
 
 			public static final int CREATE_SEQUENCE_REFUSED = 4;
-
+			
+			public static final int LAST_MESSAGE_NO_EXCEEDED = 5;
 		}
 	}
 
@@ -335,6 +374,8 @@
 		
 		String MessageTypesToDrop = "MessageTypesToDrop";
 		
+		String RetransmissionCount = "RetransmissionCount";
+		
 		public interface DefaultValues {
 			
 			int RetransmissionInterval = 20000;
@@ -352,6 +393,8 @@
 			boolean InvokeInOrder = true;
 			
 			String MessageTypesToDrop=VALUE_NONE;
+			
+			int RetransmissionCount = 8;
 		}
 	}
 	

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java Mon Mar  6 18:39:01 2006
@@ -22,7 +22,6 @@
 import org.apache.axis2.description.AxisDescription;
 import org.apache.axis2.description.AxisModule;
 import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.ParameterImpl;
 import org.apache.axis2.description.PolicyInclude;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.modules.Module;
@@ -66,7 +65,7 @@
 		SandeshaPropertyBean defaultPropertyBean = PropertyManager.getInstance().getPropertyBean();
 		SandeshaPropertyBean axisDescPropertyBean = RMPolicyManager.loadPoliciesFromAxisDescription(axisDescription);
 		
-		Parameter parameter = new ParameterImpl ();
+		Parameter parameter = new Parameter ();
 		parameter.setName(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
 		if (axisDescPropertyBean==null) {
 			parameter.setValue(defaultPropertyBean);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/SpecSpecificConstants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SpecSpecificConstants.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SpecSpecificConstants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SpecSpecificConstants.java Mon Mar  6 18:39:01 2006
@@ -2,7 +2,16 @@
 
 public class SpecSpecificConstants {
 
-	private static String unknowsSpecErrorMessage = "Unknown specification version";
+	private static String unknownSpecErrorMessage = "Unknown specification version";
+	
+	public static String getSpecVersionString (String namespaceValue) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceValue))
+			return Sandesha2Constants.SPEC_VERSIONS.WSRM;
+		else if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(namespaceValue))
+			return Sandesha2Constants.SPEC_VERSIONS.WSRX;
+		else
+			throw new SandeshaException ("Unknows rm namespace value");
+	}
 	
 	public static String getRMNamespaceValue (String specVersion) throws SandeshaException {
 		if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion)) 
@@ -10,7 +19,7 @@
 		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
 			return Sandesha2Constants.SPEC_2005_10.NS_URI;
 		else 
-			throw new SandeshaException (unknowsSpecErrorMessage);
+			throw new SandeshaException (unknownSpecErrorMessage);
 	}
 	
 	public static String getCreateSequenceAction (String specVersion) throws SandeshaException {
@@ -19,7 +28,7 @@
 		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
 			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CREATE_SEQUENCE;
 		else 
-			throw new SandeshaException (unknowsSpecErrorMessage);
+			throw new SandeshaException (unknownSpecErrorMessage);
 	}
 	
 	public static String getCreateSequenceResponseAction (String specVersion) throws SandeshaException {
@@ -28,7 +37,7 @@
 		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
 			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CREATE_SEQUENCE_RESPONSE;
 		else 
-			throw new SandeshaException (unknowsSpecErrorMessage);
+			throw new SandeshaException (unknownSpecErrorMessage);
 	}
 	
 	public static String getTerminateSequenceAction (String specVersion) throws SandeshaException {
@@ -37,7 +46,25 @@
 		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
 			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_TERMINATE_SEQUENCE;
 		else 
-			throw new SandeshaException (unknowsSpecErrorMessage);
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getCloseSequenceAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion)) 
+			throw new SandeshaException ("This rm spec version does not define a sequenceClose action");
+		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CLOSE_SEQUENCE;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getAckRequestAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion)) 
+			throw new SandeshaException ("this spec version does not define a ackRequest action");
+		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_ACK_REQUEST;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
 	}
 	
 	public static String getSequenceAcknowledgementAction (String specVersion) throws SandeshaException {
@@ -46,7 +73,7 @@
 		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
 			return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_SEQUENCE_ACKNOWLEDGEMENT;
 		else 
-			throw new SandeshaException (unknowsSpecErrorMessage);
+			throw new SandeshaException (unknownSpecErrorMessage);
 	}
 	
 	public static String getCreateSequenceSOAPAction (String specVersion) throws SandeshaException {
@@ -55,7 +82,7 @@
 		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
 			return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_CREATE_SEQUENCE;
 		else 
-			throw new SandeshaException (unknowsSpecErrorMessage);
+			throw new SandeshaException (unknownSpecErrorMessage);
 	}
 	
 	public static String getCreateSequenceResponseSOAPAction (String specVersion) throws SandeshaException {
@@ -64,7 +91,7 @@
 		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
 			return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_CREATE_SEQUENCE_RESPONSE;
 		else 
-			throw new SandeshaException (unknowsSpecErrorMessage);
+			throw new SandeshaException (unknownSpecErrorMessage);
 	}
 	
 	public static String getTerminateSequenceSOAPAction (String specVersion) throws SandeshaException {
@@ -73,7 +100,16 @@
 		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
 			return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_TERMINATE_SEQUENCE;
 		else 
-			throw new SandeshaException (unknowsSpecErrorMessage);
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getAckRequestSOAPAction (String specVersion) throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion)) 
+			throw new SandeshaException ("this spec version does not define a ackRequest SOAP action");
+		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
+			return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_ACK_REQUEST;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
 	}
 	
 	public static String getSequenceAcknowledgementSOAPAction (String specVersion) throws SandeshaException {
@@ -82,7 +118,7 @@
 		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
 			return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_SEQUENCE_ACKNOWLEDGEMENT;
 		else 
-			throw new SandeshaException (unknowsSpecErrorMessage);
+			throw new SandeshaException (unknownSpecErrorMessage);
 	}
 	
 	public static boolean isTerminateSequenceResponseRequired (String specVersion)  throws SandeshaException {
@@ -91,6 +127,46 @@
 		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
 			return true;
 		else 
-			throw new SandeshaException (unknowsSpecErrorMessage);
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static boolean isLastMessageIndicatorRequired (String specVersion)  throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion)) 
+			return true;
+		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
+			return false;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static boolean isAckFinalAllowed (String specVersion)  throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion)) 
+			return false;
+		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
+			return true;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static boolean isAckNoneAllowed (String specVersion)  throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion)) 
+			return false;
+		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
+			return true;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static boolean isSequenceClosingAllowed (String specVersion)  throws SandeshaException {
+		if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion)) 
+			return false;
+		else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion)) 
+			return true;
+		else 
+			throw new SandeshaException (unknownSpecErrorMessage);
+	}
+	
+	public static String getDefaultSpecVersion () {
+		return Sandesha2Constants.SPEC_VERSIONS.WSRM;
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Mon Mar  6 18:39:01 2006
@@ -24,9 +24,16 @@
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.client.Sandesha2ClientAPI;
+import org.apache.sandesha2.client.SequenceReport;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
@@ -39,7 +46,9 @@
 import org.apache.sandesha2.storage.beans.NextMsgBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
 import org.apache.sandesha2.util.PropertyManager;
+import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SandeshaPropertyBean;
 import org.apache.sandesha2.util.SandeshaUtil;
 
@@ -287,4 +296,7 @@
 		
 		return deleatable;
 	}
+	
+
+	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java Mon Mar  6 18:39:01 2006
@@ -19,19 +19,36 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.ServiceClient;
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.AcknowledgementManager;
+import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.SpecSpecificConstants;
+import org.apache.sandesha2.TerminateManager;
+import org.apache.sandesha2.msgprocessors.AcknowledgementProcessor;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
+import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
 
+import com.sun.corba.se.internal.core.ServiceContext;
+
 /**
  * Contains all the Sandesha2Constants of Sandesha2.
  * Please see sub-interfaces to see grouped data.
@@ -50,11 +67,12 @@
 	public static String SEQUENCE_KEY = "Sandesha2ClientAPIPropertySequenceKey";
 	public static String MESSAGE_NUMBER = "Sandesha2ClientAPIPropertyMessageNumber";
 	public static String RM_SPEC_VERSION = "Sandesha2ClientAPIPropertyRMSpecVersion";
+	public static String DUMMY_MESSAGE = "Sandesha2ClientAPIDummyMessage"; //If this property is set, even though this message will invoke the RM handlers, this will not be sent as an actual application message
+	public static String VALUE_TRUE = "true";
+	public static String VALUE_FALSE = "false";
 	
-	public static SequenceReport getOutgoingSequenceReport (String to, String sequenceKey,ConfigurationContext configurationContext) throws SandeshaException {
-		
-		String internalSequenceID = SandeshaUtil.getInternalSequenceID (to,sequenceKey);
-		SequenceReport sequenceReport = new SequenceReport ();
+	public static SequenceReport getOutgoingSequenceReport (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException {
+	SequenceReport sequenceReport = new SequenceReport ();
 		
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
 		SequencePropertyBeanMgr seqpPropMgr = storageManager.getSequencePropretyBeanMgr();
@@ -91,7 +109,14 @@
 		
 		reportTransaction.commit();
 		
-		return sequenceReport;
+		return sequenceReport;	
+	}
+	
+	public static SequenceReport getOutgoingSequenceReport (String to, String sequenceKey,ConfigurationContext configurationContext) throws SandeshaException {
+		
+		String internalSequenceID = SandeshaUtil.getInternalSequenceID (to,sequenceKey);
+		return getOutgoingSequenceReport(internalSequenceID,configurationContext);
+	
 	}
 	
 	public static SequenceReport getIncomingSequenceReport (String sequenceID,ConfigurationContext configurationContext) throws SandeshaException {
@@ -121,11 +146,5 @@
 		return rmReport;
 	}
 	
-	public static String generateInternalSequenceIDForTheClientSide (String toEPR,String sequenceKey) {
-		return SandeshaUtil.getInternalSequenceID(toEPR,sequenceKey);
-	}
-	
-	public static void endSequence (String internalSequenceID, ConfigurationContext configurationContext) {
-		
-	}
+
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Mon Mar  6 18:39:01 2006
@@ -23,8 +23,10 @@
 import javax.xml.namespace.QName;
 
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.RelatesTo;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.handlers.AbstractHandler;
 import org.apache.ws.commons.soap.SOAPBody;
@@ -61,18 +63,6 @@
 		if (!isRMGlobalMessage) {
 			return;
 		}
-		
-
-		FaultManager faultManager = new FaultManager();
-		RMMsgContext faultMessageContext = faultManager
-				.checkForPossibleFaults(msgContext);
-		if (faultMessageContext != null) {
-			ConfigurationContext configurationContext = msgContext
-					.getConfigurationContext();
-			AxisEngine engine = new AxisEngine(configurationContext);
-			engine.send(faultMessageContext.getMessageContext());
-			return;
-		}
 
 		RMMsgContext rmMessageContext = MsgInitializer
 				.initializeMessage(msgContext);
@@ -179,6 +169,23 @@
 
 						}
 					}
+				}
+			}
+		} else if (rmMsgContext.getMessageType()!=Sandesha2Constants.MessageTypes.UNKNOWN) {
+			//droping other known message types if, an suitable operation context is not available,
+			//and if a relates to value is present.
+			RelatesTo relatesTo = rmMsgContext.getRelatesTo();
+			if (relatesTo!=null) {
+				String value = relatesTo.getValue();
+				
+				//TODO do not drop, relationshipTypes other than reply
+				
+				ConfigurationContext configurationContext = rmMsgContext.getMessageContext().getConfigurationContext();
+				OperationContext opCtx = configurationContext.getOperationContext(value);
+				if (opCtx==null) {
+					String message = "Dropping duplicate RM message";
+					log.debug(message);
+					drop=true;
 				}
 			}
 		}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Mon Mar  6 18:39:01 2006
@@ -34,6 +34,7 @@
 import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
 import org.apache.sandesha2.util.FaultManager;
 import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
 
 /**
  * This is invoked in the inFlow of an RM endpoint. This is responsible for selecting an suitable
@@ -64,15 +65,6 @@
 		if (null != DONE && "true".equals(DONE))
 			return;
 
-		FaultManager faultManager = new FaultManager();
-		RMMsgContext faultMessageContext = faultManager
-				.checkForPossibleFaults(msgCtx);
-		if (faultMessageContext != null) {
-			AxisEngine engine = new AxisEngine(context);
-			engine.send(faultMessageContext.getMessageContext());
-			return;
-		}
-
 		AxisService axisService = msgCtx.getAxisService();
 		if (axisService == null) {
 			String message = "AxisService is null";
@@ -89,6 +81,9 @@
 			throw new AxisFault(message);
 		}
 
+		if (rmMsgCtx.getMessageContext().getAxisOperation().getParent()==null) {
+			System.out.println("Operation parent was null for message:" + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
+		}
 		MsgProcessor msgProcessor = MsgProcessorFactory
 				.getMessageProcessor(rmMsgCtx.getMessageType());
 

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon Mar  6 18:39:01 2006
@@ -30,7 +30,6 @@
 import org.apache.axis2.description.AxisOperationFactory;
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.ParameterImpl;
 import org.apache.axis2.description.TransportOutDescription;
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.handlers.AbstractHandler;
@@ -106,7 +105,12 @@
 
 		msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
 				"true");
-
+		
+		String dummyMessageString = (String) msgCtx.getOptions().getProperty(Sandesha2ClientAPI.DUMMY_MESSAGE);
+		boolean dummyMessage = false;
+		if (dummyMessageString!=null && Sandesha2ClientAPI.VALUE_TRUE.equals(dummyMessageString))
+			dummyMessage = true;
+		
 		StorageManager storageManager = SandeshaUtil
 				.getSandeshaStorageManager(context);
 
@@ -123,7 +127,7 @@
 		if (policyParam == null) {
 			SandeshaPropertyBean propertyBean = PropertyManager.getInstance()
 					.getPropertyBean();
-			Parameter parameter = new ParameterImpl();
+			Parameter parameter = new Parameter();
 			parameter.setName(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
 			parameter.setValue(propertyBean);
 
@@ -233,7 +237,8 @@
 		}
 		
 		//saving the used message number
-		setNextMsgNo(context,internalSequenceId,messageNumber);
+		if (!dummyMessage)
+			setNextMsgNo(context,internalSequenceId,messageNumber);
 		
 		boolean sendCreateSequence = false;
 
@@ -419,7 +424,8 @@
 		}
 		
 		// processing the response
-		processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber);
+		if (!dummyMessage)
+			processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber);
 		
 		msgCtx.pause();  // the execution will be stopped.
 		outHandlerTransaction.commit();		
@@ -659,6 +665,11 @@
 				throw new SandeshaException(message);
 			}
 
+			
+			//TODO check for highest msg no.
+			long requestMsgNo = requestSequence.getMessageNumber().getMessageNumber();
+			
+			
 			if (requestSequence.getLastMessage() != null) {
 				lastMessage = true;
 				sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
@@ -679,7 +690,15 @@
 				Object obj = msg.getProperty(Sandesha2ClientAPI.LAST_MESSAGE);
 				if (obj != null && "true".equals(obj)) {
 					lastMessage = true;
-					sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
+					
+					SequencePropertyBean specVersionBean = sequencePropertyMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
+					if (specVersionBean==null)
+						throw new SandeshaException ("Spec version bean is not set");
+					String specVersion = specVersionBean.getValue();
+					
+					if (SpecSpecificConstants.isLastMessageIndicatorRequired(specVersion))
+						sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
+					
 					// saving the last message no.
 					SequencePropertyBean lastOutMsgBean = new SequencePropertyBean(
 							internalSequenceId,
@@ -693,8 +712,8 @@
 		AckRequested ackRequested = null;
 
 		boolean addAckRequested = false;
-		if (!lastMessage)
-			addAckRequested = true;
+		//if (!lastMessage)
+		addAckRequested = true;   //TODO decide the policy to add the ackRequested tag
 
 		// setting the Sequnece id.
 		// Set send = true/false depending on the availability of the out
@@ -730,6 +749,7 @@
 		//Retransmitter bean entry for the application message
 		SenderBean appMsgEntry = new SenderBean();
 		String storageKey = SandeshaUtil.getUUID();
+		
 		appMsgEntry.setMessageContextRefKey(storageKey);
 
 		appMsgEntry.setTimeToSend(System.currentTimeMillis());

Added: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=383751&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Mon Mar  6 18:39:01 2006
@@ -0,0 +1,268 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisOperationFactory;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.PropertyManager;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaPropertyBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.AckRequested;
+import org.apache.ws.commons.soap.SOAPEnvelope;
+import org.apache.ws.commons.soap.SOAPFactory;
+
+/**
+ * Responsible for processing an incoming Application message.
+ * 
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+public class AckRequestedProcessor implements MsgProcessor {
+
+	private Log log = LogFactory.getLog(getClass());
+	
+	public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+		
+		
+		AckRequested ackRequested = (AckRequested) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST);
+		if (ackRequested==null) {
+			throw new SandeshaException ("Message identified as of type ackRequested does not have an AckRequeted element");
+		}
+		
+		MessageContext msgContext = rmMsgCtx.getMessageContext();
+		
+		String sequenceID = ackRequested.getIdentifier().getIdentifier();
+		
+		ConfigurationContext configurationContext = rmMsgCtx.getMessageContext().getConfigurationContext();
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		//Setting the ack depending on AcksTo.
+		SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceID,
+				Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+
+		EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
+		String acksToStr = acksTo.getAddress();
+
+		if (acksToStr == null)
+			throw new SandeshaException(
+					"acksToStr Seqeunce property is not set correctly");
+		
+		AxisOperation ackOperation = null;
+
+		try {
+			ackOperation = AxisOperationFactory.getOperationDescription(AxisOperationFactory.MEP_URI_IN_ONLY);
+		} catch (AxisFault e) {
+			throw new SandeshaException("Could not create the Operation");
+		}
+
+		AxisOperation rmMsgOperation = rmMsgCtx.getMessageContext()
+				.getAxisOperation();
+		if (rmMsgOperation != null) {
+			ArrayList outFlow = rmMsgOperation.getPhasesOutFlow();
+			if (outFlow != null) {
+				ackOperation.setPhasesOutFlow(outFlow);
+				ackOperation.setPhasesOutFaultFlow(outFlow);
+			}
+		}
+
+		MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(
+				rmMsgCtx, ackOperation);
+		
+		ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+		
+		RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+		ackRMMsgCtx.setRMNamespaceValue(rmMsgCtx.getRMNamespaceValue());
+		
+		ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
+		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+				.getSOAPVersion(msgContext.getEnvelope()));
+		
+		//Setting new envelope
+		SOAPEnvelope envelope = factory.getDefaultEnvelope();
+		try {
+			ackMsgCtx.setEnvelope(envelope);
+		} catch (AxisFault e3) {
+			throw new SandeshaException(e3.getMessage());
+		}
+
+		ackMsgCtx.setTo(acksTo);
+		ackMsgCtx.setReplyTo(msgContext.getTo());
+		RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceID);
+
+		if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo.getAddress())) {
+
+			AxisEngine engine = new AxisEngine(ackRMMsgCtx.getMessageContext()
+					.getConfigurationContext());
+
+			//setting CONTEXT_WRITTEN since acksto is anonymous
+			if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
+				//operation context will be null when doing in a GLOBAL
+				// handler.
+				try {
+					AxisOperation op = AxisOperationFactory
+							.getAxisOperation(AxisOperationFactory.MEP_CONSTANT_IN_OUT);
+					OperationContext opCtx = new OperationContext(op);
+					rmMsgCtx.getMessageContext().setAxisOperation(op);
+					rmMsgCtx.getMessageContext().setOperationContext(opCtx);
+				} catch (AxisFault e2) {
+					throw new SandeshaException(e2.getMessage());
+				}
+			}
+
+			rmMsgCtx.getMessageContext().getOperationContext().setProperty(
+					org.apache.axis2.Constants.RESPONSE_WRITTEN,
+					Constants.VALUE_TRUE);
+
+			rmMsgCtx.getMessageContext().setProperty(
+					Sandesha2Constants.ACK_WRITTEN, "true");
+			try {
+				engine.send(ackRMMsgCtx.getMessageContext());
+			} catch (AxisFault e1) {
+				throw new SandeshaException(e1.getMessage());
+			}
+		} else {
+
+			Transaction asyncAckTransaction = storageManager.getTransaction();
+
+			SenderBeanMgr retransmitterBeanMgr = storageManager
+					.getRetransmitterBeanMgr();
+
+			String key = SandeshaUtil.getUUID();
+			
+			//dumping to the storage will be done be Sandesha2 Transport Sender
+			//storageManager.storeMessageContext(key,ackMsgCtx);
+			
+			SenderBean ackBean = new SenderBean();
+			ackBean.setMessageContextRefKey(key);
+			ackBean.setMessageID(ackMsgCtx.getMessageID());
+			ackBean.setReSend(false);
+			
+			//this will be set to true in the sender.
+			ackBean.setSend(true);
+			
+			ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+					Sandesha2Constants.VALUE_FALSE);
+			
+			ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+			
+			//the internalSequenceId value of the retransmitter Table for the
+			// messages related to an incoming
+			//sequence is the actual sequence ID
+
+//			RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
+//					.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
+		
+//			long ackInterval = PropertyManager.getInstance()
+//					.getAcknowledgementInterval();
+			
+			Parameter param = msgContext.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
+			
+			SandeshaPropertyBean propertyBean = null;
+			if (param!=null) {
+				propertyBean = (SandeshaPropertyBean)  param.getValue();
+			}else {
+				propertyBean = PropertyManager.getInstance().getPropertyBean();
+			}
+			
+			
+			long ackInterval = propertyBean.getAcknowledgementInaterval();
+			
+			//			if (policyBean != null) {
+//				ackInterval = policyBean.getAcknowledgementInaterval();
+//			}
+			
+			//Ack will be sent as stand alone, only after the retransmitter
+			// interval.
+			long timeToSend = System.currentTimeMillis() + ackInterval;
+
+			//removing old acks.
+			SenderBean findBean = new SenderBean();
+			findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+			
+			//this will be set to true in the sandesha2TransportSender.
+			findBean.setSend(true);
+			findBean.setReSend(false);
+			Collection coll = retransmitterBeanMgr.find(findBean);
+			Iterator it = coll.iterator();
+
+			if (it.hasNext()) {
+				SenderBean oldAckBean = (SenderBean) it.next();
+				timeToSend = oldAckBean.getTimeToSend();		//If there is an old ack. This ack will be sent in the old timeToSend.
+				retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+			}
+			
+			ackBean.setTimeToSend(timeToSend);
+
+			storageManager.storeMessageContext(key,ackMsgCtx);
+			
+			//inserting the new ack.
+			retransmitterBeanMgr.insert(ackBean);
+
+			asyncAckTransaction.commit();
+
+			//passing the message through sandesha2sender
+
+			ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
+			ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+			
+			ackMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+			
+			ackMsgCtx.setTransportOut(new Sandesha2TransportOutDesc ());
+			
+			AxisEngine engine = new AxisEngine (configurationContext);
+			try {
+				engine.send(ackMsgCtx);
+			} catch (AxisFault e) {
+				throw new SandeshaException (e.getMessage());
+			}
+			
+			SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID);
+			
+			msgContext.pause(); 
+		}
+	}
+	
+}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Mar  6 18:39:01 2006
@@ -44,6 +44,7 @@
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
 import org.apache.sandesha2.transport.Sandesha2TransportSender;
+import org.apache.sandesha2.util.FaultManager;
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
@@ -71,6 +72,8 @@
 			throw new SandeshaException(message);
 		}
 		
+		MessageContext msgCtx = rmMsgCtx.getMessageContext();
+		
 		AbstractContext context = rmMsgCtx.getContext();
 		if (context == null) {
 			String message = "Context is null";
@@ -103,6 +106,35 @@
 			throw new SandeshaException(message);
 		}
 
+		FaultManager faultManager = new FaultManager();
+		RMMsgContext faultMessageContext = faultManager.checkForUnknownSequence(rmMsgCtx,outSequenceId);
+		if (faultMessageContext != null) {
+			ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+			AxisEngine engine = new AxisEngine(configurationContext);
+			
+			try {
+				engine.sendFault(faultMessageContext.getMessageContext());
+			} catch (AxisFault e) {
+				throw new SandeshaException ("Could not send the fault message",e);
+			}
+			
+			return;
+		}
+		
+		faultMessageContext = faultManager.checkForInvalidAcknowledgement(rmMsgCtx);
+		if (faultMessageContext != null) {
+			ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+			AxisEngine engine = new AxisEngine(configurationContext);
+			
+			try {
+				engine.sendFault(faultMessageContext.getMessageContext());
+			} catch (AxisFault e) {
+				throw new SandeshaException ("Could not send the fault message",e);
+			}
+			
+			return;
+		}
+		
 		//updating the last activated time of the sequence.
 //		Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
 //		SequenceManager.updateLastActivatedTime(outSequenceId,rmMsgCtx.getMessageContext().getConfigurationContext());

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Mar  6 18:39:01 2006
@@ -33,6 +33,7 @@
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.AcknowledgementManager;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
@@ -48,6 +49,7 @@
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
+import org.apache.sandesha2.util.FaultManager;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.PropertyManager;
 import org.apache.sandesha2.util.RMMsgCreator;
@@ -83,6 +85,9 @@
 			AcknowledgementProcessor ackProcessor = new AcknowledgementProcessor();
 			ackProcessor.processMessage(rmMsgCtx);
 		}
+		
+		//TODO process embedded ack requests
+		
 
 		//Processing the application message.
 		MessageContext msgCtx = rmMsgCtx.getMessageContext();
@@ -100,7 +105,7 @@
 			return;
 		}
 
-		//RM will not rend sync responses. If sync acks are there this will be
+		//RM will not send sync responses. If sync acks are there this will be
 		// made true again later.
 		if (rmMsgCtx.getMessageContext().getOperationContext() != null) {
 			rmMsgCtx.getMessageContext().getOperationContext().setProperty(
@@ -113,9 +118,24 @@
 
 
 
+		FaultManager faultManager = new FaultManager();
+		RMMsgContext faultMessageContext = faultManager.checkForLastMsgNumberExceeded(rmMsgCtx);
+		if (faultMessageContext != null) {
+			ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+			AxisEngine engine = new AxisEngine(configurationContext);
+			
+			try {
+				engine.sendFault(faultMessageContext.getMessageContext());
+			} catch (AxisFault e) {
+				throw new SandeshaException ("Could not send the fault message",e);
+			}
+			
+			return;
+		}
+		
 		SequencePropertyBeanMgr seqPropMgr = storageManager
 				.getSequencePropretyBeanMgr();
-
+		
 		//setting acked msg no range
 		Sequence sequence = (Sequence) rmMsgCtx
 				.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
@@ -128,10 +148,43 @@
 			throw new SandeshaException(message);
 		}
 
+		faultMessageContext = faultManager.checkForUnknownSequence(rmMsgCtx,sequenceId);
+		if (faultMessageContext != null) {
+			ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+			AxisEngine engine = new AxisEngine(configurationContext);
+			
+			try {
+				engine.send(faultMessageContext.getMessageContext());
+			} catch (AxisFault e) {
+				throw new SandeshaException ("Could not send the fault message",e);
+			}
+			
+			return;
+		}
+		
+		
 		//setting mustUnderstand to false.
 		sequence.setMustUnderstand(false);
 		rmMsgCtx.addSOAPEnvelope();
 		
+		
+		//throwing a fault if the sequence is closed.
+		faultMessageContext = faultManager. checkForSequenceClosed(rmMsgCtx,sequenceId);
+		if (faultMessageContext != null) {
+			ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+			AxisEngine engine = new AxisEngine(configurationContext);
+			
+			try {
+				engine.sendFault(faultMessageContext.getMessageContext());
+			} catch (AxisFault e) {
+				throw new SandeshaException ("Could not send the fault message",e);
+			}
+			
+			return;
+		}
+
+			
+		
 		Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
 		
 		//updating the last activated time of the sequence.
@@ -306,197 +359,25 @@
 		
 		LastMessage lastMessage = (LastMessage) sequence.getLastMessage();
 
-		//Setting the ack depending on AcksTo.
-		SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceId,
-				Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
-
-		EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
-		String acksToStr = acksTo.getAddress();
-
-		if (acksToStr == null || messagesStr == null)
-			throw new SandeshaException(
-					"Seqeunce properties are not set correctly");
-
-		if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo.getAddress())) {
-			// send ack in the sync case, only if the last message or the
-			// ackRequested tag is present.
-			boolean ackRequired = false;
-			if (ackRequested != null || lastMessage != null)
-				ackRequired = true;
-
-			if (!ackRequired) {
-				return;
-			}
-		}
-		AxisOperation ackOperation = null;
-
-		try {
-			ackOperation = AxisOperationFactory.getOperationDescription(AxisOperationFactory.MEP_URI_IN_ONLY);
+		if (lastMessage!=null) {
+			long messageNumber = sequence.getMessageNumber().getMessageNumber();
+			SequencePropertyBean lastMessageBean = new SequencePropertyBean ();
+			lastMessageBean.setSequenceID(sequenceId);
+			lastMessageBean.setName(Sandesha2Constants.SequenceProperties.LAST_MESSAGE);
+			lastMessageBean.setValue(new Long(messageNumber).toString());
+			
+			seqPropMgr.insert(lastMessageBean);
+		}
+		
+	 	RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(rmMsgCtx,sequenceId);
+		
+	 	AxisEngine engine = new AxisEngine (configCtx);
+	 	
+	 	try {
+			engine.send(ackRMMessage.getMessageContext());
 		} catch (AxisFault e) {
-			throw new SandeshaException("Could not create the Operation");
-		}
-
-		AxisOperation rmMsgOperation = rmMsgCtx.getMessageContext()
-				.getAxisOperation();
-		if (rmMsgOperation != null) {
-			ArrayList outFlow = rmMsgOperation.getPhasesOutFlow();
-			if (outFlow != null) {
-				ackOperation.setPhasesOutFlow(outFlow);
-				ackOperation.setPhasesOutFaultFlow(outFlow);
-			}
-		}
-
-		MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(
-				rmMsgCtx, ackOperation);
-		
-		ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
-		
-		RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
-		ackRMMsgCtx.setRMNamespaceValue(rmMsgCtx.getRMNamespaceValue());
-		
-		ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
-
-		//Setting new envelope
-		SOAPEnvelope envelope = factory.getDefaultEnvelope();
-		try {
-			ackMsgCtx.setEnvelope(envelope);
-		} catch (AxisFault e3) {
-			throw new SandeshaException(e3.getMessage());
-		}
-
-		ackMsgCtx.setTo(acksTo);
-		ackMsgCtx.setReplyTo(msgCtx.getTo());
-		RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId);
-
-		if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo.getAddress())) {
-
-			AxisEngine engine = new AxisEngine(ackRMMsgCtx.getMessageContext()
-					.getConfigurationContext());
-
-			//setting CONTEXT_WRITTEN since acksto is anonymous
-			if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
-				//operation context will be null when doing in a GLOBAL
-				// handler.
-				try {
-					AxisOperation op = AxisOperationFactory
-							.getAxisOperation(AxisOperationFactory.MEP_CONSTANT_IN_OUT);
-					OperationContext opCtx = new OperationContext(op);
-					rmMsgCtx.getMessageContext().setAxisOperation(op);
-					rmMsgCtx.getMessageContext().setOperationContext(opCtx);
-				} catch (AxisFault e2) {
-					throw new SandeshaException(e2.getMessage());
-				}
-			}
-
-			rmMsgCtx.getMessageContext().getOperationContext().setProperty(
-					org.apache.axis2.Constants.RESPONSE_WRITTEN,
-					Constants.VALUE_TRUE);
-
-			rmMsgCtx.getMessageContext().setProperty(
-					Sandesha2Constants.ACK_WRITTEN, "true");
-			try {
-				engine.send(ackRMMsgCtx.getMessageContext());
-			} catch (AxisFault e1) {
-				throw new SandeshaException(e1.getMessage());
-			}
-		} else {
-
-			Transaction asyncAckTransaction = storageManager.getTransaction();
-
-			SenderBeanMgr retransmitterBeanMgr = storageManager
-					.getRetransmitterBeanMgr();
-
-			String key = SandeshaUtil.getUUID();
-			
-			//dumping to the storage will be done be Sandesha2 Transport Sender
-			//storageManager.storeMessageContext(key,ackMsgCtx);
-			
-			SenderBean ackBean = new SenderBean();
-			ackBean.setMessageContextRefKey(key);
-			ackBean.setMessageID(ackMsgCtx.getMessageID());
-			ackBean.setReSend(false);
-			
-			//this will be set to true in the sender.
-			ackBean.setSend(true);
-			
-			ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
-					Sandesha2Constants.VALUE_FALSE);
-			
-			ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-			
-			//the internalSequenceId value of the retransmitter Table for the
-			// messages related to an incoming
-			//sequence is the actual sequence ID
-
-//			RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
-//					.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
-		
-//			long ackInterval = PropertyManager.getInstance()
-//					.getAcknowledgementInterval();
-			
-			Parameter param = msgCtx.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
-			
-			SandeshaPropertyBean propertyBean = null;
-			if (param!=null) {
-				propertyBean = (SandeshaPropertyBean)  param.getValue();
-			}else {
-				propertyBean = PropertyManager.getInstance().getPropertyBean();
-			}
-			
-			
-			long ackInterval = propertyBean.getAcknowledgementInaterval();
-			
-			//			if (policyBean != null) {
-//				ackInterval = policyBean.getAcknowledgementInaterval();
-//			}
-			
-			//Ack will be sent as stand alone, only after the retransmitter
-			// interval.
-			long timeToSend = System.currentTimeMillis() + ackInterval;
-
-			//removing old acks.
-			SenderBean findBean = new SenderBean();
-			findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-			
-			//this will be set to true in the sandesha2TransportSender.
-			findBean.setSend(true);
-			findBean.setReSend(false);
-			Collection coll = retransmitterBeanMgr.find(findBean);
-			Iterator it = coll.iterator();
-
-			if (it.hasNext()) {
-				SenderBean oldAckBean = (SenderBean) it.next();
-				timeToSend = oldAckBean.getTimeToSend();		//If there is an old ack. This ack will be sent in the old timeToSend.
-				retransmitterBeanMgr.delete(oldAckBean.getMessageID());
-			}
-			
-			ackBean.setTimeToSend(timeToSend);
-
-			storageManager.storeMessageContext(key,ackMsgCtx);
-			
-			//inserting the new ack.
-			retransmitterBeanMgr.insert(ackBean);
-
-			asyncAckTransaction.commit();
-
-			//passing the message through sandesha2sender
-
-			ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
-			ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
-			
-			ackMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
-			
-			ackMsgCtx.setTransportOut(new Sandesha2TransportOutDesc ());
-			
-			AxisEngine engine = new AxisEngine (configCtx);
-			try {
-				engine.send(ackMsgCtx);
-			} catch (AxisFault e) {
-				throw new SandeshaException (e.getMessage());
-			}
-			
-			SandeshaUtil.startSenderForTheSequence(configCtx,sequenceId);
+			String message = "Exception thrown while trying to send the ack message";
+			throw new SandeshaException (message,e);
 		}
-
 	}
 }

Added: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=383751&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Mon Mar  6 18:39:01 2006
@@ -0,0 +1,121 @@
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.util.Utils;
+import org.apache.sandesha2.AcknowledgementManager;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.CloseSequenceResponse;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.ws.commons.soap.SOAPEnvelope;
+import org.apache.ws.commons.soap.SOAPFactory;
+
+public class CloseSequenceProcessor implements MsgProcessor {
+
+	public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
+		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+		CloseSequence closeSequence = (CloseSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+		
+		MessageContext msgCtx = rmMsgCtx.getMessageContext();
+		
+		String sequenceID = closeSequence.getIdentifier().getIdentifier();
+		
+		FaultManager faultManager = new FaultManager();
+		RMMsgContext faultMessageContext = faultManager.checkForUnknownSequence(rmMsgCtx,sequenceID);
+		if (faultMessageContext != null) {
+			ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+			AxisEngine engine = new AxisEngine(configurationContext);
+			
+			try {
+				engine.sendFault(faultMessageContext.getMessageContext());
+			} catch (AxisFault e) {
+				throw new SandeshaException ("Could not send the fault message",e);
+			}
+			
+			return;
+		}
+		
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx);
+		
+		Transaction closeSequenceTransaction = storageManager.getTransaction();
+		
+		SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropretyBeanMgr();
+		SequencePropertyBean sequenceClosedBean = new SequencePropertyBean ();
+		sequenceClosedBean.setSequenceID(sequenceID);
+		sequenceClosedBean.setName(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED);
+		sequenceClosedBean.setValue(Sandesha2Constants.VALUE_TRUE);
+		
+		sequencePropMgr.insert(sequenceClosedBean);
+		
+		RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx,sequenceID);
+		
+		MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
+		
+		String rmNamespaceValue = rmMsgCtx.getRMNamespaceValue();
+		ackRMMsgCtx.setRMNamespaceValue(rmNamespaceValue);
+		
+
+		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+				.getSOAPVersion(rmMsgCtx.getSOAPEnvelope()));
+		
+		//Setting new envelope
+		SOAPEnvelope envelope = factory.getDefaultEnvelope();
+		try {
+			ackMsgCtx.setEnvelope(envelope);
+		} catch (AxisFault e3) {
+			throw new SandeshaException(e3.getMessage());
+		}
+		
+		//adding the ack part to the envelope.
+		SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) ackRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+	
+		MessageContext closeSequenceMsg = rmMsgCtx.getMessageContext();
+		
+		MessageContext closeSequenceResponseMsg = null;
+		closeSequenceResponseMsg = Utils.createOutMessageContext(closeSequenceMsg);
+		
+		RMMsgContext closeSeqResponseRMMsg = RMMsgCreator
+				.createCloseSeqResponseMsg(rmMsgCtx, closeSequenceResponseMsg);
+		
+		closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,sequenceAcknowledgement); 
+		
+		closeSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
+		closeSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+
+		closeSequenceResponseMsg.setResponseWritten(true);
+		
+		closeSeqResponseRMMsg.addSOAPEnvelope();
+		
+		AxisEngine engine = new AxisEngine (closeSequenceMsg.getConfigurationContext());
+		
+		try {
+			engine.send(closeSequenceResponseMsg);
+		} catch (AxisFault e) {
+			String message = "Could not send the terminate sequence response";
+			throw new SandeshaException (message,e);
+		}
+		
+		
+		closeSequenceTransaction.commit();
+	}
+	
+
+
+	
+	
+}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Mon Mar  6 18:39:01 2006
@@ -34,6 +34,7 @@
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.CreateSeqBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.FaultManager;
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
@@ -64,6 +65,21 @@
 			throw new SandeshaException(message);
 		}
 
+		FaultManager faultManager = new FaultManager();
+		RMMsgContext faultMessageContext = faultManager.checkForCreateSequenceRefused(createSeqMsg);
+		if (faultMessageContext != null) {
+			ConfigurationContext configurationContext = createSeqMsg.getConfigurationContext();
+			AxisEngine engine = new AxisEngine(configurationContext);
+			
+			try {
+				engine.sendFault(faultMessageContext.getMessageContext());
+			} catch (AxisFault e) {
+				throw new SandeshaException ("Could not send the fault message",e);
+			}
+			
+			return;
+		}
+		
 		MessageContext outMessage = null;
 		outMessage = Utils.createOutMessageContext(createSeqMsg);
 		

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Mon Mar  6 18:39:01 2006
@@ -64,7 +64,7 @@
 	
 	public void processMessage(RMMsgContext createSeqResponseRMMsgCtx)
 			throws SandeshaException {
-
+		
 		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
 				.getSOAPVersion(createSeqResponseRMMsgCtx.getSOAPEnvelope()));
 

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java Mon Mar  6 18:39:01 2006
@@ -41,6 +41,10 @@
 			return new CreateSeqResponseMsgProcessor();
 		case (Sandesha2Constants.MessageTypes.ACK):
 			return new AcknowledgementProcessor();
+		case (Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE):
+			return new CloseSequenceProcessor ();
+		case (Sandesha2Constants.MessageTypes.ACK_REQUEST):
+			return new AckRequestedProcessor ();
 		default:
 			return null;
 		}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon Mar  6 18:39:01 2006
@@ -33,6 +33,7 @@
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.FaultManager;
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
@@ -76,6 +77,21 @@
 			String message = "Invalid sequence id";
 			log.debug(message);
 			throw new SandeshaException (message);
+		}
+		
+		FaultManager faultManager = new FaultManager();
+		RMMsgContext faultMessageContext = faultManager.checkForUnknownSequence(terminateSeqRMMsg,sequenceId);
+		if (faultMessageContext != null) {
+			ConfigurationContext configurationContext = terminateSeqMsg.getConfigurationContext();
+			AxisEngine engine = new AxisEngine(configurationContext);
+			
+			try {
+				engine.sendFault(faultMessageContext.getMessageContext());
+			} catch (AxisFault e) {
+				throw new SandeshaException ("Could not send the fault message",e);
+			}
+			
+			return;
 		}
 		
 		ConfigurationContext context = terminateSeqMsg.getConfigurationContext();



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org