You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2005/12/27 14:07:19 UTC

svn commit: r359208 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: ./ handlers/ msgprocessors/ storage/ storage/inmemory/ util/ workers/

Author: chamikara
Date: Tue Dec 27 05:06:25 2005
New Revision: 359208

URL: http://svn.apache.org/viewcvs?rev=359208&view=rev
Log:
Bug fixes.
Corrected inactivity timeout logic.
Some changes to improve performance.
Corrections in the transaction logic.

Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java Tue Dec 27 05:06:25 2005
@@ -23,6 +23,7 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
@@ -41,7 +42,8 @@
 public class AcknowledgementManager {
 
 	/**
-	 * Piggybacks any available acks of the same sequence to the given application message.
+	 * Piggybacks any available acks of the same sequence to the given
+	 * application message.
 	 * 
 	 * @param applicationRMMsgContext
 	 * @throws SandeshaException
@@ -52,6 +54,7 @@
 				.getMessageContext().getConfigurationContext();
 		StorageManager storageManager = SandeshaUtil
 				.getSandeshaStorageManager(configurationContext);
+
 		SenderBeanMgr retransmitterBeanMgr = storageManager
 				.getRetransmitterBeanMgr();
 		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager
@@ -68,7 +71,8 @@
 		String sequenceId = sequence.getIdentifier().getIdentifier();
 
 		SequencePropertyBean internalSequenceBean = sequencePropertyBeanMgr
-				.retrieve(sequenceId,
+				.retrieve(
+						sequenceId,
 						Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
 		if (internalSequenceBean == null)
 			throw new SandeshaException("Temp Sequence is not set");
@@ -82,27 +86,32 @@
 		Iterator it = collection.iterator();
 
 		if (it.hasNext()) {
+
 			SenderBean ackBean = (SenderBean) it.next();
 
-			//deleting the ack entry.
-			retransmitterBeanMgr.delete(ackBean.getMessageID());
+			long timeNow = System.currentTimeMillis();
+			if (ackBean.getTimeToSend() > timeNow) { //Piggybacking will happen only if the end of ack interval (timeToSend) is not reached.
 
-			//Adding the ack to the application message
-			MessageContext ackMsgContext = SandeshaUtil
-					.getStoredMessageContext(ackBean.getMessageContextRefKey());
-			RMMsgContext ackRMMsgContext = MsgInitializer
-					.initializeMessage(ackMsgContext);
-			if (ackRMMsgContext.getMessageType() != Sandesha2Constants.MessageTypes.ACK)
-				throw new SandeshaException("Invalid ack message entry");
-
-			SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) ackRMMsgContext
-					.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
-			applicationRMMsgContext.setMessagePart(
-					Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
-					sequenceAcknowledgement);
+				//deleting the ack entry.
+				retransmitterBeanMgr.delete(ackBean.getMessageID());
 
-			applicationRMMsgContext.addSOAPEnvelope();
-		}
+				//Adding the ack to the application message
+				MessageContext ackMsgContext = SandeshaUtil
+						.getStoredMessageContext(ackBean
+								.getMessageContextRefKey());
+				RMMsgContext ackRMMsgContext = MsgInitializer
+						.initializeMessage(ackMsgContext);
+				if (ackRMMsgContext.getMessageType() != Sandesha2Constants.MessageTypes.ACK)
+					throw new SandeshaException("Invalid ack message entry");
+
+				SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) ackRMMsgContext
+						.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+				applicationRMMsgContext.setMessagePart(
+						Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
+						sequenceAcknowledgement);
 
+				applicationRMMsgContext.addSOAPEnvelope();
+			}
+		}
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Tue Dec 27 05:06:25 2005
@@ -178,6 +178,8 @@
 		String OFFERED_SEQUENCE = "OfferedSequence";
 
 		String TERMINATE_ADDED = "TerminateAdded";
+		
+		String LAST_ACTIVATED_TIME = "LastActivatedTime";
 	}
 
 	public interface SOAPVersion {
@@ -298,11 +300,11 @@
 
 	int INVOKER_SLEEP_TIME = 1000;
 
-	int SENDER_SLEEP_TIME = 1000;
+	int SENDER_SLEEP_TIME = 500;
 
 	int CLIENT_SLEEP_TIME = 10000;
 
-	int TERMINATE_DELAY = 1000;
+	int TERMINATE_DELAY = 100;
 
 	String TEMP_SEQUENCE_ID = "uuid:tempID";
 

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java Tue Dec 27 05:06:25 2005
@@ -18,8 +18,11 @@
 package org.apache.sandesha2;
 
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.modules.Module;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
 
 /**
  * The Module class of Sandesha2.
@@ -32,12 +35,21 @@
 
 	// initialize the module
 	public void init(AxisConfiguration axisSystem) throws AxisFault {
-
+		cleanStorage (axisSystem);
 	}
 
 	// shutdown the module
 	public void shutdown(AxisConfiguration axisSystem) throws AxisFault {
 
+	}
+	
+	private void cleanStorage (AxisConfiguration axisSystem) throws AxisFault {
+		
+		ConfigurationContext configurationContext = new ConfigurationContext (axisSystem);
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+		
+		storageManager.initStorage();
+		
 	}
 	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Tue Dec 27 05:06:25 2005
@@ -85,12 +85,12 @@
 	 */
 	public static void terminateAfterInvocation (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
-		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
 		InvokerBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr();
 
 		//removing storageMap entries
 		InvokerBean findStorageMapBean = new InvokerBean ();
 		findStorageMapBean.setSequenceID(sequenceID);
+		findStorageMapBean.setInvoked(true);
 		Collection collection = storageMapBeanMgr.find(findStorageMapBean);
 		Iterator iterator = collection.iterator();
 		while (iterator.hasNext()) {
@@ -98,6 +98,13 @@
 			storageMapBeanMgr.delete(storageMapBean.getMessageContextRefKey());
 		}
 		
+		removeReceivingSideProperties(configContext,sequenceID);
+
+	}
+	
+	private static void removeReceivingSideProperties (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
 		SequencePropertyBean allSequenceBean = sequencePropertyBeanMgr.retrieve(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
 		ArrayList allSequenceList = SandeshaUtil.getArrayListFromString(allSequenceBean.getValue());
 		allSequenceList.remove(sequenceID);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Tue Dec 27 05:06:25 2005
@@ -218,9 +218,23 @@
 					acksTo = (String) msgCtx
 							.getProperty(Sandesha2ClientAPI.AcksTo);
 				}
+				
+				if (msgCtx.isServerSide()) {
+					//we do not set acksTo value to anonymous when the create sequence is send from the server.
+					
+					MessageContext requestMessage = operationContext.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+					if (requestMessage==null) {
+						throw new SandeshaException ("Request message is not present");
+					}
+					
+					acksTo = requestMessage.getTo().getAddress();
+					
+				} else {
+					if (acksTo == null)
+						acksTo = Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
+				}
+
 
-				if (acksTo == null)
-					acksTo = Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
 
 				//If acksTo is not anonymous. Start the listner TODO: verify
 				if (!Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo)

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Tue Dec 27 05:06:25 2005
@@ -36,6 +36,7 @@
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.wsrm.AcknowledgementRange;
 import org.apache.sandesha2.wsrm.Nack;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
@@ -80,6 +81,9 @@
 		if (outSequenceId == null || "".equals(outSequenceId))
 			throw new SandeshaException("OutSequenceId is null");
 
+		//updating the last activated time of the sequence.
+		SequenceManager.updateLastActivatedTime(outSequenceId,rmMsgCtx.getMessageContext().getConfigurationContext());
+		
 		SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(
 				outSequenceId, Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
 
@@ -154,8 +158,6 @@
 				addTerminateSequenceMessage(rmMsgCtx, outSequenceId,
 						internalSequenceId);
 			}
-
-
 		}
 		
 		//stopping the progress of the message further.

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Tue Dec 27 05:06:25 2005
@@ -53,6 +53,7 @@
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.wsrm.AckRequested;
 import org.apache.sandesha2.wsrm.LastMessage;
 import org.apache.sandesha2.wsrm.Sequence;
@@ -83,23 +84,28 @@
 		if (msgCtx == null)
 			throw new SandeshaException("Message context is null");
 
-		if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
-				&& rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE)
-						.equals("true")) {
+		if (rmMsgCtx
+				.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
+				&& rmMsgCtx.getProperty(
+						Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals(
+						"true")) {
 			return;
 		}
 
-		//RM will not rend sync responses. If sync acks are there this will be made true again later.
-		if(rmMsgCtx.getMessageContext().getOperationContext()!=null) {
-			rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN,Constants.VALUE_FALSE);
+		//RM will not rend sync responses. If sync acks are there this will be
+		// made true again later.
+		if (rmMsgCtx.getMessageContext().getOperationContext() != null) {
+			rmMsgCtx.getMessageContext().getOperationContext().setProperty(
+					Constants.RESPONSE_WRITTEN, Constants.VALUE_FALSE);
 		}
-		
+
 		StorageManager storageManager = SandeshaUtil
 				.getSandeshaStorageManager(rmMsgCtx.getMessageContext()
 						.getConfigurationContext());
-		
-		Transaction applicationMsgTransaction = storageManager.getTransaction();
-		
+
+		Transaction updataMsgStringTransaction = storageManager
+				.getTransaction();
+
 		SequencePropertyBeanMgr seqPropMgr = storageManager
 				.getSequencePropretyBeanMgr();
 
@@ -112,6 +118,9 @@
 		if (configCtx == null)
 			throw new SandeshaException("Configuration Context is null");
 
+		//updating the last activated time of the sequence.
+		SequenceManager.updateLastActivatedTime(sequenceId,configCtx);
+		
 		SequencePropertyBean msgsBean = seqPropMgr.retrieve(sequenceId,
 				Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES);
 
@@ -127,7 +136,8 @@
 			// EXACTLY_ONCE.
 
 			//msgCtx.pause();
-			rmMsgCtx.getMessageContext().setPausedTrue(new QName (Sandesha2Constants.IN_HANDLER_NAME));
+			rmMsgCtx.getMessageContext().setPausedTrue(
+					new QName(Sandesha2Constants.IN_HANDLER_NAME));
 
 		}
 
@@ -139,7 +149,11 @@
 		msgsBean.setValue(messagesStr);
 		seqPropMgr.update(msgsBean);
 
-		sendAckIfNeeded(rmMsgCtx, messagesStr);
+		updataMsgStringTransaction.commit();
+
+
+
+		Transaction invokeTransaction = storageManager.getTransaction();
 
 		//	Pause the messages bean if not the right message to invoke.
 		NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
@@ -152,82 +166,71 @@
 
 		long nextMsgno = bean.getNextMsgNoToProcess();
 
-		if (msgCtx.isServerSide()) {
-			boolean inOrderInvocation = PropertyManager.getInstance().isInOrderInvocation();
-			if (inOrderInvocation) {
-				//pause the message
-				//msgCtx.pause();
-				rmMsgCtx.getMessageContext().setPausedTrue(new QName (Sandesha2Constants.IN_HANDLER_NAME));
-				SequencePropertyBean incomingSequenceListBean = (SequencePropertyBean) seqPropMgr
-						.retrieve(
-								Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
-								Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-
-				if (incomingSequenceListBean == null) {
-					ArrayList incomingSequenceList = new ArrayList();
-					incomingSequenceListBean = new SequencePropertyBean();
-					incomingSequenceListBean
-							.setSequenceID(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
-					incomingSequenceListBean
-							.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-					incomingSequenceListBean.setValue(incomingSequenceList.toString());
+		boolean inOrderInvocation = PropertyManager.getInstance()
+				.isInOrderInvocation();
+		if (inOrderInvocation) {
+			//pause the message
+			//msgCtx.pause();
+			rmMsgCtx.getMessageContext().setPausedTrue(
+					new QName(Sandesha2Constants.IN_HANDLER_NAME));
+			SequencePropertyBean incomingSequenceListBean = (SequencePropertyBean) seqPropMgr
+					.retrieve(
+							Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+							Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+			if (incomingSequenceListBean == null) {
+				ArrayList incomingSequenceList = new ArrayList();
+				incomingSequenceListBean = new SequencePropertyBean();
+				incomingSequenceListBean
+						.setSequenceID(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
+				incomingSequenceListBean
+						.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+				incomingSequenceListBean.setValue(incomingSequenceList
+						.toString());
 
-					seqPropMgr.insert(incomingSequenceListBean);
-				}
+				seqPropMgr.insert(incomingSequenceListBean);
+			}
 
-				ArrayList incomingSequenceList = SandeshaUtil.getArrayListFromString(incomingSequenceListBean
-						.getValue());
+			ArrayList incomingSequenceList = SandeshaUtil
+					.getArrayListFromString(incomingSequenceListBean.getValue());
 
-				//Adding current sequence to the incoming sequence List.
-				if (!incomingSequenceList.contains(sequenceId)) {
-					incomingSequenceList.add(sequenceId);
-					
-					//saving the property.
-					incomingSequenceListBean.setValue(incomingSequenceList.toString());
-					seqPropMgr.insert(incomingSequenceListBean);
-				}
+			//Adding current sequence to the incoming sequence List.
+			if (!incomingSequenceList.contains(sequenceId)) {
+				incomingSequenceList.add(sequenceId);
+
+				//saving the property.
+				incomingSequenceListBean.setValue(incomingSequenceList
+						.toString());
+				seqPropMgr.insert(incomingSequenceListBean);
+			}
 
-				//saving the message.
-				try {
-					String key = SandeshaUtil.storeMessageContext(rmMsgCtx
-							.getMessageContext());
-					storageMapMgr.insert(new InvokerBean(key, msgNo,
-							sequenceId));
-
-					//This will avoid performing application processing more
-					// than
-					// once.
-					rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
-							"true");
+			//saving the message.
+			try {
+				String key = SandeshaUtil.storeMessageContext(rmMsgCtx
+						.getMessageContext());
+				storageMapMgr.insert(new InvokerBean(key, msgNo, sequenceId));
+
+				//This will avoid performing application processing more
+				// than
+				// once.
+				rmMsgCtx.setProperty(
+						Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
 
-				} catch (Exception ex) {
-					throw new SandeshaException(ex.getMessage());
-				}
+			} catch (Exception ex) {
+				throw new SandeshaException(ex.getMessage());
+			}
 
-				//Starting the invoker if stopped.
-				SandeshaUtil.startInvokerIfStopped(msgCtx.getConfigurationContext());
+			//Starting the invoker if stopped.
+			SandeshaUtil
+					.startInvokerIfStopped(msgCtx.getConfigurationContext());
 
-			}
 		}
 
-//		try {
-//			MessageContext requestMessage = rmMsgCtx.getMessageContext()
-//					.getOperationContext().getMessageContext(
-//							WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-//			String requestMessageId = requestMessage.getMessageID();
-//			SequencePropertyBean checkResponseBean = seqPropMgr.retrieve(
-//					requestMessageId,
-//					Sandesha2Constants.SequenceProperties.CHECK_RESPONSE);
-//			if (checkResponseBean != null) {
-//				checkResponseBean.setValue(msgCtx);
-//				seqPropMgr.update(checkResponseBean);
-//			}
-//
-//		} catch (AxisFault e) {
-//			throw new SandeshaException(e.getMessage());
-//		}
-		
-		applicationMsgTransaction.commit();
+		invokeTransaction.commit();
+
+		//Sending acknowledgements
+		sendAckIfNeeded(rmMsgCtx, messagesStr);
+
 	}
 
 	//TODO convert following from INT to LONG
@@ -272,7 +275,7 @@
 		SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceId,
 				Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
 
-		EndpointReference acksTo = new EndpointReference (acksToBean.getValue());
+		EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
 		String acksToStr = acksTo.getAddress();
 
 		if (acksToStr == null || messagesStr == null)
@@ -311,9 +314,10 @@
 
 		MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(
 				rmMsgCtx, ackOperation);
-		
-		ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
-		
+
+		ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
+				"true");
+
 		RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
 
 		ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
@@ -351,10 +355,11 @@
 			}
 
 			rmMsgCtx.getMessageContext().getOperationContext().setProperty(
-					org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+					org.apache.axis2.Constants.RESPONSE_WRITTEN,
+					Constants.VALUE_TRUE);
 
-			rmMsgCtx.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN,
-					"true");
+			rmMsgCtx.getMessageContext().setProperty(
+					Sandesha2Constants.ACK_WRITTEN, "true");
 			try {
 				engine.send(ackRMMsgCtx.getMessageContext());
 			} catch (AxisFault e1) {
@@ -362,6 +367,8 @@
 			}
 		} else {
 
+			Transaction asyncAckTransaction = storageManager.getTransaction();
+
 			SenderBeanMgr retransmitterBeanMgr = storageManager
 					.getRetransmitterBeanMgr();
 
@@ -380,15 +387,15 @@
 
 			RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
 					.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
-			long ackInterval = PropertyManager.getInstance().getAcknowledgementInterval();
+			long ackInterval = PropertyManager.getInstance()
+					.getAcknowledgementInterval();
 			if (policyBean != null) {
 				ackInterval = policyBean.getAcknowledgementInaterval();
 			}
-
+			
 			//Ack will be sent as stand alone, only after the retransmitter
 			// interval.
 			long timeToSend = System.currentTimeMillis() + ackInterval;
-			ackBean.setTimeToSend(timeToSend);
 
 			//removing old acks.
 			SenderBean findBean = new SenderBean();
@@ -398,14 +405,19 @@
 			findBean.setReSend(false);
 			Collection coll = retransmitterBeanMgr.find(findBean);
 			Iterator it = coll.iterator();
-			while (it.hasNext()) {
-				SenderBean retransmitterBean = (SenderBean) it
-						.next();
-				retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+
+			if (it.hasNext()) {
+				SenderBean oldAckBean = (SenderBean) it.next();
+				timeToSend = oldAckBean.getTimeToSend();		//If there is an old ack. This ack will be sent in the old timeToSend.
+				retransmitterBeanMgr.delete(oldAckBean.getMessageID());
 			}
+			
+			ackBean.setTimeToSend(timeToSend);
 
 			//inserting the new ack.
 			retransmitterBeanMgr.insert(ackBean);
+
+			asyncAckTransaction.commit();
 
 			SandeshaUtil.startSenderIfStopped(configCtx);
 		}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Tue Dec 27 05:06:25 2005
@@ -33,6 +33,7 @@
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.wsrm.Accept;
 import org.apache.sandesha2.wsrm.AckRequested;
 import org.apache.sandesha2.wsrm.CreateSequenceResponse;
@@ -215,6 +216,8 @@
 			retransmitterMgr.update(tempBean);
 		}
 
+		SequenceManager.updateLastActivatedTime(newOutSequenceId,configCtx);
+		
 		updateAppMessagesTransaction.commit();
 		
 		createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Tue Dec 27 05:06:25 2005
@@ -28,6 +28,7 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
 import org.apache.sandesha2.wsrm.TerminateSequence;
 
@@ -70,6 +71,8 @@
 		TerminateManager.terminateReceivingSide(context,sequenceId);
 		
 		terminateTransaction.commit(); 
+		
+		SequenceManager.updateLastActivatedTime(sequenceId,context);
 
 		//terminateSeqMsg.pause();
 		terminateSeqRMMSg.getMessageContext().setPausedTrue(new QName (Sandesha2Constants.IN_HANDLER_NAME));

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java Tue Dec 27 05:06:25 2005
@@ -47,6 +47,8 @@
 		if (context != null)
 			this.context = context;
 	}
