You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2006/01/18 07:13:55 UTC

svn commit: r370065 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: ./ client/ handlers/ msgprocessors/ storage/beanmanagers/ storage/inmemory/ util/ workers/

Author: chamikara
Date: Tue Jan 17 22:06:44 2006
New Revision: 370065

URL: http://svn.apache.org/viewcvs?rev=370065&view=rev
Log:
Additions to the RMReport functionality.
Logic to start and end Sender elegantly.
Logic to start and end Invoker elegantly.
Logic to clean storage correctly in termination.
Added the findUnique method to Bean Managers.
Bug fixes.

Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/client/SequenceReport.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/CreateSeqBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/InvokerBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/NextMsgBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaPropertyBean.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java Tue Jan 17 22:06:44 2006
@@ -17,6 +17,7 @@
 
 package org.apache.sandesha2;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 
@@ -125,5 +126,34 @@
 				applicationRMMsgContext.addSOAPEnvelope();
 			}
 		}
+	}
+	
+	/**this is used to get the acked messages of a sequence. If this is an outgoing message the sequenceIdentifier should
+	 * be the internal sequenceID.
+	 * 
+	 * @param sequenceIdentifier
+	 * @param outGoingMessage
+	 * @return
+	 */
+	public static ArrayList getCompletedMessagesList (String sequenceIdentifier,ConfigurationContext configurationContext) throws SandeshaException {
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+		
+		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		SequencePropertyBean completedMessagesBean = sequencePropertyBeanMgr.retrieve(sequenceIdentifier,Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES);
+		
+		ArrayList completedMsgList = null;
+		if (completedMessagesBean!=null) {
+			completedMsgList = SandeshaUtil.getArrayListFromString(completedMessagesBean.getValue());
+		}
+		
+		return completedMsgList;
+	}
+	
+	public static void sendSyncAck () {
+		
+	}
+	
+	public static void sendAsyncAck () {
+		
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Tue Jan 17 22:06:44 2006
@@ -29,6 +29,7 @@
 public interface Sandesha2Constants {
 
 	public interface WSRM {
+		
 		String NS_PREFIX_RM = "wsrm";
 
 		String NS_URI_RM = "http://schemas.xmlsoap.org/ws/2005/02/rm";
@@ -151,7 +152,11 @@
 											   // sequenceId to share data b/w
 											   // sequences
 
-		String RECEIVED_MESSAGES = "SeqMsgListProperty";
+		//For incoming sequences this gives the msg no's of the messages that were
+		//received (may be an ack was sent - depending on the policy)
+		//For out going sequences this gives the messages that were sent and that were successfully
+		//acked by the other end point.
+		String COMPLETED_MESSAGES = "CompletedMessages";
 
 		String TO_EPR = "ToEPR";
 
@@ -276,6 +281,8 @@
 		
 		String InOrderInvocation = "InvokeInOrder";
 		
+		String MessageTypesToDrop = "MessageTypesToDrop";
+		
 		public interface DefaultValues {
 			
 			int RetransmissionInterval = 20000;
@@ -291,6 +298,8 @@
 			String StorageManager = "org.apache.sandesha2.storage.inmemory.InMemoryStorageManager";
 		
 			boolean InvokeInOrder = true;
+			
+			String MessageTypesToDrop=VALUE_NONE;
 		}
 	}
 	
@@ -321,5 +330,9 @@
 	int MAXIMUM_RETRANSMISSION_ATTEMPTS = 5;
 	
 	String PROPERTY_FILE = "sandesha2.properties";
+	
+	String VALUE_NONE = "none";
+	
+	String SANDESHA2_INTERNAL_SEQUENCE_ID = "Sandesha2IntSeq";
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java Tue Jan 17 22:06:44 2006
@@ -35,7 +35,30 @@
 
 	// initialize the module
 	public void init(AxisConfiguration axisSystem) throws AxisFault {
-		cleanStorage (axisSystem);
+		ConfigurationContext configurationContext = new ConfigurationContext (axisSystem);
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+		
+	//	cleanStorage (storageManager);
+		
+		ConfigurationContext configCtx = null;
+		//continueUncompletedSequences (storageManager,configCtx);
+	}
+	
+	private void continueUncompletedSequences (StorageManager storageManager,ConfigurationContext configCtx) {
+		//server side continues
+		//SandeshaUtil.startInvokerIfStopped(configCtx);
+		
+		//server side re-injections
+		
+		//reinject everything that has been acked within the in-handler but have not been invoked.
+		
+		
+		//client side continues
+		//SandeshaUtil.startSenderIfStopped(configCtx);
+		
+		//client side re-injections
+		
+		
 	}
 
 	// shutdown the module
@@ -43,12 +66,23 @@
 
 	}
 	
-	private void cleanStorage (AxisConfiguration axisSystem) throws AxisFault {
-		
-		ConfigurationContext configurationContext = new ConfigurationContext (axisSystem);
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+	//Removing data of uncontinuuable sequences so that the sandesha2 system will not be confused
+	private void cleanStorage (StorageManager storageManager) throws AxisFault {
 		
 		storageManager.initStorage();
+		
+		//server side cleaning
+		
+		//cleaning NextMsgData
+		//Cleaning InvokerData
+		
+		
+		//client side cleaning
+		
+		//cleaning RetransmitterData
+		//cleaning CreateSequenceData
+		
+		//cleaning sequence properties
 		
 	}
 	

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Tue Jan 17 22:06:44 2006
@@ -19,18 +19,27 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.client.ListenerManager;
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContextConstants;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.CreateSeqBean;
 import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.PropertyManager;
 import org.apache.sandesha2.util.SandeshaUtil;
 
 /**
@@ -42,6 +51,10 @@
 
 public class TerminateManager {
 
+	private static String CLEANED_ON_TERMINATE_MSG = "CleanedOnTerminateMsg";
+	private static String CLEANED_AFTER_INVOCATION = "CleanedAfterInvocation";
+	
+	public static HashMap receivingSideCleanMap = new HashMap ();
 	/**
 	 * Called by the receiving side to remove data related to a sequence.
 	 * e.g. After sending the TerminateSequence message. Calling this methods will complete all
@@ -51,25 +64,23 @@
 	 * @param sequenceID
 	 * @throws SandeshaException
 	 */
-	public static void terminateReceivingSide (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
-		/*StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
-		NextMsgBeanMgr nextMsgBeanMgr = storageManager.getNextMsgBeanMgr();
-		
-		//removing nextMsgMgr entries
-		NextMsgBean findNextMsgBean = new NextMsgBean ();
-		findNextMsgBean.setSequenceID(sequenceID);
-		Collection collection = nextMsgBeanMgr.find(findNextMsgBean);
-		Iterator iterator = collection.iterator();
-		while (iterator.hasNext()) {
-			NextMsgBean nextMsgBean = (NextMsgBean) iterator.next();
-			nextMsgBeanMgr.delete(nextMsgBean.getSequenceID());
-		}
+	public static void cleanReceivingSideOnTerminateMessage (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+
+		//clean senderMap
 		
 		boolean inOrderInvocation = PropertyManager.getInstance().isInOrderInvocation();
 		if(!inOrderInvocation) { 
-			terminateAfterInvocation(configContext,sequenceID);
-		}*/
+			//there is no invoking by Sandesha2. So clean invocations storages.
+			cleanReceivingSideAfterInvocation(configContext,sequenceID);
+		}
 
+		String cleanStatus = (String) receivingSideCleanMap.get(sequenceID);
+		if (cleanStatus!=null && CLEANED_AFTER_INVOCATION.equals(cleanStatus))
+			completeTerminationOfReceivingSide(configContext,sequenceID);
+		else {
+			receivingSideCleanMap.put(sequenceID,CLEANED_ON_TERMINATE_MSG);
+		}
 	}
 	
 	/**
@@ -80,10 +91,10 @@
 	 * @param sequenceID
 	 * @throws SandeshaException
 	 */
-	public static void terminateAfterInvocation (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+	public static void cleanReceivingSideAfterInvocation (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
 		InvokerBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr();
-
+				
 		//removing storageMap entries
 		InvokerBean findStorageMapBean = new InvokerBean ();
 		findStorageMapBean.setSequenceID(sequenceID);
@@ -95,10 +106,36 @@
 			storageMapBeanMgr.delete(storageMapBean.getMessageContextRefKey());
 		}
 		
-		removeReceivingSideProperties(configContext,sequenceID);
-
+		String cleanStatus = (String) receivingSideCleanMap.get(sequenceID);
+		if (cleanStatus!=null && CLEANED_ON_TERMINATE_MSG.equals(cleanStatus))
+			completeTerminationOfReceivingSide(configContext,sequenceID);
+		else {
+			receivingSideCleanMap.put(sequenceID,CLEANED_AFTER_INVOCATION);
+		}
 	}
 	
+	/**
+	 * This has to be called by the lastly invocated one of the above two methods.
+	 *
+	 */
+	private static void completeTerminationOfReceivingSide (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+		InvokerBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr();
+		NextMsgBeanMgr nextMsgBeanMgr = storageManager.getNextMsgBeanMgr();
+		
+		//removing nextMsgMgr entries
+		NextMsgBean findNextMsgBean = new NextMsgBean ();
+		findNextMsgBean.setSequenceID(sequenceID);
+		Collection collection = nextMsgBeanMgr.find(findNextMsgBean);
+		Iterator iterator = collection.iterator();
+		while (iterator.hasNext()) {
+			NextMsgBean nextMsgBean = (NextMsgBean) iterator.next();
+			nextMsgBeanMgr.delete(nextMsgBean.getSequenceID());
+		}
+		
+		removeReceivingSideProperties(configContext,sequenceID);
+	}
+
 	private static void removeReceivingSideProperties (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
 		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
@@ -132,6 +169,10 @@
 	 */
 	public static void terminateSendingSide (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+		
+		//TODO - remove folowing redundant transaction
+		Transaction terminateSendingTransaction = storageManager.getTransaction();
+		
 		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
 		SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
 		CreateSeqBeanMgr createSeqBeanMgr = storageManager.getCreateSeqBeanMgr();
@@ -169,30 +210,50 @@
 		iterator = collection.iterator();
 		while (iterator.hasNext()) {
 			SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
+
+			doUpdatesIfNeeded (sequenceID,sequencePropertyBean,sequencePropertyBeanMgr);
 			
 			if (isProportyDeletable(sequencePropertyBean.getName())) {
 				sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
 			}
-			
 		}
 		
-//		SequencePropertyBean findSequencePropertyBean2 = new SequencePropertyBean ();
-//		findSequencePropertyBean2.setSequenceID(internalSequenceId);
-//		collection = sequencePropertyBeanMgr.find(findSequencePropertyBean2);
-//		iterator = collection.iterator();
-//		while (iterator.hasNext()) {
-//			SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
-//			sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
-//		}
+		terminateSendingTransaction.commit();
+		
+		//asking the listner to stop.
+		//if (clientSide)
+//			try {
+//				ListenerManager.stop(configContext,Constants.TRANSPORT_HTTP);
+//			} catch (AxisFault e) {
+//				throw new SandeshaException (e.getMessage());
+//			}
+		
+		SandeshaUtil.stopSenderForTheSequence(internalSequenceId);
+		
+	}
+	
+	private static void doUpdatesIfNeeded (String sequenceID, SequencePropertyBean propertyBean, SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
+		if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES)) {
+			
+			//this value cannot be completely deleted since this data will be needed by SequenceReports
+			//so saving it with the sequenceID value being the out sequenceID.
+			
+			SequencePropertyBean newBean = new SequencePropertyBean ();
+			newBean.setSequenceID(sequenceID);
+			newBean.setName(propertyBean.getName());
+			newBean.setValue(propertyBean.getValue());
+			
+			seqPropMgr.insert(newBean);
+			
+			//TODO amazingly this property does not seem to get deleted without following - in the hibernate impl 
+			//(even though the lines efter current methodcall do this).
+			seqPropMgr.delete (propertyBean.getSequenceID(),propertyBean.getName());			
+		}
 	}
 	
 	private static boolean isProportyDeletable (String name) {
 		boolean deleatable = true;
-		
-//		if (name.equals(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID)) {
-//			int i=1;
-//		}
-		
+				
 		if (Sandesha2Constants.SequenceProperties.TERMINATE_ADDED.equals(name))
 			deleatable = false;
 		

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java Tue Jan 17 22:06:44 2006
@@ -16,8 +16,17 @@
 
 package org.apache.sandesha2.client;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.sandesha2.AcknowledgementManager;
+import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
 
@@ -41,20 +50,56 @@
 		String internalSequenceID = SandeshaUtil.getInternalSequenceID (to,sequenceKey);
 		SequenceReport sequenceReport = new SequenceReport ();
 		
-//		report.setAckedMessageCount(SequenceManager.getOutGoingSequenceAckedMessageCount (internalSequenceID,configurationContext));
-//		report.setSequenceCompleted(SequenceManager.isOutGoingSequenceCompleted (internalSequenceID,configurationContext));
-//		report.setOutGoingSequence(true);
-					
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+		SequencePropertyBeanMgr seqpPropMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		Transaction reportTransaction = storageManager.getTransaction();
+		SequencePropertyBean findBean =  new SequencePropertyBean ();
+		findBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+		findBean.setValue(internalSequenceID);
+		SequencePropertyBean internalSequenceBean = seqpPropMgr.findUnique(findBean);
+		String sequenceID = internalSequenceBean.getSequenceID();
+		
+		//finding the actual seq
+		ArrayList completedMessageList =  AcknowledgementManager.getCompletedMessagesList (sequenceID,configurationContext);
+		
+		Iterator iter = completedMessageList.iterator();
+		while (iter.hasNext()) {
+			Long lng = new Long (Long.parseLong((String) iter.next()));
+			sequenceReport.addCompletedMessage(lng);
+		}
+		
+		sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_OUT);
+		boolean completed  = SequenceManager.isOutGoingSequenceCompleted(internalSequenceID,configurationContext);
+		if (completed)
+			sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_COMPLETED);
+		
+		//TODO complete
+		
+		
+		reportTransaction.commit();
+		
 		return sequenceReport;
 	}
 	
 	public static SequenceReport getIncomingSequenceReport (String sequenceID,ConfigurationContext configurationContext) throws SandeshaException {
 		
 		SequenceReport sequenceReport = new SequenceReport ();
+
+		ArrayList completedMessageList =  AcknowledgementManager.getCompletedMessagesList (sequenceID,configurationContext);
+
+		Iterator iter = completedMessageList.iterator();
+		while (iter.hasNext()) {
+			Long lng = new Long (Long.parseLong((String) iter.next()));
+			sequenceReport.addCompletedMessage(lng);
+		}
+		
+		sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_IN);
+		boolean completed  = SequenceManager.isIncomingSequenceCompleted(sequenceID,configurationContext);
+		if (completed)
+			sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_COMPLETED);
 		
