You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/05/12 21:16:35 UTC

svn commit: r405839 [2/2] - in /webservices/sandesha/trunk/src/org/apache/sandesha2: ./ client/ handlers/ msgprocessors/ util/ workers/

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Fri May 12 12:16:32 2006
@@ -20,6 +20,7 @@
 import javax.xml.namespace.QName;
 
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContextFactory;
@@ -34,6 +35,7 @@
 import org.apache.sandesha2.msgprocessors.MsgProcessor;
 import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.wsrm.Sequence;
@@ -63,42 +65,71 @@
 			log.debug(message);
 			throw new AxisFault(message);
 		}
-		
-		// getting rm message
-		RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
 
-		String DONE = (String) msgCtx
-				.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
+		String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
 		if (null != DONE && "true".equals(DONE))
 			return;
 
-		msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
-		
-		String dummyMessageString = (String) msgCtx.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
-		boolean dummyMessage = false;
-		if (dummyMessageString!=null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
-			dummyMessage = true;
-		
+		msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
 
-		MsgProcessor msgProcessor = null;
-		int messageType = rmMsgCtx.getMessageType();
-		if (messageType==Sandesha2Constants.MessageTypes.UNKNOWN) {
-			MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
-			if (requestMsgCtx!=null) {  //for the server side
-				RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx);
-				Sequence sequencePart = (Sequence) reqRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-				if (sequencePart!=null)
-					msgProcessor = new ApplicationMsgProcessor ();// a rm intended message.
-			} else if (!msgCtx.isServerSide()) //if client side.
-			    msgProcessor = new ApplicationMsgProcessor ();
-		}else  {
-			msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+		boolean withinTransaction = false;
+		String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+		if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+			withinTransaction = true;
 		}
+
+		Transaction transaction = null;
+		if (!withinTransaction) {
+			transaction = storageManager.getTransaction();
+			msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+		}
+		boolean rolebacked = false;
 		
-		if (msgProcessor!=null)
-			msgProcessor.processOutMessage(rmMsgCtx);
-	
+		try {
+			// getting rm message
+			RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+
+			String dummyMessageString = (String) msgCtx.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
+			boolean dummyMessage = false;
+			if (dummyMessageString != null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
+				dummyMessage = true;
+
+			MsgProcessor msgProcessor = null;
+			int messageType = rmMsgCtx.getMessageType();
+			if (messageType == Sandesha2Constants.MessageTypes.UNKNOWN) {
+				MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(
+						OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+				if (requestMsgCtx != null) { // for the server side
+					RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx);
+					Sequence sequencePart = (Sequence) reqRMMsgCtx
+							.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+					if (sequencePart != null)
+						msgProcessor = new ApplicationMsgProcessor();// a rm
+																		// intended
+																		// message.
+				} else if (!msgCtx.isServerSide()) // if client side.
+					msgProcessor = new ApplicationMsgProcessor();
+			} else {
+				msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+			}
+
+			if (msgProcessor != null)
+				msgProcessor.processOutMessage(rmMsgCtx);
+
+		} catch (Exception e) {
+			if (!withinTransaction) {
+				transaction.rollback();
+				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+				rolebacked = true;
+			}
+		} finally {
+			if (!withinTransaction && !rolebacked) {
+				transaction.commit();
+				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+			}
+		}
 	}
 
 	public QName getName() {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Fri May 12 12:16:32 2006
@@ -39,7 +39,6 @@
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
@@ -64,7 +63,6 @@
 	
 	public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
 		
-		
 		AckRequested ackRequested = (AckRequested) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST);
 		if (ackRequested==null) {
 			throw new SandeshaException ("Message identified as of type ackRequested does not have an AckRequeted element");
@@ -79,9 +77,11 @@
 		String sequenceID = ackRequested.getIdentifier().getIdentifier();
 		
 		ConfigurationContext configurationContext = rmMsgCtx.getMessageContext().getConfigurationContext();
+		
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
-		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
 		
+		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+			
 		//Setting the ack depending on AcksTo.
 		SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceID,
 				Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
@@ -139,8 +139,6 @@
 		ackMsgCtx.setProperty(AddressingConstants.WS_ADDRESSING_VERSION,
 				msgContext.getProperty(AddressingConstants.WS_ADDRESSING_VERSION));  //TODO do this in the RMMsgCreator
 		
-//		RMMsgContext ackRMMsgCtx = rmm
-
 		String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,configurationContext);
 		String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
 		