+	
+	public abstract void initStorage ();
 
 	public abstract Transaction getTransaction();
 

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java Tue Dec 27 05:06:25 2005
@@ -79,6 +79,9 @@
 			if (bean.getSequenceID() != null
 					&& !bean.getSequenceID().equals(temp.getSequenceID()))
 				select = false;
+			
+			if (bean.isInvoked()!=temp.isInvoked())
+				select = false;
 
 			if (select)
 				beans.add(temp);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Tue Dec 27 05:06:25 2005
@@ -16,17 +16,13 @@
  */
 package org.apache.sandesha2.storage.inmemory;
 
-import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Iterator;
 
 import org.apache.axis2.context.AbstractContext;
 import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.storage.RetransmitterBeanMgrTest;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Tue Dec 27 05:06:25 2005
@@ -74,4 +74,8 @@
 
 		return instance;
 	}
+	
+	public void  initStorage () {
+		
+	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Tue Dec 27 05:06:25 2005
@@ -82,7 +82,13 @@
 					baseInterval);
 		}
 
-		retransmitterBean.setTimeToSend(lastSentTime + newInterval);
+		long newTimeToSend = 0;
+		//newTimeToSend = lastSentTime + newInterval;
+		
+		long timeNow = System.currentTimeMillis();
+		newTimeToSend = timeNow + newInterval;
+		
+		retransmitterBean.setTimeToSend(newTimeToSend);
 
 		return retransmitterBean;
 	}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Tue Dec 27 05:06:25 2005