-//		report.setOutGoingSequence(false);
-//		report.setAckedMessageCount(SequenceManager.getIncomingSequenceAckedMessageCount(sequenceID,configurationContext));
-//		report.setSequenceCompleted(SequenceManager.isIncomingSequenceCompleted(sequenceID,configurationContext));
+		//TODO complete
 		
 		return sequenceReport;
 	}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/SequenceReport.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/SequenceReport.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/SequenceReport.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/SequenceReport.java Tue Jan 17 22:06:44 2006
@@ -25,8 +25,8 @@
 public class SequenceReport {
 
 	public static final byte SEQUENCE_STATUS_ACTIVE = 1;
-	public static final byte SEQUENCE_STATUS_TERMINATED = 2;
-	public static final byte SEQUENCE_STATUS_TIMEDOUT = 3;
+	public static final byte SEQUENCE_STATUS_COMPLETED = 2;
+	public static final byte SEQUENCE_STATUS_TIMED_OUT = 3;
 	
 	public static final byte SEQUENCE_DIRECTION_IN=1;
 	public static final byte SEQUENCE_DIRECTION_OUT=2;
@@ -34,15 +34,15 @@
 	private byte sequenceStatus = SEQUENCE_STATUS_ACTIVE;
 	private byte sequenceDirection = SEQUENCE_DIRECTION_OUT;
 	private String sequenceID = null;
-	private ArrayList ackedMessages = null;
+	private ArrayList completedMessages = null;
 	private boolean sequenceEstablished = false;
 	
 	public SequenceReport () {
-		ackedMessages = new ArrayList ();
+		completedMessages = new ArrayList ();
 	}
 	
 	public void setSequenceStatus (byte sequenceStatus) {
-		if (sequenceStatus>=SEQUENCE_STATUS_ACTIVE && sequenceStatus<=SEQUENCE_STATUS_TIMEDOUT) {
+		if (sequenceStatus>=SEQUENCE_STATUS_ACTIVE && sequenceStatus<=SEQUENCE_STATUS_TIMED_OUT) {
 			this.sequenceStatus = sequenceStatus;
 		}
 	}
@@ -69,8 +69,8 @@
 		this.sequenceID = sequenceID;
 	}
 	
-	public ArrayList getAckedMessages () {
-		return ackedMessages;
+	public ArrayList getCompletedMessages () {
+		return completedMessages;
 	}
 
 	public boolean isSequenceEstablished() {
@@ -82,8 +82,12 @@
 	}
 	
 	
-	public void setAckedMessage (String ackedMessage) {
-		ackedMessages.add(ackedMessage);
+	public void addCompletedMessage (Long messageNo) {
+		completedMessages.add(messageNo);
+	}
+	
+	public void setCompletedMessages (ArrayList completedMessages) {
+		this.completedMessages = completedMessages;
 	}
 	
 	

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Tue Jan 17 22:06:44 2006
@@ -122,7 +122,7 @@
 						.getSequencePropretyBeanMgr();
 				SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(
 						sequenceId,
-						Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES);
+						Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES);
 				if (receivedMsgsBean != null) {
 					String receivedMsgStr = (String) receivedMsgsBean
 							.getValue();
@@ -155,7 +155,7 @@
 							if (receivedMsgsBean == null) {
 								receivedMsgsBean = new SequencePropertyBean(
 										sequenceId,
-										Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES,
+										Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES,
 										"");
 								seqPropMgr.insert(receivedMsgsBean);
 							}
@@ -206,7 +206,7 @@
 			SequencePropertyBeanMgr seqPropMgr = storageManager
 					.getSequencePropretyBeanMgr();
 			SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(
-					sequenceId, Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES);
+					sequenceId, Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES);
 			String receivedMsgStr = (String) receivedMsgsBean.getValue();
 
 			ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor();

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Tue Jan 17 22:06:44 2006
@@ -112,9 +112,6 @@
 			return;
 		}
 
-		//Strating the sender.
-		SandeshaUtil.startSenderIfStopped(context);
-
 		//Adding the policy bean
 		RMPolicyBean policyBean = RMPolicyManager.getPolicyBean(rmMsgCtx);
 		rmMsgCtx.setProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN, policyBean);