@@ -170,15 +168,15 @@
 
 			rmMsgCtx.getMessageContext().setProperty(
 					Sandesha2Constants.ACK_WRITTEN, "true");
+			
 			try {
 				engine.send(ackRMMsgCtx.getMessageContext());
 			} catch (AxisFault e1) {
 				throw new SandeshaException(e1.getMessage());
 			}
+			
 		} else {
 
-			Transaction asyncAckTransaction = storageManager.getTransaction();
-
 			SenderBeanMgr retransmitterBeanMgr = storageManager
 					.getRetransmitterBeanMgr();
 
@@ -238,8 +236,6 @@
 			//inserting the new ack.
 			retransmitterBeanMgr.insert(ackBean);
 
-			asyncAckTransaction.commit();
-
 			//passing the message through sandesha2sender
 
 			ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
@@ -255,6 +251,7 @@
 			} catch (AxisFault e) {
 				throw new SandeshaException (e.getMessage());
 			}
+			
 			
 			SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID);
 			

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Fri May 12 12:16:32 2006
@@ -31,7 +31,6 @@
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
@@ -65,6 +64,8 @@
 			throw new SandeshaException(message);
 		}
 		
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getMessageContext().getConfigurationContext());
+		
 		MessageContext msgCtx = rmMsgCtx.getMessageContext();
 		ConfigurationContext configCtx = msgCtx.getConfigurationContext();
 		
@@ -72,16 +73,12 @@
 		sequenceAck.setMustUnderstand(false);
 		rmMsgCtx.addSOAPEnvelope();
 
-		StorageManager storageManager = SandeshaUtil
-				.getSandeshaStorageManager(rmMsgCtx.getMessageContext()
-						.getConfigurationContext());
+
 		SenderBeanMgr retransmitterMgr = storageManager
 				.getRetransmitterBeanMgr();
 		SequencePropertyBeanMgr seqPropMgr = storageManager
 				.getSequencePropretyBeanMgr();
 
-
-
 		Iterator ackRangeIterator = sequenceAck.getAcknowledgementRanges()
 				.iterator();
 
@@ -96,6 +93,7 @@
 		FaultManager faultManager = new FaultManager();
 		RMMsgContext faultMessageContext = faultManager.checkForUnknownSequence(rmMsgCtx,outSequenceId);
 		if (faultMessageContext != null) {
+			
 			ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
 			AxisEngine engine = new AxisEngine(configurationContext);
 			
@@ -105,11 +103,13 @@
 				throw new SandeshaException ("Could not send the fault message",e);
 			}
 			
+			msgCtx.pause();
 			return;
 		}
 		
 		faultMessageContext = faultManager.checkForInvalidAcknowledgement(rmMsgCtx);
 		if (faultMessageContext != null) {
+			
 			ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
 			AxisEngine engine = new AxisEngine(configurationContext);
 			
@@ -119,25 +119,22 @@
 				throw new SandeshaException ("Could not send the fault message",e);
 			}
 			
+			msgCtx.pause();
 			return;
 		}
 		
         String internalSequenceID = SandeshaUtil.getSequenceProperty(outSequenceId,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,configCtx);
 		
         //updating the last activated time of the sequence.
-		Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
 		SequenceManager.updateLastActivatedTime(internalSequenceID,rmMsgCtx.getMessageContext().getConfigurationContext());
-		lastUpdatedTimeTransaction.commit();
 		
-		//Starting transaction
-		Transaction ackTransaction = storageManager.getTransaction();
-
 		SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(
 				outSequenceId, Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
 
 		if (internalSequenceBean == null || internalSequenceBean.getValue() == null) {
 			String message = "TempSequenceId is not set correctly";
 			log.debug(message);
+			
 			throw new SandeshaException(message);
 		}
 
@@ -218,9 +215,6 @@
 		allCompletedMsgsBean.setValue(str);
 		
 		seqPropMgr.update(allCompletedMsgsBean);		
-		
-		//commiting transaction
-		ackTransaction.commit();
 		
 		String lastOutMsgNoStr = SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO,configCtx);
 		if (lastOutMsgNoStr!=null ) {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Fri May 12 12:16:32 2006
@@ -42,7 +42,6 @@
 import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.client.SandeshaListener;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
@@ -127,8 +126,6 @@
 				.getSandeshaStorageManager(rmMsgCtx.getMessageContext()
 						.getConfigurationContext());
 
-
-
 		FaultManager faultManager = new FaultManager();
 		RMMsgContext faultMessageContext = faultManager.checkForLastMsgNumberExceeded(rmMsgCtx);
 		if (faultMessageContext != null) {
@@ -141,6 +138,7 @@
 				throw new SandeshaException ("Could not send the fault message",e);
 			}
 			
+			msgCtx.pause();
 			return;
 		}
 		