@@ -9,12 +9,15 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.AbstractContext;
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2ClientAPI;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.policy.RMPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.NextMsgBean;
@@ -97,6 +100,8 @@
 		// message to invoke
 		//this will apply for only in-order invocations.
 
+		updateLastActivatedTime(sequenceId,createSequenceMsg.getMessageContext().getConfigurationContext());
+		
 		return sequenceId;
 	}
 
@@ -140,4 +145,71 @@
 		seqPropMgr.insert(acksToBean);
 
 	}
+	/** Stores the current time as the LAST_ACTIVATED_TIME property of the given sequence, inserting the property when it does not exist yet and updating it otherwise. */
+	public static void updateLastActivatedTime (String sequenceID, ConfigurationContext configContext) throws SandeshaException {
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+		Transaction lastActivatedTransaction = storageManager.getTransaction();
+		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		SequencePropertyBean lastActivatedBean = sequencePropertyBeanMgr.retrieve(sequenceID, Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+		// 'added' remembers whether the bean had to be created here; it selects insert vs. update below.
+		boolean added = false;
+		// Nothing retrieved for this sequence: build a fresh property bean keyed by sequence id and property name.
+		if (lastActivatedBean==null) {
+			added = true;
+			lastActivatedBean = new SequencePropertyBean ();
+			lastActivatedBean.setSequenceID(sequenceID);
+			lastActivatedBean.setName(Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+		}
+		// The value is System.currentTimeMillis() rendered as a decimal string.
+		long currentTime = System.currentTimeMillis();
+		lastActivatedBean.setValue(Long.toString(currentTime));
+		// A newly created bean is inserted; a retrieved one is updated.
+		if (added)
+			sequencePropertyBeanMgr.insert(lastActivatedBean);
+		else
+			sequencePropertyBeanMgr.update(lastActivatedBean);
+		// NOTE(review): commit() is not in a finally block, so an exception thrown above skips it - confirm whether a rollback is needed.
+		lastActivatedTransaction.commit();
+	}
+	/** Returns the LAST_ACTIVATED_TIME property of the given sequence parsed as a long, or -1 when no such property is retrieved. */
+	public static long getLastActivatedTime (String sequenceID, ConfigurationContext configContext) throws SandeshaException {
+		// Lookup only: no Transaction is obtained from the storage manager in this method.
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		SequencePropertyBean lastActivatedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+		// -1 is the sentinel returned when the retrieve call yields null.
+		long lastActivatedTime = -1;
+		// Long.parseLong throws NumberFormatException if the stored value is not a decimal long.
+		if (lastActivatedBean!=null) {
+			lastActivatedTime = Long.parseLong(lastActivatedBean.getValue());
+		}
+		
+		return lastActivatedTime;
+	}
+		/** Returns true when the sequence's LAST_ACTIVATED_TIME plus the policy's inactive timeout interval is earlier than the current time; returns false otherwise. */
+	public static boolean hasSequenceTimedOut (String sequenceID, RMMsgContext rmMsgCtx) throws SandeshaException {
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getMessageContext().getConfigurationContext());
+		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		// Prefer the RM policy bean stored as a property on the message context; fall back to PropertyManager when absent.
+		RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
+			.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
+		if (policyBean == null) {
+			//loading default policies.
+			policyBean = PropertyManager.getInstance().getRMPolicyBean();
+		}
+
+		boolean sequenceTimedOut = false;
+		// A sequence with no retrievable LAST_ACTIVATED_TIME property is never reported as timed out.
+		SequencePropertyBean lastActivatedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+		if (lastActivatedBean!=null) {
+			long lastActivatedTime = Long.parseLong(lastActivatedBean.getValue());
+			long timeNow = System.currentTimeMillis();
+			if (lastActivatedTime+policyBean.getInactiveTimeoutInterval()<timeNow)
+				sequenceTimedOut = true;
+		}
+		
+		return sequenceTimedOut;
+	}
+	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Tue Dec 27 05:06:25 2005
@@ -182,6 +182,7 @@
 							Sequence sequence = (Sequence) rmMsg
 									.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
 							if (sequence.getLastMessage() != null) {
+								
 								TerminateManager.terminateAfterInvocation(
 										context, sequenceId);
 								

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Tue Dec 27 05:06:25 2005
@@ -18,6 +18,7 @@
 
 import java.util.Collection;
 import java.util.Iterator;
+
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
@@ -25,9 +26,9 @@
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.soap.SOAPEnvelope;
 import org.apache.sandesha2.AcknowledgementManager;
+import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2ClientAPI;
 import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.TerminateManager;
 import org.apache.sandesha2.storage.StorageManager;
@@ -37,11 +38,14 @@
 import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.wsrm.Sequence;
 import org.apache.sandesha2.wsrm.TerminateSequence;
 
 /**
- * This is responsible for sending and re-sending messages of Sandesha2. This represent a thread that keep running all
- * the time. This keep looking at the Sender table to find out any entries that should be sent.
+ * This is responsible for sending and re-sending messages of Sandesha2. It
+ * represents a thread that keeps running all the time and keeps looking at
+ * the Sender table to find any entries that should be sent.
  * 
  * @author Chamikara Jayalath <ch...@gmail.com>
  */
@@ -63,29 +67,32 @@
 	public void run() {
 
 		StorageManager storageManager = null;
-		
+
 		try {
-			storageManager = SandeshaUtil
-			.getSandeshaStorageManager(context);
+			storageManager = SandeshaUtil.getSandeshaStorageManager(context);
 		} catch (SandeshaException e2) {
 			// TODO Auto-generated catch block
 			System.out.println("ERROR: Could not start sender");
 			e2.printStackTrace();
 			return;
 		}
-		
+
 		while (senderStarted) {
 			try {
 				if (context == null)
 					throw new SandeshaException(
 							"Can't continue the Sender. Context is null");
 
+				Transaction pickMessagesToSendTransaction = storageManager.getTransaction(); //starting
+																			   // a
+																			   // new
+																			   // transaction
 
-				Transaction sendTransaction = storageManager.getTransaction(); //starting a new transaction
-				
-				SenderBeanMgr mgr = storageManager
-						.getRetransmitterBeanMgr();
+				SenderBeanMgr mgr = storageManager.getRetransmitterBeanMgr();
 				Collection coll = mgr.findMsgsToSend();
+
+				pickMessagesToSendTransaction.commit();
+				
 				Iterator iter = coll.iterator();
 
 				while (iter.hasNext()) {
@@ -96,9 +103,10 @@
 							.getStoredMessageContext(key);
 
 					try {
-						
-						if (msgCtx==null) {
-							System.out.println("ERROR: Sender has an Unavailable Message entry");
+
+						if (msgCtx == null) {
+							System.out
+									.println("ERROR: Sender has an Unavailable Message entry");
 							break;
 						}
 						RMMsgContext rmMsgCtx = MsgInitializer
@@ -121,57 +129,100 @@
 												+ "' message.");
 							}
 						}
+						
+						Transaction preSendTransaction = storageManager.getTransaction();
 
-						if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+						int messageType = rmMsgCtx.getMessageType();
+						
+						if (messageType == Sandesha2Constants.MessageTypes.APPLICATION) {
+							
+							Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+							String sequenceID = sequence.getIdentifier().getIdentifier();
+							//checking weather the sequence has been timed out.
+							boolean sequenceTimedOut = SequenceManager.hasSequenceTimedOut (sequenceID, rmMsgCtx);;
+							if (sequenceTimedOut) {
+								//sequence has been timed out.
+								//do time out processing.
+								
+								TerminateManager.terminateSendingSide(context,sequenceID);
+								throw new SandeshaException ("Sequence timed out");
+							}
+							
 							//piggybacking if an ack if available for the same
 							// sequence.
 							AcknowledgementManager
 									.piggybackAckIfPresent(rmMsgCtx);
+							
 						}
+						
+						preSendTransaction.commit();
 
 						try {
-							AxisEngine engine = new AxisEngine (msgCtx.getConfigurationContext());
-							engine.send(msgCtx);
-//							if (msgCtx.isPaused())
-//								engine.resumeSend(msgCtx);
-//							else
-//								engine.send(msgCtx);
 							
+							AxisEngine engine = new AxisEngine(msgCtx
+									.getConfigurationContext());
+							engine.send(msgCtx);
+							//							if (msgCtx.isPaused())
+							//								engine.resumeSend(msgCtx);
+							//							else
+							//								engine.send(msgCtx);
+
 						} catch (Exception e) {
 							//Exception is sending. retry later
 							System.out
 									.println("Exception thrown in sending...");
 							e.printStackTrace();
+							//e.printStackTrace();
+
 						}
+						
+						Transaction postSendTransaction = storageManager.getTransaction();
 
 						MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
+
+						if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+							Sequence sequence = (Sequence) rmMsgCtx
+									.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+							long messageNo = sequence.getMessageNumber()
+									.getMessageNumber();
+						}
+
 						retransmitterAdjuster.adjustRetransmittion(bean);
 
-//						mgr.update(bean);
-						
-						if (bean.isReSend())
-							mgr.update(bean);
-						else
-							mgr.delete(bean.getMessageID());
-						
-						sendTransaction.commit();		//commiting the current transaction
+						//update or delete only if the object is still present.
+						SenderBean bean1 = mgr.retrieve(bean.getMessageID());
+						if (bean1 != null) {
+							if (bean.isReSend())
+								mgr.update(bean);
+							else
+								mgr.delete(bean.getMessageID());
+						}
+
+						postSendTransaction.commit(); //commiting the current
+												  // transaction
 
-						Transaction processResponseTransaction = storageManager.getTransaction();
+						Transaction processResponseTransaction =
+						storageManager.getTransaction();
 						if (!msgCtx.isServerSide())
 							checkForSyncResponses(msgCtx);
-						
+												
 						processResponseTransaction.commit();
-						
-						Transaction terminateCleaningTransaction = storageManager.getTransaction();
-						if (rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+
+						Transaction terminateCleaningTransaction = storageManager
+								.getTransaction();
+						if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
 							//terminate sending side.
-							TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
-							String sequenceID = terminateSequence.getIdentifier().getIdentifier();
-							ConfigurationContext configContext = msgCtx.getConfigurationContext();
-							
-							TerminateManager.terminateSendingSide(configContext,sequenceID);
+							TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx
+									.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+							String sequenceID = terminateSequence
+									.getIdentifier().getIdentifier();
+							ConfigurationContext configContext = msgCtx
+									.getConfigurationContext();
+
+							TerminateManager.terminateSendingSide(
+									configContext, sequenceID);
 						}
-						
+
 						terminateCleaningTransaction.commit();
 
 					} catch (AxisFault e1) {
@@ -179,30 +230,15 @@
 					} catch (Exception e3) {
 						e3.printStackTrace();
 					}
-
-					//changing the values of the sent bean.
-					//bean.setLastSentTime(System.currentTimeMillis());
-					//bean.setSentCount(bean.getSentCount() + 1);
-
-					//update if resend=true otherwise delete. (reSend=false
-					// means
-					// send only once).
-//					if (bean.isReSend())
-//						mgr.update(bean);
-//					else
-//						mgr.delete(bean.getMessageID());
-
 				}
-				
-				
-				
+
 			} catch (SandeshaException e) {
 				e.printStackTrace();
 				return;
 			}
 
 			try {
-				Thread.sleep(2000);
+				Thread.sleep(Sandesha2Constants.SENDER_SLEEP_TIME);
 			} catch (InterruptedException e1) {
 				//e1.printStackTrace();
 				System.out.println("Sender was interupted...");
@@ -248,58 +284,61 @@
 
 	}
 
-	private void checkForSyncResponses(MessageContext msgCtx)  {
+	private void checkForSyncResponses(MessageContext msgCtx) {
 
 		try {
-		boolean responsePresent = (msgCtx
-				.getProperty(MessageContext.TRANSPORT_IN) != null);
+			boolean responsePresent = (msgCtx
+					.getProperty(MessageContext.TRANSPORT_IN) != null);
 
-		if (responsePresent) {
-			//create the response
-			MessageContext response = new MessageContext(msgCtx
-					.getConfigurationContext(), msgCtx.getSessionContext(), msgCtx
-					.getTransportIn(), msgCtx.getTransportOut());
-			response.setProperty(MessageContext.TRANSPORT_IN, msgCtx
-					.getProperty(MessageContext.TRANSPORT_IN));
-
-			response.setServerSide(false);
-
-			//If request is REST we assume the response is REST, so set the
-			// variable
-			response.setDoingREST(msgCtx.isDoingREST());
-			response
-					.setServiceGroupContextId(msgCtx.getServiceGroupContextId());
-			response.setServiceGroupContext(msgCtx.getServiceGroupContext());
-			response.setServiceContext(msgCtx.getServiceContext());
-			response.setAxisService(msgCtx.getAxisService());
-			response.setAxisServiceGroup(msgCtx.getAxisServiceGroup());
-
-			//setting the in-flow.
-			//ArrayList inPhaseHandlers =
-			// response.getAxisOperation().getRemainingPhasesInFlow();
-			/*
-			 * if (inPhaseHandlers==null || inPhaseHandlers.isEmpty()) {
-			 * ArrayList phases =
-			 * msgCtx.getSystemContext().getAxisConfiguration().getInPhasesUptoAndIncludingPostDispatch();
-			 * response.getAxisOperation().setRemainingPhasesInFlow(phases); }
-			 */
-
-			//Changed following from TransportUtils to SandeshaUtil since op.
-			// context is anavailable.
-			SOAPEnvelope resenvelope = null;
-			resenvelope = SandeshaUtil.createSOAPMessage(response, msgCtx
-					.getEnvelope().getNamespace().getName());
-
-
-			if (resenvelope != null) {
-				AxisEngine engine = new AxisEngine(msgCtx.getConfigurationContext());
-				response.setEnvelope(resenvelope);
-				engine.receive(response);
+			if (responsePresent) {
+				//create the response
+				MessageContext response = new MessageContext(msgCtx
+						.getConfigurationContext(), msgCtx.getSessionContext(),
+						msgCtx.getTransportIn(), msgCtx.getTransportOut());
+				response.setProperty(MessageContext.TRANSPORT_IN, msgCtx
+						.getProperty(MessageContext.TRANSPORT_IN));
+
+				response.setServerSide(false);
+
+				//If request is REST we assume the response is REST, so set the
+				// variable
+				response.setDoingREST(msgCtx.isDoingREST());
+				response.setServiceGroupContextId(msgCtx
+						.getServiceGroupContextId());
+				response
+						.setServiceGroupContext(msgCtx.getServiceGroupContext());
+				response.setServiceContext(msgCtx.getServiceContext());
+				response.setAxisService(msgCtx.getAxisService());
+				response.setAxisServiceGroup(msgCtx.getAxisServiceGroup());
+
+				//setting the in-flow.
+				//ArrayList inPhaseHandlers =
+				// response.getAxisOperation().getRemainingPhasesInFlow();
+				/*
+				 * if (inPhaseHandlers==null || inPhaseHandlers.isEmpty()) {
+				 * ArrayList phases =
+				 * msgCtx.getSystemContext().getAxisConfiguration().getInPhasesUptoAndIncludingPostDispatch();
+				 * response.getAxisOperation().setRemainingPhasesInFlow(phases); }
+				 */
+
+				//Changed following from TransportUtils to SandeshaUtil since
+				// op.
+				// context is anavailable.
+				SOAPEnvelope resenvelope = null;
+				resenvelope = SandeshaUtil.createSOAPMessage(response, msgCtx
+						.getEnvelope().getNamespace().getName());
+
+				if (resenvelope != null) {
+					AxisEngine engine = new AxisEngine(msgCtx
+							.getConfigurationContext());
+					response.setEnvelope(resenvelope);
+					engine.receive(response);
+				}
 			}
-		}
-		
-		}catch (Exception e) {
-			System.out.println("Exception was throws in processing the sync response...");
+
+		} catch (Exception e) {
+			System.out
+					.println("Exception was throws in processing the sync response...");
 		}
 	}
 



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org