@@ -161,7 +158,7 @@
 				throw new SandeshaException(message);
 			}
 
-			internalSequenceId = incomingSeqId;
+			internalSequenceId = SandeshaUtil.getServerSideInternalSeqIdFromIncomingSeqId ( incomingSeqId);
 
 		} else {
 			//set the internal sequence id for the client side.
@@ -181,6 +178,9 @@
 
 		}
 
+		//Strating the sender.
+		//SandeshaUtil.startSenderForTheSequence(context,internalSequenceId);
+
 		
 		//check if the first message
 
@@ -192,17 +192,28 @@
 				internalSequenceId,
 				Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
 
+		//setting async ack endpoint for the server side. (if present)
+		if (serverSide) {
+			String incomingSequenceID = SandeshaUtil.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
+			SequencePropertyBean incomingToBean = seqPropMgr.retrieve(incomingSequenceID,Sandesha2Constants.SequenceProperties.TO_EPR);
+			if (incomingToBean!=null) {
+				String incomingTo = incomingToBean.getValue();
+				msgCtx.setProperty(Sandesha2ClientAPI.AcksTo,incomingTo);
+			}
+		}
+		
 		if (messageNumber == 1) {
 			if (outSeqBean == null) {
 				sendCreateSequence = true;
 			}
-		}
-
-		//if fist message - setup the sequence for the client side
-		if (!serverSide && sendCreateSequence) {
+			
+			//if fist message - setup the sending side sequence - both for the server and the client sides
+			//if (!serverSide && sendCreateSequence) {
 			SequenceManager.setupNewClientSequence(msgCtx, internalSequenceId);
 		}
 
+
+
 		//if first message - add create sequence
 		if (sendCreateSequence) {
 
@@ -249,7 +260,15 @@
 							.getProperty(MessageContext.TRANSPORT_IN);
 					if (transportIn == null)
 						transportIn = org.apache.axis2.Constants.TRANSPORT_HTTP;
-					ListenerManager.makeSureStarted(transportIn, context);
+					
+					//For receiving async Ack messages.
+//					try {
+						ListenerManager.makeSureStarted(transportIn, context);
+//					} catch (AxisFault e) {
+//						log.debug("Could not start listener...");
+//						log.debug(e.getStackTrace());
+//					}
+					
 				} else if (acksTo == null && serverSide) {
 					String incomingSequencId = SandeshaUtil
 							.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
@@ -370,7 +389,7 @@
 			String offeredSequenceId = offer.getIdentifer().getIdentifier();
 			SequencePropertyBean msgsBean = new SequencePropertyBean();
 			msgsBean.setSequenceID(offeredSequenceId);
-			msgsBean.setName(Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES);
+			msgsBean.setName(Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES);
 			msgsBean.setValue("");
 
 			SequencePropertyBean offeredSequenceBean = new SequencePropertyBean();

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Tue Jan 17 22:06:44 2006
@@ -17,6 +17,7 @@
 
 package org.apache.sandesha2.msgprocessors;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 
@@ -124,6 +125,7 @@
 		Collection retransmitterEntriesOfSequence = retransmitterMgr
 				.find(input);
 
+		ArrayList ackedMessagesList = new ArrayList ();
 		while (ackRangeIterator.hasNext()) {
 			AcknowledgementRange ackRange = (AcknowledgementRange) ackRangeIterator
 					.next();
@@ -135,6 +137,8 @@
 						retransmitterEntriesOfSequence, messageNo);
 				if (retransmitterBean != null)
 					retransmitterMgr.delete(retransmitterBean.getMessageID());
+				
+				ackedMessagesList.add(new Long (messageNo));
 			}
 		}
 