@@ -169,6 +167,7 @@
 				throw new SandeshaException ("Could not send the fault message",e);
 			}
 			
+			msgCtx.pause();
 			return;
 		}
 		
@@ -191,14 +190,8 @@
 			return;
 		}
 
-		Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
-		
 		//updating the last activated time of the sequence.
 		SequenceManager.updateLastActivatedTime(sequenceId,configCtx);
-		lastUpdatedTimeTransaction.commit();
-		
-		Transaction updataMsgStringTransaction = storageManager
-				.getTransaction();
 		
 		SequencePropertyBean msgsBean = seqPropMgr.retrieve(sequenceId,
 				Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
@@ -263,16 +256,13 @@
 		msgsBean.setValue(messagesStr);
 		seqPropMgr.update(msgsBean);
 
-		updataMsgStringTransaction.commit();
-
-		Transaction invokeTransaction = storageManager.getTransaction();
-
 		//	Pause the messages bean if not the right message to invoke.
 		NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
 		NextMsgBean bean = mgr.retrieve(sequenceId);
 
-		if (bean == null)
+		if (bean == null) {
 			throw new SandeshaException("Error- The sequence does not exist");
+		}
 
 		InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
 		
@@ -281,8 +271,6 @@
 		
 		if (inOrderInvocation) {
 			
-			//pause the message
-			rmMsgCtx.pause();
 			
 			SequencePropertyBean incomingSequenceListBean = (SequencePropertyBean) seqPropMgr
 					.retrieve(
@@ -299,6 +287,7 @@
 				incomingSequenceListBean.setValue(incomingSequenceList
 						.toString());
 
+				//this get inserted before
 				seqPropMgr.insert(incomingSequenceListBean);
 			}
 
@@ -312,7 +301,7 @@
 				//saving the property.
 				incomingSequenceListBean.setValue(incomingSequenceList
 						.toString());
-				seqPropMgr.insert(incomingSequenceListBean);
+				seqPropMgr.update(incomingSequenceListBean);
 			}
 
 			//saving the message.
@@ -330,6 +319,9 @@
 			} catch (Exception ex) {
 				throw new SandeshaException(ex.getMessage());
 			}
+			
+			//pause the message
+			rmMsgCtx.pause();
 
 			//Starting the invoker if stopped.
 			SandeshaUtil
@@ -337,8 +329,6 @@
 
 		}
 
-		invokeTransaction.commit();
-
 		//Sending acknowledgements
 		sendAckIfNeeded(rmMsgCtx, messagesStr);
 
@@ -367,7 +357,7 @@
 				.getSandeshaStorageManager(msgCtx.getConfigurationContext());
 		SequencePropertyBeanMgr seqPropMgr = storageManager
 				.getSequencePropretyBeanMgr();
-
+		
 		Sequence sequence = (Sequence) rmMsgCtx
 				.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
 		String sequenceId = sequence.getIdentifier().getIdentifier();
@@ -416,7 +406,6 @@
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
 		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
 
-		Transaction outHandlerTransaction = storageManager.getTransaction();
 		boolean serverSide = msgContext.isServerSide();  
 
 		// setting message Id if null
@@ -590,16 +579,18 @@
 				throw new SandeshaException (e);
 			}
 			
-			if (requestMessageContext==null) 
+			if (requestMessageContext==null) {
 				throw new SandeshaException ("Request message context is null, cant find out the request side sequenceID");
+			}
 			
 			RMMsgContext requestRMMsgCtx = MsgInitializer.initializeMessage(requestMessageContext);
 			Sequence sequence = (Sequence) requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
 			
 			String requestSequenceID = sequence.getIdentifier().getIdentifier();
 			SequencePropertyBean specVersionBean = seqPropMgr.retrieve(requestSequenceID,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
-			if (specVersionBean==null) 
+			if (specVersionBean==null) {
 				throw new SandeshaException ("SpecVersion sequence property bean is not available for the incoming sequence. Cant find the RM version for outgoing side");
+			}
 			
 			specVersion = specVersionBean.getValue();
 		} else {
@@ -744,8 +735,6 @@
 			processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber,storageKey);
 		
 		msgContext.pause();  // the execution will be stopped.