@@ -149,24 +153,38 @@
 		//setting acked message date.
 		//TODO add details specific to each message.
 		long noOfMsgsAcked = getNoOfMessagesAcked(sequenceAck.getAcknowledgementRanges().iterator());
-		SequencePropertyBean ackedMessagesBean = seqPropMgr.retrieve(outSequenceId,Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED);
+		SequencePropertyBean noOfMsgsAckedBean = seqPropMgr.retrieve(outSequenceId,Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED);
 		boolean added = false;
 		
-		if (ackedMessagesBean==null) {
+		if (noOfMsgsAckedBean==null) {
 			added = true;
-			ackedMessagesBean = new SequencePropertyBean ();
-			ackedMessagesBean.setSequenceID(outSequenceId);
-			ackedMessagesBean.setName(Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED);
+			noOfMsgsAckedBean = new SequencePropertyBean ();
+			noOfMsgsAckedBean.setSequenceID(outSequenceId);
+			noOfMsgsAckedBean.setName(Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED);
 		}
 		
-		ackedMessagesBean.setValue(Long.toString(noOfMsgsAcked));
+		noOfMsgsAckedBean.setValue(Long.toString(noOfMsgsAcked));
 		
 		if (added) 
-			seqPropMgr.insert(ackedMessagesBean);
+			seqPropMgr.insert(noOfMsgsAckedBean);
 		else
-			seqPropMgr.update(ackedMessagesBean);
+			seqPropMgr.update(noOfMsgsAckedBean);
 		
 		
+		//setting the completed_messages list. This gives all the messages of the sequence that were acked.
+		SequencePropertyBean allCompletedMsgsBean = seqPropMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES);
+		if (allCompletedMsgsBean==null) {
+			allCompletedMsgsBean = new SequencePropertyBean ();
+			allCompletedMsgsBean.setSequenceID(internalSequenceId);
+			allCompletedMsgsBean.setName(Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES);
+			
+			seqPropMgr.insert(allCompletedMsgsBean);
+		}
+				
+		String str = ackedMessagesList.toString();
+		allCompletedMsgsBean.setValue(str);
+		seqPropMgr.update(allCompletedMsgsBean);
+		
 		//If all messages up to last message have been acknowledged. Add terminate Sequence message.
 		SequencePropertyBean lastOutMsgBean = seqPropMgr.retrieve(
 				internalSequenceId, Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE);
@@ -185,13 +203,19 @@
 				throw new SandeshaException(message);
 			}
 
+
+			//committing transaction
+			ackTransaction.commit();
+
 			boolean complete = SandeshaUtil.verifySequenceCompletion(
 					sequenceAck.getAcknowledgementRanges().iterator(),
 					lastOutMessageNo);
-
+			
 			if (complete) {
+				Transaction terminateTransaction = storageManager.getTransaction();
 				addTerminateSequenceMessage(rmMsgCtx, outSequenceId,
 						internalSequenceId);
+				terminateTransaction.commit();
 			}
 		}
 		
@@ -199,8 +223,7 @@
 		//stopping the progress of the message further.
 		rmMsgCtx.pause();	
 		
-		//commiting transaction
-		ackTransaction.commit();
+
 	}
 
 	private SenderBean getRetransmitterEntry(Collection collection,

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Tue Jan 17 22:06:44 2006
@@ -132,7 +132,7 @@
 		SequenceManager.updateLastActivatedTime(sequenceId,configCtx);
 		
 		SequencePropertyBean msgsBean = seqPropMgr.retrieve(sequenceId,
-				Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES);
+				Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES);
 
 		long msgNo = sequence.getMessageNumber().getMessageNumber();
 		if (msgNo == 0) {
@@ -175,7 +175,7 @@
 
 		InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
 
-		long nextMsgno = bean.getNextMsgNoToProcess();
+	//	long nextMsgno = bean.getNextMsgNoToProcess();
 
 		boolean inOrderInvocation = PropertyManager.getInstance()
 				.isInOrderInvocation();
@@ -235,7 +235,7 @@
 
 			//Starting the invoker if stopped.
 			SandeshaUtil
-					.startInvokerIfStopped(msgCtx.getConfigurationContext());
+					.startInvokerForTheSequence(msgCtx.getConfigurationContext(),sequenceId);
 
 		}
 
@@ -431,7 +431,7 @@
 
 			asyncAckTransaction.commit();
 
-			SandeshaUtil.startSenderIfStopped(configCtx);
+			SandeshaUtil.startSenderForTheSequence(configCtx,sequenceId);
 		}
 
 	}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Tue Jan 17 22:06:44 2006
@@ -17,6 +17,8 @@
 
 package org.apache.sandesha2.msgprocessors;
 
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.client.ListenerManager;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.commons.logging.Log;
@@ -88,7 +90,21 @@
 		
 		Transaction terminateTransaction = storageManager.getTransaction();
 		
-		TerminateManager.terminateReceivingSide(context,sequenceId);
+		TerminateManager.cleanReceivingSideOnTerminateMessage(context,sequenceId);
+		SandeshaUtil.stopSenderForTheSequence(sequenceId);
+		
+		//removing an entry from the listener
+		String transport = terminateSeqMsg.getTransportIn().getName().getLocalPart();
+//		try {
+			//This will throw an exception in the server side. //TODO find a better method.
+			//TODO : following causes the SAS to stop without returning 202. find a better method or correct this
+			//ListenerManager.stop(context, transport);
+//		} catch (AxisFault e) {
+//			// TODO Auto-generated catch block
+//			e.printStackTrace();
+//			String message = "Cant stop listener...";
+//			log.debug(message);
+//		}
 		
 		terminateTransaction.commit(); 
 		

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/CreateSeqBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/CreateSeqBeanMgr.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/CreateSeqBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/CreateSeqBeanMgr.java Tue Jan 17 22:06:44 2006
@@ -19,8 +19,10 @@
 
 import java.util.Collection;
 
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
 
 /**
  * This is used to manage CreateSequence beans.
@@ -41,6 +43,8 @@
 	public boolean update(CreateSeqBean bean) throws SandeshaStorageException;
 
 	public Collection find(CreateSeqBean bean) throws SandeshaStorageException;
+	
+	public CreateSeqBean findUnique (CreateSeqBean bean) throws SandeshaException;
 
 	//public ResultSet find(String query);
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/InvokerBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/InvokerBeanMgr.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/InvokerBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/InvokerBeanMgr.java Tue Jan 17 22:06:44 2006
@@ -19,6 +19,7 @@
 
 import java.util.Collection;
 
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.beans.InvokerBean;
 
@@ -41,6 +42,8 @@
 
 	public Collection find(InvokerBean bean) throws SandeshaStorageException;
 
+	public InvokerBean findUnique (InvokerBean bean) throws SandeshaException;
+	
 	public boolean update(InvokerBean bean) throws SandeshaStorageException;
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/NextMsgBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/NextMsgBeanMgr.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/NextMsgBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/NextMsgBeanMgr.java Tue Jan 17 22:06:44 2006
@@ -19,7 +19,9 @@
 
 import java.util.Collection;
 
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.beans.InvokerBean;
 import org.apache.sandesha2.storage.beans.NextMsgBean;
 
 /**
@@ -42,6 +44,8 @@
 	public Collection find(NextMsgBean bean) throws SandeshaStorageException;
 
 	public boolean update(NextMsgBean bean) throws SandeshaStorageException;
+	
+	public NextMsgBean findUnique (NextMsgBean bean) throws SandeshaException;
 
 	public Collection retrieveAll();
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java Tue Jan 17 22:06:44 2006
@@ -19,7 +19,10 @@
 
 import java.util.Collection;
 
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 
 /**
@@ -42,6 +45,8 @@
 	public Collection find(SenderBean bean) throws SandeshaStorageException;
 
 	public Collection find(String internalSequenceID) throws SandeshaStorageException;
+	
+	public SenderBean findUnique (SenderBean bean) throws SandeshaException;
 	
 	public Collection findMsgsToSend() throws SandeshaStorageException;
 

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java Tue Jan 17 22:06:44 2006
@@ -19,7 +19,9 @@
 
 import java.util.Collection;
 
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 
 /**
@@ -40,6 +42,8 @@
 	//public ResultSet find(String query);
 
 	public Collection find(SequencePropertyBean bean) throws SandeshaStorageException;
+	
+	public SequencePropertyBean findUnique (SequencePropertyBean bean) throws SandeshaException;
 
 	public boolean update(SequencePropertyBean bean) throws SandeshaStorageException;
 

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java Tue Jan 17 22:06:44 2006
@@ -23,7 +23,10 @@
 import java.util.Iterator;
 
 import org.apache.axis2.context.AbstractContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beans.CreateSeqBean;
 
@@ -34,7 +37,9 @@
 
 public class InMemoryCreateSeqBeanMgr implements CreateSeqBeanMgr {
 
+	Log log = LogFactory.getLog(getClass());
 	private Hashtable table = null;
+	
 
 	public InMemoryCreateSeqBeanMgr(AbstractContext context) {
 		Object obj = context.getProperty(Sandesha2Constants.BeanMAPs.CREATE_SEQUECE);
@@ -102,6 +107,21 @@
 
 	public ResultSet find(String query) {
 		throw new UnsupportedOperationException("selectRS() is not supported");
+	}
+	
+	public CreateSeqBean findUnique (CreateSeqBean bean) throws SandeshaException {
+		Collection coll = find(bean);
+		if (coll.size()>1) {
+			String message = "Non-Unique result";
+			log.error(message);
+			throw new SandeshaException (message);
+		}
+		
+		Iterator iter = coll.iterator();
+		if (iter.hasNext())
+			return (CreateSeqBean) iter.next();
+		else 
+			return null;
 	}
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java Tue Jan 17 22:06:44 2006
@@ -22,8 +22,12 @@
 import java.util.Hashtable;
 import java.util.Iterator;
 import org.apache.axis2.context.AbstractContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
 import org.apache.sandesha2.storage.beans.InvokerBean;
 
 /**
@@ -32,6 +36,8 @@
  */
 
 public class InMemoryInvokerBeanMgr implements InvokerBeanMgr {
+	
+	Log log = LogFactory.getLog(getClass());
 	private Hashtable table = null;
 
 	public InMemoryInvokerBeanMgr(AbstractContext context) {
@@ -94,6 +100,21 @@
 			return false;
 
 		return table.put(bean.getMessageContextRefKey(), bean) != null;
+	}
+	
+	public InvokerBean findUnique (InvokerBean bean) throws SandeshaException {
+		Collection coll = find(bean);
+		if (coll.size()>1) {
+			String message = "Non-Unique result";
+			log.error(message);
+			throw new SandeshaException (message);
+		}
+		
+		Iterator iter = coll.iterator();
+		if (iter.hasNext())
+			return (InvokerBean) iter.next();
+		else 
+			return null;
 	}
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java Tue Jan 17 22:06:44 2006
@@ -23,8 +23,12 @@
 import java.util.Iterator;
 
 import org.apache.axis2.context.AbstractContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
 import org.apache.sandesha2.storage.beans.NextMsgBean;
 
 /**
@@ -34,6 +38,7 @@
 
 public class InMemoryNextMsgBeanMgr implements NextMsgBeanMgr {
 
+	Log log = LogFactory.getLog(getClass());
 	private Hashtable table = null;
 
 	public InMemoryNextMsgBeanMgr(AbstractContext context) {
@@ -102,5 +107,20 @@
 
 	public Collection retrieveAll() {
 		return table.values();
+	}
+	
+	public NextMsgBean findUnique(NextMsgBean bean) throws SandeshaException {
+		Collection coll = find(bean);
+		if (coll.size()>1) {
+			String message = "Non-Unique result";
+			log.error(message);
+			throw new SandeshaException (message);
+		}
+		
+		Iterator iter = coll.iterator();
+		if (iter.hasNext())
+			return (NextMsgBean) iter.next();
+		else 
+			return null;
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Tue Jan 17 22:06:44 2006
@@ -22,9 +22,13 @@
 import java.util.Iterator;
 
 import org.apache.axis2.context.AbstractContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 
 /**
@@ -33,6 +37,8 @@
  */
 
 public class InMemorySenderBeanMgr implements SenderBeanMgr {
+	
+	Log log = LogFactory.getLog(getClass());
 	private Hashtable table = null;
 
 	public InMemorySenderBeanMgr(AbstractContext context) {
@@ -166,6 +172,21 @@
 			return false;
 
 		return true; //No need to update. Being a reference does the job.
+	}
+	
+	public SenderBean findUnique(SenderBean bean) throws SandeshaException {
+		Collection coll = find(bean);
+		if (coll.size()>1) {
+			String message = "Non-Unique result";
+			log.error(message);
+			throw new SandeshaException (message);
+		}
+		
+		Iterator iter = coll.iterator();
+		if (iter.hasNext())
+			return (SenderBean) iter.next();
+		else 
+			return null;
 	}
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java Tue Jan 17 22:06:44 2006
@@ -23,8 +23,12 @@
 import java.util.Iterator;
 
 import org.apache.axis2.context.AbstractContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 
 /**
@@ -33,6 +37,8 @@
  */
 
 public class InMemorySequencePropertyBeanMgr implements SequencePropertyBeanMgr {
+	
+	Log log = LogFactory.getLog(getClass());
 	private Hashtable table = null;
 
 	public InMemorySequencePropertyBeanMgr(AbstractContext context) {
@@ -48,11 +54,7 @@
 	public boolean delete(String sequenceId, String name) {
 		
 		SequencePropertyBean bean = retrieve( sequenceId,name);
-		
-		if (bean.getName().equals(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID)) {
-			int i=1;
-		}
-		
+				
 		return table.remove(sequenceId + ":" + name) != null;
 	}
 
@@ -61,9 +63,6 @@
 	}
 
 	public boolean insert(SequencePropertyBean bean) {
-		
-
-		
 		table.put(bean.getSequenceID() + ":" + bean.getName(), bean);
 		return true;
 	}
@@ -106,6 +105,11 @@
 	}
 
 	public boolean update(SequencePropertyBean bean) {
+		
+		if (bean.getName().equals(Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES)) {
+			int i = 1;
+		}
+	
 		if (!table.contains(bean))
 			return false;
 
@@ -115,6 +119,21 @@
 
 	private String getId(SequencePropertyBean bean) {
 		return bean.getSequenceID() + ":" + bean.getName();
+	}
+	
+	public SequencePropertyBean findUnique(SequencePropertyBean bean) throws SandeshaException {
+		Collection coll = find(bean);
+		if (coll.size()>1) {
+			String message = "Non-Unique result";
+			log.error(message);
+			throw new SandeshaException (message);
+		}
+		
+		Iterator iter = coll.iterator();
+		if (iter.hasNext())
+			return (SequencePropertyBean) iter.next();
+		else 
+			return null;
 	}
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java Tue Jan 17 22:06:44 2006
@@ -19,11 +19,14 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.policy.RMPolicyBean;
 
 /**
@@ -82,7 +85,7 @@
 		loadInactivityTimeout(properties);
 		loadStoragemanagerClass(properties);
 		loadInOrderInvocation (properties);
-		
+		loadMessageTypesToDrop (properties);
 	}
 	
 	/**
@@ -239,6 +242,35 @@
 		
 	}
 	
+	private void loadMessageTypesToDrop (Properties properties) {
+		String messageTypesToDrop = properties.getProperty(Sandesha2Constants.Properties.MessageTypesToDrop);
+		boolean loaded=false;
+		
+		try {
+			if (messageTypesToDrop!=null && !Sandesha2Constants.VALUE_NONE.equals(messageTypesToDrop)) {
+				messageTypesToDrop = messageTypesToDrop.trim();
+				messageTypesToDrop = "[" + messageTypesToDrop + "]";
+				ArrayList messageTypesLst =  SandeshaUtil.getArrayListFromString(messageTypesToDrop);
+				
+				Iterator itr = messageTypesLst.iterator();
+				while (itr.hasNext()) {
+					String typeStr = (String) itr.next();
+					Integer typeNo = new Integer (typeStr);
+					propertyBean.addMsgTypeToDrop(typeNo);
+				}
+			}
+			
+		} catch (SandeshaException e) {
+			log.error(e.getMessage());
+		} catch (NumberFormatException e) {
+			String message = "Property '" + Sandesha2Constants.Properties.MessageTypesToDrop + "' contains an invalid value.";
+			log.error(message);
+			log.error(e.getMessage());
+		}
+		
+		
+	}
+	
 	
 	
 	public boolean isExponentialBackoff () {
@@ -268,6 +300,10 @@
 	
 	public boolean isInOrderInvocation () {
 		return propertyBean.isInOrder();
+	}
+	
+	public ArrayList getMessagesNotToSend () {
+		return propertyBean.getMsgTypesToDrop();
 	}
 	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java Tue Jan 17 22:06:44 2006
@@ -373,7 +373,7 @@
 				.getSequencePropretyBeanMgr();
 
 		SequencePropertyBean seqBean = seqPropMgr.retrieve(sequenceId,
-				Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES);
+				Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES);
 		String msgNoList = (String) seqBean.getValue();
 
 		ArrayList ackRangeArrayList = SandeshaUtil.getAckRangeArrayList(msgNoList,factory);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaPropertyBean.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaPropertyBean.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaPropertyBean.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaPropertyBean.java Tue Jan 17 22:06:44 2006
@@ -17,6 +17,8 @@
 
 package org.apache.sandesha2.util;
 
+import java.util.ArrayList;
+
 import org.apache.sandesha2.policy.RMPolicyBean;
 
 /**
@@ -30,6 +32,7 @@
 	RMPolicyBean policyBean = new RMPolicyBean ();
 	String storageManagerClass = null;
 	boolean inOrder = true;
+	ArrayList msgTypesToDrop = null;
     
     public long getInactiveTimeoutInterval() {
         return policyBean.getInactiveTimeoutInterval();
@@ -99,4 +102,24 @@
 	public void setInOrder(boolean inOrder) {
 		this.inOrder = inOrder;
 	}
+
+	public ArrayList getMsgTypesToDrop() {
+		return msgTypesToDrop;
+	}
+
+	public void setMsgTypesToDrop(ArrayList msgTypesToDrop) {
+		this.msgTypesToDrop = msgTypesToDrop;
+	}
+	
+	public void addMsgTypeToDrop (Integer typeNo) {
+		
+		if (typeNo!=null) {
+			if (msgTypesToDrop==null) 
+				msgTypesToDrop = new ArrayList ();
+			
+			msgTypesToDrop.add(typeNo);
+		}
+	}
+	
+	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Tue Jan 17 22:06:44 2006
@@ -220,17 +220,23 @@
 		return (MessageContext) storedMsgContexts.get(key);
 	}
 
-	public static void startSenderIfStopped(ConfigurationContext context) {
-		if (!sender.isSenderStarted()) {
-			sender.start(context);
-		}
+	public static void startSenderForTheSequence(ConfigurationContext context, String sequenceID) {
+		sender.runSenderForTheSequence (context,sequenceID);
 	}
 
-	public static void startInvokerIfStopped(ConfigurationContext context) {
+	public static void stopSenderForTheSequence(String sequenceID) {
+		sender.stopSenderForTheSequence (sequenceID);
+	}
+	
+	public static void startInvokerForTheSequence(ConfigurationContext context, String sequenceID) {
 		if (!invoker.isInvokerStarted()) {
-			invoker.start(context);
+			invoker.runInvokerForTheSequence(context,sequenceID);
 		}
 	}
+	
+	public static void stopInvokerForTheSequence(String sequenceID) {
+		invoker.stopInvokerForTheSequence (sequenceID);
+	}
 
 	public static boolean verifySequenceCompletion(Iterator ackRangesIterator,
 			long lastMessageNo) {
@@ -363,15 +369,21 @@
 		return results;
 	}
 
-	public static String getServerSideIncomingSeqIdFromInternalSeqId(
-			String internalSequenceId) {
-		String incomingSequenceId = internalSequenceId;
+	public static String getServerSideIncomingSeqIdFromInternalSeqId (
+			String internalSequenceId) throws SandeshaException  {
+		
+		String startStr = Sandesha2Constants.SANDESHA2_INTERNAL_SEQUENCE_ID + ":";
+		if (!internalSequenceId.startsWith(startStr)){
+			throw new SandeshaException ("Invalid internal sequence ID");
+		}
+		
+		String incomingSequenceId = internalSequenceId.substring(startStr.length());
 		return incomingSequenceId;
 	}
 
 	public static String getServerSideInternalSeqIdFromIncomingSeqId(
 			String incomingSequenceId) {
-		String internalSequenceId = incomingSequenceId;
+		String internalSequenceId =  Sandesha2Constants.SANDESHA2_INTERNAL_SEQUENCE_ID + ":" + incomingSequenceId;
 		return internalSequenceId;
 	}
 
@@ -560,6 +572,7 @@
 	}
 	
 	public static ArrayList getArrayListFromString (String str) throws SandeshaException {
+		
 		if (str==null)
 			return new ArrayList ();
 		
@@ -581,11 +594,11 @@
 		
 		String subStr = str.substring(1,length-1);
 		
-		String[] sequenceIDs = subStr.split(",");
+		String[] parts = subStr.split(",");
 		
-		for (int i=0;i<sequenceIDs.length;i++) {
-			if (!"".equals(sequenceIDs[i]))
-				retArr.add(sequenceIDs[i]);
+		for (int i=0;i<parts.length;i++) {
+			if (!"".equals(parts[i]))
+				retArr.add(parts[i].trim());
 		}
 		
 		return retArr;
@@ -601,6 +614,10 @@
 		else 
 			return to + ":" +sequenceKey;
 	}
+	
+//	public static String getServerSideInternalSeqID (String incomingSeqId) {
+//		return (Sandesha2Constants.SANDESHA2_INTERNAL_SEQUENCE_ID + ":" + incomingSeqId);
+//	}
 	
 	public static String getSequenceIDFromInternalSequenceID (String internalSequenceID, ConfigurationContext configurationContext)  throws SandeshaException {
 		

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Tue Jan 17 22:06:44 2006
@@ -6,11 +6,13 @@
  */
 package org.apache.sandesha2.util;
 
+import java.net.BindException;
 import java.util.Collection;
 import java.util.StringTokenizer;
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.ListenerManager;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.MessageContextConstants;
@@ -71,11 +73,11 @@
 		}
 
 		StorageManager storageManager = null;
-
+		ConfigurationContext configurationContext = createSequenceMsg.getMessageContext()
+									.getConfigurationContext();
 		try {
 			storageManager = SandeshaUtil
-					.getSandeshaStorageManager(createSequenceMsg
-							.getMessageContext().getConfigurationContext());
+					.getSandeshaStorageManager(configurationContext);
 		} catch (SandeshaException e) {
 			e.printStackTrace();
 		}
@@ -84,7 +86,7 @@
 				.getSequencePropretyBeanMgr();
 
 		SequencePropertyBean receivedMsgBean = new SequencePropertyBean(
-				sequenceId, Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES, "");
+				sequenceId, Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES, "");
 		
 		//If no replyTo value. Send responses as sync.
 		SequencePropertyBean toBean = null;
@@ -112,6 +114,21 @@
 		nextMsgMgr.insert(new NextMsgBean(sequenceId, 1)); // 1 will be the next
 		// message to invoke. This will apply for only in-order invocations.
 
+		
+		SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceId);
+		
+		//Adding another entry to the ListenerManager to wait until the terminate sequence message.
+		String transport = createSequenceMsg.getMessageContext().getTransportIn().getName().getLocalPart();
+		
+		
+		//Only the client side should call below.
+//		try {
+//			//A bind exception is thrown when this is done on the server side. TODO find a better way to do this.
+//			ListenerManager.makeSureStarted(transport,configurationContext);
+//		} catch (AxisFault ex) {
+//			log.info("Counght exception when starting listner. Possible server side start.");
+//		}
+		
 		updateLastActivatedTime(sequenceId,createSequenceMsg.getMessageContext().getConfigurationContext());
 		
 		return sequenceId;