-		outHandlerTransaction.commit();		
-
 	}
 	
 	private void addCreateSequenceMessage(RMMsgContext applicationRMMsg,

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Fri May 12 12:16:32 2006
@@ -11,7 +11,6 @@
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
@@ -45,13 +44,12 @@
 				throw new SandeshaException ("Could not send the fault message",e);
 			}
 			
+			msgCtx.pause();
 			return;
 		}
 		
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx);
 		
-		Transaction closeSequenceTransaction = storageManager.getTransaction();
-		
 		SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropretyBeanMgr();
 		SequencePropertyBean sequenceClosedBean = new SequencePropertyBean ();
 		sequenceClosedBean.setSequenceID(sequenceID);
@@ -107,9 +105,6 @@
 			String message = "Could not send the terminate sequence response";
 			throw new SandeshaException (message,e);
 		}
-		
-		
-		closeSequenceTransaction.commit();
 	}
 	
 	public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Fri May 12 12:16:32 2006
@@ -34,7 +34,6 @@
 import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.client.SandeshaListener;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.CreateSeqBean;
@@ -82,6 +81,7 @@
 				throw new SandeshaException ("Could not send the fault message",e);
 			}
 			
+			createSeqMsg.pause();
 			return;
 		}
 
@@ -92,7 +92,6 @@
 		
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
 		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
-		Transaction createSequenceTransaction = storageManager.getTransaction();   //begining of a new transaction
 
 		try {
 			String newSequenceId = SequenceManager.setupNewSequence(createSeqRMMsg);  //newly created sequnceID.
@@ -170,11 +169,8 @@
 			outMessage.setResponseWritten(true);
 
 			//commiting tr. before sending the response msg.
-			createSequenceTransaction.commit();
 			
-			Transaction updateLastActivatedTransaction = storageManager.getTransaction();
 			SequenceManager.updateLastActivatedTime(newSequenceId,createSeqRMMsg.getMessageContext().getConfigurationContext());
-			updateLastActivatedTransaction.commit();
 			
 			AxisEngine engine = new AxisEngine(context);
 			engine.send(outMessage);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Fri May 12 12:16:32 2006
@@ -75,7 +75,7 @@
 			.getSandeshaStorageManager(configCtx);
 		
 		//Processing for ack if available
-		Transaction ackProcessTransaction = storageManager.getTransaction();
+///		Transaction ackProcessTransaction = storageManager.getTransaction();
 		
 		SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) createSeqResponseRMMsgCtx
 				.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
@@ -84,11 +84,11 @@
 			ackProcessor.processInMessage(createSeqResponseRMMsgCtx);
 		}
 
-		ackProcessTransaction.commit();
+///		ackProcessTransaction.commit();
 		
 		//Processing the create sequence response.
 		
-		Transaction createSeqResponseTransaction = storageManager.getTransaction();
+///		Transaction createSeqResponseTransaction = storageManager.getTransaction();
 		
 		CreateSequenceResponse createSeqResponsePart = (CreateSequenceResponse) createSeqResponseRMMsgCtx
 				.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
@@ -154,10 +154,10 @@
 		sequencePropMgr.insert(outSequenceBean);
 		sequencePropMgr.insert(internalSequenceBean);
 
-		createSeqResponseTransaction.commit();
+///		createSeqResponseTransaction.commit();
 		
 		
-		Transaction offerProcessTransaction = storageManager.getTransaction();
+///		Transaction offerProcessTransaction = storageManager.getTransaction();
 		
 		//processing for accept (offer has been sent)
 		Accept accept = createSeqResponsePart.getAccept();
@@ -217,9 +217,9 @@
 			
 		}
 
-		offerProcessTransaction.commit();
+///		offerProcessTransaction.commit();
 		
-		Transaction updateAppMessagesTransaction = storageManager.getTransaction();
+///		Transaction updateAppMessagesTransaction = storageManager.getTransaction();
 		
 		SenderBean target = new SenderBean();
 		target.setInternalSequenceID(internalSequenceId);
@@ -282,11 +282,11 @@
 			storageManager.updateMessageContext(key,applicationMsg);
 		}
 
-		updateAppMessagesTransaction.commit();
+///		updateAppMessagesTransaction.commit();
 		
-		Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
+///		Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
 		SequenceManager.updateLastActivatedTime(internalSequenceId,configCtx);