@@ -124,10 +141,12 @@
 	public static void setupNewClientSequence(
 			MessageContext firstAplicationMsgCtx, String internalSequenceId)
 			throws SandeshaException {
+		
+		ConfigurationContext configurationContext = firstAplicationMsgCtx
+										.getConfigurationContext();
 
 		StorageManager storageManager = SandeshaUtil
-				.getSandeshaStorageManager(firstAplicationMsgCtx
-						.getConfigurationContext());
+				.getSandeshaStorageManager(configurationContext);
 
 		SequencePropertyBeanMgr seqPropMgr = storageManager
 				.getSequencePropretyBeanMgr();
@@ -167,12 +186,31 @@
 			seqPropMgr.insert(transportToBean);
 		}
 
+
+		SandeshaUtil.startSenderForTheSequence(configurationContext,internalSequenceId);
+		
 	}
 	
+	/**
+	 * Takes the internal sequence ID as the parameter, not the sequence ID.
+	 * @param sequenceID the internal sequence ID (despite the parameter's name)
+	 * @param configContext
+	 * @throws SandeshaException
+	 */
 	public static void updateLastActivatedTime (String sequenceID, ConfigurationContext configContext) throws SandeshaException {
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
 		Transaction lastActivatedTransaction = storageManager.getTransaction();
 		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		
+//		SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean (sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,null);
+//		SequencePropertyBean internalSequenceBean = sequencePropertyBeanMgr.findUnique(internalSequenceFindBean);
+//		if (internalSequenceBean==null) {
+//			String message = "InternalSequenceBean is not set";
+//			log.error(message);
+//			throw new SandeshaException (message);
+//		}
+//		
+//		String internalSequenceID = internalSequenceBean.getValue();
 		SequencePropertyBean lastActivatedBean = sequencePropertyBeanMgr.retrieve(sequenceID, Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
 		
 		boolean added = false;
@@ -195,6 +233,7 @@
 		lastActivatedTransaction.commit();
 	}
 	
+	
 	public static long getLastActivatedTime (String sequenceID, ConfigurationContext configContext) throws SandeshaException {
 		
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
@@ -314,7 +353,7 @@
 		Transaction transaction = storageManager.getTransaction();
 		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
 		
-		SequencePropertyBean receivedMsgsBean = seqPropBeanMgr.retrieve(sequenceID, Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES);
+		SequencePropertyBean receivedMsgsBean = seqPropBeanMgr.retrieve(sequenceID, Sandesha2Constants.SequenceProperties.COMPLETED_MESSAGES);
 		
 		//we should be able to assume that all the received messages has been acked.
 		String receivedMsgsStr = receivedMsgsBean.getValue();

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Tue Jan 17 22:06:44 2006
@@ -51,33 +51,47 @@
  */
 
 public class InOrderInvoker extends Thread {
-	boolean invokerStarted = false;
-
-	ConfigurationContext context = null;
+	
+	private boolean runInvoker = false;
+	//private boolean stopInvokerAfterWork = false;
+	private ArrayList workingSequences = new ArrayList();
+	
+	private ConfigurationContext context = null;
 	
 	Log log = LogFactory.getLog(getClass());
 
-	public synchronized void stopInvoker() {
-		invokerStarted = false;
+	public synchronized void stopInvokerForTheSequence(String sequenceID) {
+		workingSequences.remove(sequenceID);
+		if (workingSequences.size()==0) {
+			
+			//runInvoker = false;
+		}
 	}
 
 	public synchronized boolean isInvokerStarted() {
-		return invokerStarted;
+		return runInvoker;
 	}
 
 	public void setConfugurationContext(ConfigurationContext context) {
 		this.context = context;
 	}
 
-	public void start(ConfigurationContext context) {
-		invokerStarted = true;
-		this.context = context;
-		super.start();
+	public synchronized void runInvokerForTheSequence(ConfigurationContext context, String sequenceID) {
+		
+		if (!workingSequences.contains(sequenceID))
+			workingSequences.add(sequenceID);
+		
+
+		if (!isInvokerStarted()) {
+			runInvoker = true;     //so that isInvokerStarted()=true.
+			super.start();
+			this.context = context;
+		}
 	}
 
 	public void run() {
 
-		while (isInvokerStarted()) {
+		while (runInvoker) {
 
 			try {
 				Thread.sleep(1000);
@@ -99,30 +113,41 @@
 						.getSequencePropretyBeanMgr();
 
 				//Getting the incomingSequenceIdList
-				SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) sequencePropMgr
+				SequencePropertyBean allSequencesBean = (SequencePropertyBean) sequencePropMgr
 						.retrieve(
 								Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
 								Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-				if (sequencePropertyBean == null)
+				if (allSequencesBean == null)
 					continue;
 
-				ArrayList seqPropList = SandeshaUtil.getArrayListFromString( sequencePropertyBean
+				ArrayList allSequencesList = SandeshaUtil.getArrayListFromString( allSequencesBean
 						.getValue());
-				Iterator seqPropIt = seqPropList.iterator();
+				Iterator allSequencesItr = allSequencesList.iterator();
 
-				currentIteration: while (seqPropIt.hasNext()) {
+				currentIteration: while (allSequencesItr.hasNext()) {
 
-					String sequenceId = (String) seqPropIt.next();
+					String sequenceId = (String) allSequencesItr.next();
 
 					NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
-					if (nextMsgBean == null)
-						throw new SandeshaException(
-								"Next message not set correctly");
+					if (nextMsgBean == null) {
+
+						String message = "Next message not set correctly. Removing invalid entry.";
+						log.debug(message);
+						allSequencesItr.remove();
+						
+						//cleaning the invalid data of the all sequences.
+						allSequencesBean.setValue(allSequencesList.toString());
+						sequencePropMgr.update(allSequencesBean);	
+						
+						throw new SandeshaException (message);
+					}
 
 					long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
-					if (nextMsgno <= 0)
-						throw new SandeshaException(
-								"Invalid messaage number for the nextMsgNo");
+					if (nextMsgno <= 0) { 
+						String message = "Invalid messaage number as the Next Message Number. Removing invalid entry";
+						
+						throw new SandeshaException(message);
+					}
 
 					Iterator stMapIt = storageMapMgr.find(
 							new InvokerBean(null, nextMsgno, sequenceId))
@@ -173,8 +198,10 @@
 									.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
 							if (sequence.getLastMessage() != null) {
 								
-								TerminateManager.terminateAfterInvocation(
+								TerminateManager.cleanReceivingSideAfterInvocation(
 										context, sequenceId);
+								//this sequence has no more invocations
+								stopInvokerForTheSequence(sequenceId);
 								
 								//exit from current iteration. (since an entry was removed)
 								break currentIteration;

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=370065&r1=370064&r2=370065&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Tue Jan 17 22:06:44 2006
@@ -16,10 +16,17 @@
  */
 package org.apache.sandesha2.workers;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamWriter;
+
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.client.ListenerManager;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
@@ -41,6 +48,7 @@
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
 import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.PropertyManager;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.wsrm.Sequence;
@@ -56,18 +64,23 @@
 
 public class Sender extends Thread {
 
-	private boolean senderStarted = false;
-
+	private boolean runSender = false;
+	private boolean stopSenderAfterWork = false;
+	private ArrayList workingSequences = new ArrayList();
+	
 	private ConfigurationContext context = null;
 	
 	Log log = LogFactory.getLog(getClass());
 
-	public synchronized void stopSender() {
-		senderStarted = false;
+	public synchronized void stopSenderForTheSequence(String sequenceID) {
+		workingSequences.remove(sequenceID);
+		if (workingSequences.size()==0) {
+			//stopSenderAfterWork = true;
+		}
 	}
 
 	public synchronized boolean isSenderStarted() {
-		return senderStarted;
+		return runSender;
 	}
 
 	public void run() {
@@ -82,8 +95,10 @@
 			e2.printStackTrace();
 			return;
 		}
-
-		while (senderStarted) {
+		
+		while (runSender) {
+			
+			
 			try {
 				if (context == null) {
 					String message = "Can't continue the Sender. Context is null";
@@ -98,7 +113,12 @@
 
 				SenderBeanMgr mgr = storageManager.getRetransmitterBeanMgr();
 				Collection coll = mgr.findMsgsToSend();
-
+				if (coll.size()==0 && stopSenderAfterWork) {
+					runSender = false;
+					pickMessagesToSendTransaction.commit();
+					continue;
+				}
+				
 				pickMessagesToSendTransaction.commit();
 				
 				Iterator iter = coll.iterator();
@@ -116,9 +136,16 @@
 							log.debug ("ERROR: Sender has an Unavailable Message entry");
 							break;
 						}
+												
 						RMMsgContext rmMsgCtx = MsgInitializer
 								.initializeMessage(msgCtx);
 
+						//skip sending if this message has been mentioned as a message not to send (within sandesha2.properties)
+						ArrayList msgsNotToSend = PropertyManager.getInstance().getMessagesNotToSend();
+						if (msgsNotToSend!=null && msgsNotToSend.contains(new Integer (rmMsgCtx.getMessageType()))) {
+							continue;
+						}
+						
 						updateMessage(msgCtx);
 
 						
@@ -151,12 +178,15 @@
 									.piggybackAckIfPresent(rmMsgCtx);
 						}
 						
-						preSendTransaction.commit();
 
+						
+						preSendTransaction.commit();
+						
 						try {
 							//every message should be resumed (pause==false) when sending
 							boolean paused = msgCtx.isPaused();
 							
+							
 							AxisEngine engine = new AxisEngine(msgCtx
 									.getConfigurationContext());
 							if (paused) {
@@ -218,6 +248,13 @@
 
 							TerminateManager.terminateSendingSide(
 									configContext, sequenceID);
+							
+							//Removing an entry from the Listener.
+							String transport = msgCtx.getTransportOut().getName().getLocalPart();
+							
+							
+							//TODO complete below. Need a more elegant method which finishes the current message before ending.
+							//ListenerManager.stop(configContext,transport);
 						}
 
 						terminateCleaningTransaction.commit();
@@ -264,22 +301,29 @@
 		return true;
 	}
 
-	public void start(ConfigurationContext context) {
-		senderStarted = true;
-		this.context = context;
-		super.start();
+	public synchronized void runSenderForTheSequence(ConfigurationContext context, String sequenceID) {
+		
+		if (!workingSequences.contains(sequenceID))
+			workingSequences.add(sequenceID);
+		
+
+		if (!isSenderStarted()) {
+			runSender = true;     //so that isSenderStarted()=true.
+			super.start();
+			this.context = context;
+		}
 	}
 
 	private void updateMessage(MessageContext msgCtx1) throws SandeshaException {
-		try {
-			RMMsgContext rmMsgCtx1 = MsgInitializer.initializeMessage(msgCtx1);
-			rmMsgCtx1.addSOAPEnvelope();
-
-		} catch (AxisFault e) {
-			String message = "Exception in updating contexts";
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
+//		try {
+//			RMMsgContext rmMsgCtx1 = MsgInitializer.initializeMessage(msgCtx1);
+//			rmMsgCtx1.addSOAPEnvelope();
+//
+//		} catch (AxisFault e) {
+//			String message = "Exception in updating contexts";
+//			log.debug(message);
+//			throw new SandeshaException(message);
+//		}
 
 	}
 



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org