-		lastUpdatedTimeTransaction.commit();
+///		lastUpdatedTimeTransaction.commit();
 		
 		createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()
 				.setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Fri May 12 12:16:32 2006
@@ -39,7 +39,6 @@
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
@@ -106,6 +105,8 @@
 			} catch (AxisFault e) {
 				throw new SandeshaException ("Could not send the fault message",e);
 			}
+			
+			terminateSeqMsg.pause();
 			return;
 		}
 		
@@ -113,7 +114,6 @@
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
 		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
 
-		Transaction terminateReceivedTransaction = storageManager.getTransaction();
 		SequencePropertyBean terminateReceivedBean = new SequencePropertyBean ();
 		terminateReceivedBean.setSequenceID(sequenceId);
 		terminateReceivedBean.setName(Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
@@ -127,9 +127,6 @@
 		
 		setUpHighestMsgNumbers(context,storageManager,sequenceId,terminateSeqRMMsg);
 		
-		terminateReceivedTransaction.commit();
-		
-		Transaction terminateTransaction = storageManager.getTransaction();
 		TerminateManager.cleanReceivingSideOnTerminateMessage(context,sequenceId);
 		
 		
@@ -139,16 +136,12 @@
 		sequencePropertyBeanMgr.insert(terminatedBean);
 		
 		
-		terminateTransaction.commit(); 
-		
 		//removing an entry from the listener
 		String transport = terminateSeqMsg.getTransportIn().getName().getLocalPart();
 	
-		Transaction lastUpdatedTransaction = storageManager.getTransaction();
 		SequenceManager.updateLastActivatedTime(sequenceId,context);
 		
-		lastUpdatedTransaction.commit();		
-		terminateSeqRMMsg.pause();
+		terminateSeqMsg.pause();
 	}
 
 	private void setUpHighestMsgNumbers (ConfigurationContext configCtx, StorageManager storageManager, String sequenceID, RMMsgContext terminateRMMsg) throws SandeshaException {
@@ -281,7 +274,7 @@
         if (outSequenceID==null)
         	throw new SandeshaException ("SequenceID was not found. Cannot send the terminate message");
         
-		Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+///		Transaction addTerminateSeqTransaction = storageManager.getTransaction();
 		
 		String terminated = SandeshaUtil.getSequenceProperty(outSequenceID,
 				Sandesha2Constants.SequenceProperties.TERMINATE_ADDED,configurationContext);
@@ -384,7 +377,7 @@
 		rmMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
 		rmMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
 		rmMsgCtx.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
-		addTerminateSeqTransaction.commit();
+///		addTerminateSeqTransaction.commit();
 		
 	    AxisEngine engine = new AxisEngine (configurationContext);
 	    try {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java Fri May 12 12:16:32 2006
@@ -291,7 +291,7 @@
 			
 		} else {
 
-			Transaction asyncAckTransaction = storageManager.getTransaction();
+///			Transaction asyncAckTransaction = storageManager.getTransaction();
 
 			SenderBeanMgr retransmitterBeanMgr = storageManager
 					.getRetransmitterBeanMgr();
@@ -338,7 +338,7 @@
 			
 			//inserting the new ack.
 			retransmitterBeanMgr.insert(ackBean);
-			asyncAckTransaction.commit();
+///			asyncAckTransaction.commit();
 
 			//passing the message through sandesha2sender
 			ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java Fri May 12 12:16:32 2006
@@ -411,6 +411,9 @@
 			} else {
 				SequencePropertyBeanMgr seqPropMgr = storageManager
 						.getSequencePropretyBeanMgr();
+				
+				//TODO get the acksTo value using the property key.
+				
 				String sequenceId = data.getSequenceId();
 				SequencePropertyBean acksToBean = seqPropMgr.retrieve(
 						sequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Fri May 12 12:16:32 2006
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.client.SandeshaClient;
 import org.apache.sandesha2.client.SandeshaClientConstants;
@@ -132,6 +133,8 @@
 	
 	private void finalizeTimedOutSequence (String internalSequenceID, String sequenceID ,MessageContext messageContext) throws SandeshaException {
 		ConfigurationContext configurationContext = messageContext.getConfigurationContext();
+		
+		configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,messageContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
 		SequenceReport report = SandeshaClient.getOutgoingSequenceReport(internalSequenceID ,configurationContext);
 		TerminateManager.timeOutSendingSideSequence(configurationContext,internalSequenceID, false);
 		

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java Fri May 12 12:16:32 2006
@@ -166,7 +166,7 @@
 		try {
 			processor.setup();
 		} catch (NoSuchMethodException e) {
-			throw new SandeshaException(e.getMessage());
+			throw new SandeshaException(e);
 		}
 
 		processor.processPolicy(policy);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Fri May 12 12:16:32 2006
@@ -419,7 +419,7 @@
 	
 	public static long getOutGoingSequenceAckedMessageCount (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException {
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
-		Transaction transaction = storageManager.getTransaction();
+///		Transaction transaction = storageManager.getTransaction();
 		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
 		
 		SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
@@ -447,14 +447,14 @@
 			return 0; //No acknowledgement has been received yet.
 		
 		long noOfMessagesAcked = Long.parseLong(ackedMsgBean.getValue());
-		transaction.commit();
+///		transaction.commit();
 		
 		return noOfMessagesAcked;
 	}
 	
 	public static boolean isOutGoingSequenceCompleted (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException {
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
-		Transaction transaction = storageManager.getTransaction();
+///		Transaction transaction = storageManager.getTransaction();
 		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
 		
 		SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
@@ -484,14 +484,14 @@
 		if ("true".equals(terminateAddedBean.getValue()))
 			return true;
 
-		transaction.commit();
+///		transaction.commit();
 		return false;
 	}
 	
 	public static boolean isIncomingSequenceCompleted (String sequenceID, ConfigurationContext configurationContext) throws SandeshaException {
 		
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
-		Transaction transaction = storageManager.getTransaction();
+///		Transaction transaction = storageManager.getTransaction();
 		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
 		
 		SequencePropertyBean terminateReceivedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
@@ -500,7 +500,7 @@
 		if (terminateReceivedBean!=null && "true".equals(terminateReceivedBean.getValue()))
 			complete = true;
 		
-		transaction.commit();
+///		transaction.commit();
 		return complete;
 	}
 	

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java Fri May 12 12:16:32 2006
@@ -240,8 +240,8 @@
 		if (Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID.equals(name))
 			deleatable = false;
 		
-		if (Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION.equals(name))
-			deleatable = false;
+//		if (Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION.equals(name))
+//			deleatable = false;
 		
 		if (Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED.equals(name))
 			deleatable = false;
@@ -316,6 +316,7 @@
 			SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
 			doUpdatesIfNeeded (outSequenceID,sequencePropertyBean,sequencePropertyBeanMgr);
 			
+			//TODO all properties which have the temm:Seq:id as the key should be deletable.
 			if (isProportyDeletable(sequencePropertyBean.getName())) {
 				sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
 			}
@@ -330,7 +331,7 @@
 		StorageManager storageManager = SandeshaUtil
 				.getSandeshaStorageManager(configurationContext);
 
-		Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+///		Transaction addTerminateSeqTransaction = storageManager.getTransaction();
 		
 		SequencePropertyBeanMgr seqPropMgr = storageManager
 				.getSequencePropretyBeanMgr();
@@ -424,7 +425,7 @@
 		terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
 		terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
 		terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
-		addTerminateSeqTransaction.commit();
+///		addTerminateSeqTransaction.commit();
 		
 	    AxisEngine engine = new AxisEngine (configurationContext);
 	    try {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Fri May 12 12:16:32 2006
@@ -100,43 +100,44 @@
 				log.debug(ex.getMessage());
 			}
 
+			Transaction transaction = null;
+			boolean rolebacked = false;
+			
 			try {
-				StorageManager storageManager = SandeshaUtil
-						.getSandeshaStorageManager(context);
+				StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
 				NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
 
-				InvokerBeanMgr storageMapMgr = storageManager
-						.getStorageMapBeanMgr();
+				InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
 
 				SequencePropertyBeanMgr sequencePropMgr = storageManager
 						.getSequencePropretyBeanMgr();
 
-				Transaction preInvocationTransaction = storageManager.getTransaction();
+				transaction = storageManager.getTransaction();
 				
 				//Getting the incomingSequenceIdList
 				SequencePropertyBean allSequencesBean = (SequencePropertyBean) sequencePropMgr
 						.retrieve(
 								Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
 								Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-				if (allSequencesBean == null)
-					continue;
-
-				ArrayList allSequencesList = SandeshaUtil.getArrayListFromString (allSequencesBean
-						.getValue());
 				
-				preInvocationTransaction.commit();
+				if (allSequencesBean == null) {
+					continue;
+				}
+				ArrayList allSequencesList = SandeshaUtil.getArrayListFromString (allSequencesBean.getValue());
 				
 				Iterator allSequencesItr = allSequencesList.iterator();
-
+				
 				currentIteration: while (allSequencesItr.hasNext()) {
-
 					String sequenceId = (String) allSequencesItr.next();
 					
-					Transaction invocationTransaction = storageManager.getTransaction();   //Transaction based invocation
+					//committing the old transaction
+					transaction.commit();
+					
+					//starting a new transaction for the new iteration.
+					transaction = storageManager.getTransaction();
 					
 					NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
 					if (nextMsgBean == null) {
-
 						String message = "Next message not set correctly. Removing invalid entry.";
 						log.debug(message);
 						allSequencesItr.remove();
@@ -144,14 +145,12 @@
 						//cleaning the invalid data of the all sequences.
 						allSequencesBean.setValue(allSequencesList.toString());
 						sequencePropMgr.update(allSequencesBean);	
-						
-						throw new SandeshaException (message);
+						continue;
 					}
 
 					long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
 					if (nextMsgno <= 0) { 
-						String message = "Invalid messaage number as the Next Message Number. Removing invalid entry";
-						
+						String message = "Invalid message number as the Next Message Number.";
 						throw new SandeshaException(message);
 					}
 
@@ -163,57 +162,36 @@
 					
 					while (stMapIt.hasNext()) {
 
-						InvokerBean stMapBean = (InvokerBean) stMapIt
-								.next();
+						InvokerBean stMapBean = (InvokerBean) stMapIt.next();
 						String key = stMapBean.getMessageContextRefKey();
 
-
 						MessageContext msgToInvoke = storageManager.retrieveMessageContext(key,context);
+						RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke);
 
-						RMMsgContext rmMsg = MsgInitializer
-								.initializeMessage(msgToInvoke);
-						Sequence seq = (Sequence) rmMsg
-								.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-
-						long msgNo = seq.getMessageNumber().getMessageNumber();
-
+						//have to commit the transaction before invoking. This may get changed when WS-AT is available.
+						transaction.commit();
+						
 						try {
-							//Invoking the message.
-
-							//currently Transaction based invocation can be supplied only for the in-only case.
-							
-							if (!AxisOperationFactory.MEP_URI_IN_ONLY.equals(msgToInvoke.getAxisOperation().getMessageExchangePattern())) {
-								invocationTransaction.commit();
-							}
-							
+							//Invoking the message.														
+							msgToInvoke.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
 							new AxisEngine (msgToInvoke.getConfigurationContext())
 									.resume(msgToInvoke);
 							invoked = true;
-							
-							if (!AxisOperationFactory.MEP_URI_IN_ONLY.equals(msgToInvoke.getAxisOperation().getMessageExchangePattern())) {
-								invocationTransaction = storageManager.getTransaction();
-							}						
-
 							storageMapMgr.delete(key);
 						} catch (AxisFault e) {
 							throw new SandeshaException(e);
+						} finally {
+							transaction = storageManager.getTransaction();
 						}
-
+						
 						//undating the next msg to invoke
 
-
 						if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
 							Sequence sequence = (Sequence) rmMsg
 									.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
 							if (sequence.getLastMessage() != null) {
-								
 								TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId);
-								
-								//this sequence has no more invocations
-//								stopInvokerForTheSequence(sequenceId);
-								
 								//exit from current iteration. (since an entry was removed)
-								invocationTransaction.commit();
 								break currentIteration;
 							}
 						}
@@ -223,15 +201,19 @@
 						nextMsgno++;
 						nextMsgBean.setNextMsgNoToProcess(nextMsgno);
 						nextMsgMgr.update(nextMsgBean);
-						invocationTransaction.commit();
-					}
+					}	
 				}
 				
-			} catch (SandeshaException e1) {
+			} catch (Exception e1) {
 				e1.printStackTrace();
+				if (transaction!=null) {
+					transaction.rollback();
+					rolebacked = true;
+				}
+			} finally { 
+				if (!rolebacked && transaction!=null) 
+					transaction.commit();
 			}
 		}
-		
-		int i = 1;
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Fri May 12 12:16:32 2006
@@ -58,11 +58,9 @@
 public class Sender extends Thread {
 
 	private boolean runSender = false;
-	private boolean stopSenderAfterWork = false;
 	private ArrayList workingSequences = new ArrayList();
 	private ConfigurationContext context = null;
 	private static final Log log = LogFactory.getLog(Sender.class);
-	private ThreadPool threadPool = new ThreadPool ();
 
 	public synchronized void stopSenderForTheSequence(String sequenceID) {
 		workingSequences.remove(sequenceID);
@@ -103,29 +101,29 @@
 				log.debug("End printing Interrupt...");
 			}
 			
+			Transaction transaction = null;
+			boolean rolebacked = false;
+			
 			try {
 				if (context == null) {
 					String message = "Can't continue the Sender. Context is null";
 					log.debug(message);
 					throw new SandeshaException(message);
 				}
-
-				Transaction pickMessagesToSendTransaction = storageManager.getTransaction();
-
+				
+				transaction = storageManager.getTransaction();
+				
 				SenderBeanMgr mgr = storageManager.getRetransmitterBeanMgr();
 				SenderBean senderBean = mgr.getNextMsgToSend();
 				if (senderBean==null) {
-					pickMessagesToSendTransaction.commit();
 					continue;
 			    }
-				
 
 				MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
 				boolean continueSending = retransmitterAdjuster.adjustRetransmittion(senderBean, context);
-				if (!continueSending)
+				if (!continueSending) {
 					continue;
-				
-				pickMessagesToSendTransaction.commit();
+				}
 				
 				String key = (String) senderBean.getMessageContextRefKey();
 				MessageContext msgCtx = storageManager.retrieveMessageContext(key, context);
@@ -161,8 +159,6 @@
 				
 				updateMessage(msgCtx);
 
-				Transaction preSendTransaction = storageManager.getTransaction();
-
 				int messageType = rmMsgCtx.getMessageType();
 				if (messageType == Sandesha2Constants.MessageTypes.APPLICATION) {
 					Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
@@ -177,16 +173,16 @@
 					AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx);
 				}
 				
-				preSendTransaction.commit();
-				
 				//sending the message
 				TransportOutDescription transportOutDescription = msgCtx.getTransportOut();
 				TransportSender transportSender = transportOutDescription.getSender();
 					
+				//have to commit the transaction before sending. This may get changed when WS-AT is available.
+				transaction.commit();
+				
 				boolean successfullySent = false;
 				if (transportSender != null) {
 					try {
-						
 						//TODO change this to cater for security.
 						transportSender.invoke(msgCtx);
 						successfullySent = true;
@@ -194,11 +190,11 @@
 						// TODO Auto-generated catch block
 					    log.debug("Could not send message");
 						log.debug(e.getStackTrace().toString());
+					} finally {
+						transaction =  storageManager.getTransaction();
 					}
 				}
 
-				Transaction postSendTransaction = storageManager.getTransaction();
-
 				// update or delete only if the object is still present.
 				SenderBean bean1 = mgr.retrieve(senderBean.getMessageID());
 				if (bean1 != null) {
@@ -210,14 +206,13 @@
 						mgr.delete(bean1.getMessageID());
 				}
 
-				postSendTransaction.commit(); // commiting the current transaction
-
+				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
+				
 				if (successfullySent) {
 					if (!msgCtx.isServerSide())
 						checkForSyncResponses(msgCtx);
 				}
-
-				Transaction terminateCleaningTransaction = storageManager.getTransaction();
+				
 				if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
 					// terminate sending side.
 					TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
@@ -228,15 +223,22 @@
 					TerminateManager.terminateSendingSide(configContext,internalSequenceID, msgCtx.isServerSide());
 				}
 
-				terminateCleaningTransaction.commit();
+				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_FALSE);
 				
-			} catch (AxisFault e) {
+			} catch (Exception e) {
 				String message = "An Exception was throws in sending";
 				log.debug(message,e);
 				
 				// TODO : when this is the client side throw the exception to
 				// the client when necessary.
 
+				if (transaction!=null) {
+					transaction.rollback();
+					rolebacked = true;
+				}
+			} finally {
+				if (transaction!=null && !rolebacked) 
+					transaction.commit();
 			}
 		}
 	}
@@ -311,7 +313,12 @@
 				log.debug("Valid SOAP envelope not found");
 				log.debug(e.getStackTrace().toString());
 			}
-
+			
+			//if the request msg ctx is within a transaction, processing of the response should also happen
+			//within the same transaction
+			responseMessageContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION
+					,msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
+			
 			if (resenvelope != null) {
 				responseMessageContext.setEnvelope(resenvelope);
 				AxisEngine engine = new AxisEngine(msgCtx



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org