You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ga...@apache.org on 2007/05/21 14:17:46 UTC

svn commit: r540130 - in /webservices/sandesha/trunk/java: config/ modules/core/src/main/java/org/apache/sandesha2/client/ modules/core/src/main/java/org/apache/sandesha2/handlers/ modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ modules/...

Author: gatfora
Date: Mon May 21 05:17:40 2007
New Revision: 540130

URL: http://svn.apache.org/viewvc?view=rev&rev=540130
Log:
Allow finer transactional control to close the window in which acks are sent before transactions commit.  This also reduces the time the transactions hold onto locks.  Finally, improve the transaction commit/rollback handling when exceptions are thrown.

Modified:
    webservices/sandesha/trunk/java/config/log4j.properties
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaInHandler.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/Transaction.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
    webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/scenarios/RMScenariosTest.java

Modified: webservices/sandesha/trunk/java/config/log4j.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/config/log4j.properties?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/config/log4j.properties (original)
+++ webservices/sandesha/trunk/java/config/log4j.properties Mon May 21 05:17:40 2007
@@ -1,6 +1,6 @@
 # Set root category priority to INFO and its only appender to CONSOLE.
-#log4j.rootCategory=INFO, CONSOLE
-log4j.rootCategory=DEBUG, CONSOLE, LOGFILE
+log4j.rootCategory=INFO, CONSOLE
+#log4j.rootCategory=DEBUG, CONSOLE, LOGFILE
 
 # Set the enterprise logger category to FATAL and its only appender to CONSOLE.
 #log4j.logger.org.apache.axis2.enterprise=FATAL, CONSOLE

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java Mon May 21 05:17:40 2007
@@ -173,14 +173,15 @@
 
 			sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_ESTABLISHED);
 			fillOutgoingSequenceInfo(sequenceReport, rMSBean, storageManager);
+			
+			if(reportTransaction != null && reportTransaction.isActive()) reportTransaction.commit();
+			reportTransaction = null;
 
 		} catch (Exception e) {
-			if (reportTransaction!=null) {
-				reportTransaction.rollback();
-				reportTransaction = null;
-			}
+			// Just log the exception
+			if(log.isDebugEnabled()) log.debug("Exception", e);
 		} finally {
-			if (reportTransaction!=null) reportTransaction.commit();
+			if (reportTransaction!=null && reportTransaction.isActive()) reportTransaction.rollback();
 		}
 
 		return sequenceReport;
@@ -276,14 +277,15 @@
 				sandeshaReport.addToNoOfCompletedMessagesMap(sequenceID, sequenceReport.getCompletedMessages().size());
 				sandeshaReport.addToSequenceStatusMap(sequenceID, sequenceReport.getSequenceStatus());
 			}
+			
+			if(reportTransaction != null && reportTransaction.isActive()) reportTransaction.commit();
+			reportTransaction = null;
 
 		} catch (Exception e) {
-			if (reportTransaction!=null) {
-				reportTransaction.rollback();
-				reportTransaction = null;
-			}
+			// just log the error
+			if(log.isDebugEnabled()) log.debug("Exception", e);
 		} finally {
-			if (reportTransaction!=null) reportTransaction.commit();
+			if (reportTransaction!=null && reportTransaction.isActive()) reportTransaction.rollback();
 		}
 
 		return sandeshaReport;
@@ -381,9 +383,10 @@
 		if (log.isTraceEnabled())
 			log.trace("Checking if sequence " + internalSequenceId + " previously terminated");
 		
-		Transaction tran = storageManager.getTransaction();
+		Transaction tran = null;
 		
 		try {
+			tran = storageManager.getTransaction();
 			
 			RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
 			//see if the sequence is terminated
@@ -400,16 +403,14 @@
 				// Delete the rmsBean
 				storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
 			}
+			
+			if(tran != null && tran.isActive()) tran.commit();
+			tran = null;
 		
-		} catch (SandeshaException e) {
-			if(tran!=null)
+		} finally {
+			if(tran!=null && tran.isActive())
 				tran.rollback();
-			tran = null;
-			
-			throw e;
-		} 
-		if(tran!=null)
-			tran.commit();
+		}
 	}
 	
 	/**
@@ -495,7 +496,8 @@
 			
 		} catch (AxisFault e) {
 			String message = SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.couldNotSendTerminate, e.toString());
+					SandeshaMessageKeys.couldNotSendTerminate,
+					e.toString());
 			throw new SandeshaException(message, e);
 		} finally {
 			options.setAction(oldAction);
@@ -692,7 +694,7 @@
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
 
 		// Get a transaction to retrieve the properties
-		Transaction transaction = storageManager.getTransaction();
+		Transaction transaction = null;
 		String sequenceID = null;
 		
 		try
@@ -880,13 +882,14 @@
 				invoker.forceInvokeOfAllMessagesCurrentlyOnSequence(configContext, sequenceID, allowLaterDeliveryOfMissingMessages);			
 			}
 			
+			if(reportTransaction != null && reportTransaction.isActive()) reportTransaction.commit();
+			reportTransaction = null;
+
 		} catch (Exception e) {
-			if (reportTransaction!=null) {
-				reportTransaction.rollback();
-				reportTransaction = null;
-			}
+			// Just log the exception
+			if(log.isDebugEnabled()) log.debug("Exception", e);
 		} finally {
-			if(reportTransaction != null) reportTransaction.commit();
+			if(reportTransaction != null && reportTransaction.isActive()) reportTransaction.rollback();
 		}
 	}
 
@@ -1023,15 +1026,18 @@
 
 			if(rmdBean.getSecurityTokenData() != null) sequenceReport.setSecureSequence(true);
 			
+			if (reportTransaction!=null && reportTransaction.isActive()) reportTransaction.commit();
+			reportTransaction = null;
+
 			return sequenceReport;
 
 		} catch (Exception e) {
-			if (reportTransaction!=null) {
+			// Just log the exception
+			if(log.isDebugEnabled()) log.debug("Exception", e);
+		} finally {
+			if(reportTransaction != null && reportTransaction.isActive()) {
 				reportTransaction.rollback();
-				reportTransaction = null;
 			}
-		} finally {
-			if (reportTransaction!=null) reportTransaction.commit();
 		}
 
 		return null;
@@ -1365,14 +1371,15 @@
 		String soapNamespaceURI = null;
 		
 		// Get the RMSBean for this sequence.
-		Transaction transaction = storageManager.getTransaction();
+		Transaction transaction = null;
 		
-		try {				
+		try {
+			transaction = storageManager.getTransaction();
 			RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceID);
 			if (rmsBean.getSoapVersion() == Sandesha2Constants.SOAPVersion.v1_2)
 				soapNamespaceURI = SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI;
 		} finally {
-			transaction.commit();
+			if(transaction != null) transaction.commit();
 		}
 		
 		return soapNamespaceURI;

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Mon May 21 05:17:40 2007
@@ -152,9 +152,10 @@
       SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getConfigurationContext(), 
           rmMsgCtx.getConfigurationContext().getAxisConfiguration());
     
-    Transaction transaction = storageManager.getTransaction();
+    Transaction transaction = null;
     
     try {
+      transaction = storageManager.getTransaction();
     
       // Check that both the Sequence header and message body have been secured properly
       RMDBeanMgr mgr = storageManager.getRMDBeanMgr();
@@ -219,11 +220,12 @@
             rmMsgCtx.getMessageContext().getAxisService());
         rmMsgCtx.getMessageContext().setAxisOperation(duplicateMessageOperation);
       }
-      transaction.commit();
+      
+      if(transaction != null && transaction.isActive()) transaction.commit();
       transaction = null;
     }
     finally {
-      if (transaction != null)
+      if (transaction != null && transaction.isActive())
         transaction.rollback();
     }
     if (log.isDebugEnabled())

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaInHandler.java Mon May 21 05:17:40 2007
@@ -111,7 +111,7 @@
 			MessageValidator.validateMessage(rmMsgCtx, storageManager);
 			
 			// commit the current transaction
-			transaction.commit();
+			if(transaction != null && transaction.isActive()) transaction.commit();
 			transaction = storageManager.getTransaction();
 
 			// Process Ack headers in the message
@@ -119,7 +119,7 @@
 			ackProcessor.processAckHeaders(rmMsgCtx);
 
 			// commit the current transaction
-			transaction.commit();
+			if(transaction != null && transaction.isActive()) transaction.commit();
 			transaction = storageManager.getTransaction();
 
 			// Process Ack Request headers in the message
@@ -133,13 +133,17 @@
 			pendingProcessor.processMessagePendingHeaders(rmMsgCtx);
 
 			// commit the current transaction
-			transaction.commit();
+			if(transaction != null && transaction.isActive()) transaction.commit();
 			transaction = storageManager.getTransaction();
 
 			// Process the Sequence header, if there is one
 			SequenceProcessor seqProcessor = new SequenceProcessor();
-			returnValue = seqProcessor.processSequenceHeader(rmMsgCtx);
+			returnValue = seqProcessor.processSequenceHeader(rmMsgCtx, transaction);
 
+			// commit the current transaction
+			if(transaction != null && transaction.isActive()) transaction.commit();
+			transaction = null;
+			
 		} catch (Exception e) {
 			if (log.isDebugEnabled()) 
 				log.debug("SandeshaInHandler::invoke Exception caught during processInMessage", e);
@@ -147,16 +151,6 @@
 			msgCtx.pause();
 			returnValue = InvocationResponse.SUSPEND;
 			
-			if (transaction != null) {
-				try {
-					transaction.rollback();
-					transaction = null;
-				} catch (Exception e1) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
-					log.debug(message, e);
-				}
-			}
-			
 			// Rethrow the original exception if it is an AxisFault
 			if (e instanceof AxisFault)
 				throw (AxisFault)e;
@@ -166,15 +160,17 @@
 		} 
 		finally {
 			if (log.isDebugEnabled()) log.debug("SandeshaInHandler::invoke Doing final processing");
-			if (transaction != null) {
+			if (transaction != null && transaction.isActive()) {
 				try {
-					transaction.commit();
+					transaction.rollback();
+					transaction = null;
 				} catch (Exception e) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e.toString());
 					log.debug(message, e);
 				}
 			}
 		}
+		
 		if (log.isDebugEnabled())
 			log.debug("Exit: SandeshaInHandler::invoke " + returnValue);
 		return returnValue;

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon May 21 05:17:40 2007
@@ -147,34 +147,29 @@
 				opCtx.setProperty(RequestResponseTransport.HOLD_RESPONSE, Boolean.TRUE);
 			}
 
+			if (transaction != null && transaction.isActive()) transaction.commit();
+			transaction = null;
+
 		} catch (Exception e) {
 			// message should not be sent in a exception situation.
 			msgCtx.pause();
 			returnValue = InvocationResponse.SUSPEND;
 
-			// rolling back the transaction
-			if (transaction != null) {
-				try {
-					transaction.rollback();
-					transaction = null;
-				} catch (Exception e1) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
-					log.debug(message, e);
-				}
-			}
-
 			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outMsgError, e.toString());
 			throw new AxisFault(message, e);
+
 		} finally {
-			if (transaction != null) {
+			// roll back the transaction
+			if (transaction != null && transaction.isActive()) {
 				try {
-					transaction.commit();
-				} catch (Exception e) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
-					log.debug(message, e);
+					transaction.rollback();
+				} catch (Exception e1) {
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+					log.debug(message, e1);
 				}
 			}
 		}
+		
 		if (log.isDebugEnabled())
 			log.debug("Exit: SandeshaOutHandler::invoke " + returnValue);
 		return returnValue;

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon May 21 05:17:40 2007
@@ -40,6 +40,7 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
@@ -72,7 +73,7 @@
 		this.inboundMessageNumber = inboundMessageNumber;
 	}
 	
-	public boolean processInMessage(RMMsgContext rmMsgCtx) {
+	public boolean processInMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
 		if (log.isDebugEnabled()) {
 			log.debug("Enter: ApplicationMsgProcessor::processInMessage");
 			log.debug("Exit: ApplicationMsgProcessor::processInMessage");
@@ -287,11 +288,9 @@
 				lastMessage = true;
 			}
 		}
-		if (lastMessage) {
-			rmsBean.setLastOutMessage(messageNumber);
-			// Update the rmsBean
-			storageManager.getRMSBeanMgr().update(rmsBean);
-		}
+		
+		if (lastMessage) 
+			rmsBean.setLastOutMessage(messageNumber);		
 
 		// set this as the response highest message.
 		rmsBean.setHighestOutMessageNumber(messageNumber);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Mon May 21 05:17:40 2007
@@ -37,6 +37,7 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
@@ -57,7 +58,7 @@
 
 	private static final Log log = LogFactory.getLog(CloseSequenceProcessor.class);
 
-	public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	public boolean processInMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: CloseSequenceProcessor::processInMessage");
 
@@ -120,6 +121,14 @@
 		closeSequenceResponseMsg.setResponseWritten(true);
 
 		closeSeqResponseRMMsg.addSOAPEnvelope();
+		
+		//
+		// Now that we have generated the message we can commit the transaction
+		//
+		if(transaction != null && transaction.isActive()) {
+			transaction.commit();
+			transaction = null;
+		}
 
 		AxisEngine engine = new AxisEngine(closeSequenceMsg.getConfigurationContext());
 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Mon May 21 05:17:40 2007
@@ -41,6 +41,7 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.util.FaultManager;
@@ -64,7 +65,7 @@
 
 	private static final Log log = LogFactory.getLog(CreateSeqMsgProcessor.class);
 
-	public boolean processInMessage(RMMsgContext createSeqRMMsg) throws AxisFault {
+	public boolean processInMessage(RMMsgContext createSeqRMMsg, Transaction transaction) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: CreateSeqMsgProcessor::processInMessage");
 
@@ -259,6 +260,10 @@
 	
 			SandeshaUtil.startWorkersForSequence(context, rmdBean);
 
+			//
+			// We have done all of our updates, so commit the transaction
+			if(transaction != null && transaction.isActive()) transaction.commit();
+			
 			AxisEngine engine = new AxisEngine(context);
 			try{
 				engine.send(outMessage);				

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Mon May 21 05:17:40 2007
@@ -38,6 +38,7 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -57,7 +58,7 @@
 
 	private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
 
-	public boolean processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
+	public boolean processInMessage(RMMsgContext createSeqResponseRMMsgCtx, Transaction transaction) throws AxisFault {
 
 		if (log.isDebugEnabled())
 			log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage");

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java Mon May 21 05:17:40 2007
@@ -9,18 +9,15 @@
 import org.apache.axis2.util.MessageContextBuilder;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.wsrm.Sequence;
 
 public class LastMessageProcessor  {
 
-	public boolean processLastMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	public static void processLastMessage(RMMsgContext rmMsgCtx) throws AxisFault {
 		
 		if (!Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmMsgCtx.getRMSpecVersion()))
-			return true;
+			return;
 		
 		MessageContext msgContext = rmMsgCtx.getMessageContext();
-		Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-		String sequenceId = sequence.getIdentifier().getIdentifier();
 		MessageContext outMessage = MessageContextBuilder.createOutMessageContext(msgContext);
 		
 		//add the SOAP envelope with body null
@@ -37,7 +34,6 @@
 		AxisEngine engine = new AxisEngine (rmMsgCtx.getConfigurationContext());
 		engine.send(outMessage);
 		
-		return true;
 	}
 
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Mon May 21 05:17:40 2007
@@ -20,6 +20,7 @@
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
@@ -47,7 +48,7 @@
 	 * A message is selected by the set of SenderBeans that are waiting to be sent.
 	 * This is processed using a SenderWorker. 
 	 */
-	public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	public boolean processInMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
 		if(log.isDebugEnabled()) log.debug("Enter: MakeConnectionProcessor::processInMessage " + rmMsgCtx.getSOAPEnvelope().getBody());
 
 		MakeConnection makeConnection = (MakeConnection) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.MAKE_CONNECTION);
@@ -110,7 +111,7 @@
 			return false;
 		}
 		
-		replyToPoll(rmMsgCtx, senderBean, storageManager, pending, makeConnection.getNamespaceValue());
+		replyToPoll(rmMsgCtx, senderBean, storageManager, pending, makeConnection.getNamespaceValue(), transaction);
 		
 		if(log.isDebugEnabled()) log.debug("Exit: MakeConnectionProcessor::processInMessage");
 		return false;
@@ -120,7 +121,8 @@
 			SenderBean matchingMessage,
 			StorageManager storageManager,
 			boolean pending,
-			String namespace)
+			String namespace,
+			Transaction transaction)
 	throws AxisFault
 	{
 		if(log.isDebugEnabled()) log.debug("Enter: MakeConnectionProcessor::replyToPoll");
@@ -175,6 +177,10 @@
 		returnMessage.setOperationContext(context);
 		
 		returnMessage.setProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE, Boolean.TRUE);
+		
+		//
+		// Commit the current transaction, so that the SenderWorker can do its own locking
+		if(transaction != null && transaction.isActive()) transaction.commit();
 		
 		//running the MakeConnection through a SenderWorker.
 		//This will allow Sandesha2 to consider both of following senarios equally.

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java Mon May 21 05:17:40 2007
@@ -19,6 +19,7 @@
 
 import org.apache.axis2.AxisFault;
 import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.storage.Transaction;
 
 /**
  * The message processor interface.
@@ -31,7 +32,7 @@
 	 * @return true if the msg context has been paused
 	 * @throws AxisFault
 	 */
-	public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+	public boolean processInMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault;
 	
 	/**
 	 * 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Mon May 21 05:17:40 2007
@@ -42,6 +42,7 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
@@ -66,7 +67,7 @@
 
 	private static final Log log = LogFactory.getLog(SequenceProcessor.class);
 
-	public InvocationResponse processSequenceHeader(RMMsgContext rmMsgCtx) throws AxisFault {
+	public InvocationResponse processSequenceHeader(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: SequenceProcessor::processSequenceHeader");
 		
@@ -74,7 +75,7 @@
 		Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
 		if(sequence != null) {
 			// This is a reliable message, so hand it on to the main routine
-			result = processReliableMessage(rmMsgCtx);
+			result = processReliableMessage(rmMsgCtx, transaction);
 		} else {
 			if (log.isDebugEnabled())
 				log.debug("Message does not contain a sequence header");
@@ -84,7 +85,7 @@
 		return result;
 	}
 	
-	public InvocationResponse processReliableMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	public InvocationResponse processReliableMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: SequenceProcessor::processReliableMessage");
 
@@ -214,12 +215,12 @@
 		
 			  SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
 			    
-			  // this is effectively a poll for the replyMessage, wo re-use the logic in the MakeConnection
+			  // this is effectively a poll for the replyMessage, so re-use the logic in the MakeConnection
 			  // processor. This will use this thread to re-send the reply, writing it into the transport.
 			  // As the reply is now written we do not want to continue processing, or suspend, so we abort.
 			  if(replyMessageBean != null) {
 			  	if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
-			   	MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null);
+			   	MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null, transaction);
 					result = InvocationResponse.ABORT;
 					if (log.isDebugEnabled())
 						log.debug("Exit: SequenceProcessor::processReliableMessage, replayed message: " + result);
@@ -313,13 +314,14 @@
 		// - We have async acks
 		boolean backchannelFree = (replyTo != null && !replyTo.hasAnonymousAddress()) ||
 									WSDLConstants.MEP_CONSTANT_IN_ONLY == mep;
+		
+		boolean sendAck = false;
+		
 		EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
 		if (acksTo.hasAnonymousAddress() && backchannelFree) {
 			Object responseWritten = msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
-			if (responseWritten==null || !Constants.VALUE_TRUE.equals(responseWritten)) {
-				RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
-				msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
-				AcknowledgementManager.sendAckNow(ackRMMsgContext);
+			if (responseWritten==null || !Constants.VALUE_TRUE.equals(responseWritten)) {				
+				sendAck = true;
 			}
 		} else if (!acksTo.hasAnonymousAddress()) {
 			SandeshaPolicyBean policyBean = SandeshaUtil.getPropertyBean (msgCtx.getAxisOperation());
@@ -365,6 +367,24 @@
 			// This will avoid performing application processing more than once.
 			rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
 
+		}
+
+		if (transaction != null && transaction.isActive()) 
+			transaction.commit();
+		
+		if (sendAck) {
+			try {
+				transaction = storageManager.getTransaction();
+				
+				RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
+				msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);	
+				AcknowledgementManager.sendAckNow(ackRMMsgContext);
+				if (transaction != null && transaction.isActive()) transaction.commit();
+				transaction = null;
+			
+			} finally {
+				if (transaction != null && transaction.isActive()) transaction.rollback();
+			}
 		}
 		
 		if (log.isDebugEnabled())

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon May 21 05:17:40 2007
@@ -29,7 +29,6 @@
 import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.transport.http.server.AxisHttpResponseImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
@@ -40,6 +39,7 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
@@ -64,7 +64,7 @@
 
 	private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
 
-	public boolean processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
+	public boolean processInMessage(RMMsgContext terminateSeqRMMsg, Transaction transaction) throws AxisFault {
 
 		if (log.isDebugEnabled())
 			log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
@@ -150,12 +150,15 @@
 
 		//sending the terminate sequence response
 		if (terminateSequenceResponse != null) {
+			//
+			// As we have processed the input and prepared the response we can commit the
+			// transaction now.
+			if(transaction != null && transaction.isActive()) transaction.commit();
 			
 			MessageContext outMessage = terminateSequenceResponse.getMessageContext();
 			EndpointReference toEPR = outMessage.getTo();
 			
-			AxisEngine engine = new AxisEngine(terminateSeqMsg
-					.getConfigurationContext());
+			AxisEngine engine = new AxisEngine(terminateSeqMsg.getConfigurationContext());
 						
 			outMessage.setServerSide(true);
 						
@@ -308,7 +311,7 @@
 							throw new SandeshaException (message);
 						}
 						
-						RMMsgContext referenceRMMsg = MsgInitializer.initializeMessage(referenceMessage);
+						//RMMsgContext referenceRMMsg = MsgInitializer.initializeMessage(referenceMessage);
 									
 					}
 				}

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Mon May 21 05:17:40 2007
@@ -29,6 +29,7 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
@@ -43,7 +44,7 @@
 
 	private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
 	
-	public boolean processInMessage(RMMsgContext terminateResRMMsg)
+	public boolean processInMessage(RMMsgContext terminateResRMMsg, Transaction transaction)
 			throws AxisFault { 
 		if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage");
 		

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java Mon May 21 05:17:40 2007
@@ -72,31 +72,23 @@
 				StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());				
 				transaction = storageManager.getTransaction();
 
-				msgProcessor.processInMessage(rmMsgCtx);
+				msgProcessor.processInMessage(rmMsgCtx, transaction);
 
 				//If message is a LastMessage then we deligate the processing to the LastMessageProcessor
 				Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
 				if (sequence!=null && sequence.getLastMessage()!=null) {
-					LastMessageProcessor lastMsgProcessor = new LastMessageProcessor ();
-					lastMsgProcessor.processLastMessage(rmMsgCtx);
+					LastMessageProcessor.processLastMessage(rmMsgCtx);
 				}
-				
+
+        if(transaction != null && transaction.isActive()) transaction.commit();
+        transaction = null;
+
 			} catch (Exception e) {
 				if (log.isDebugEnabled())
 					log.debug("Exception caught during processInMessage", e);
 				// message should not be sent in a exception situation.
 				msgCtx.pause();
 	
-				if (transaction != null) {
-					try {
-						transaction.rollback();
-						transaction = null;
-					} catch (Exception e1) {
-						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
-						log.debug(message, e);
-					}
-				}
-	
 				if (!(e instanceof AxisFault)) {
 					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.inMsgError, e.toString());
 					throw new AxisFault(message, e);
@@ -104,12 +96,12 @@
 				
 				throw (AxisFault)e;
 			} finally {
-				if (transaction != null) {
+				if (transaction != null && transaction.isActive()) {
 					try {
-						transaction.commit();
-					} catch (Exception e) {
-						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
-						log.debug(message, e);
+						transaction.rollback();
+					} catch (Exception e1) {
+						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+						log.debug(message, e1);
 					}
 				}
 			}

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java Mon May 21 05:17:40 2007
@@ -108,11 +108,12 @@
 
 		} catch (Exception e) {
 			if(log.isDebugEnabled()) log.debug("Exception", e);
-			if(t != null) {
+		} finally {
+			if(t != null && t.isActive()) {
 				try {
 					t.rollback();
 				} catch(Exception e2) {
-					if(log.isDebugEnabled()) log.debug("Exception during rollback", e);
+					if(log.isDebugEnabled()) log.debug("Exception during rollback", e2);
 				}
 			}
 		}

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/Transaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/Transaction.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/Transaction.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/Transaction.java Mon May 21 05:17:40 2007
@@ -23,9 +23,9 @@
 
 public interface Transaction {
 	
-	public void commit ();
+	public void commit () throws SandeshaStorageException;
 	
-	public void rollback ();
+	public void rollback () throws SandeshaStorageException;
 	
 	//indicates that the transaction has been started, but has not been committed or rolledbacked yet.
 	public boolean isActive ();

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Mon May 21 05:17:40 2007
@@ -46,6 +46,7 @@
 	private InMemoryTransaction waitingForTran = null;
 	private boolean sentMessages = false;
 	private boolean receivedMessages = false;
+	private boolean active = true;
 	
 	InMemoryTransaction(InMemoryStorageManager manager, String threadName, int id) {
 		if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::<init>");
@@ -62,14 +63,16 @@
 			SandeshaThread invoker = manager.getInvoker();
 			if(invoker != null) invoker.wakeThread();
 		}
+		active = false;
 	}
 
 	public void rollback() {
 		releaseLocks();
+		active = false;
 	}
 	
 	public boolean isActive () {
-		return !enlistedBeans.isEmpty();
+		return active;
 	}
 
 	public void enlist(RMBean bean) throws SandeshaStorageException {

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Mon May 21 05:17:40 2007
@@ -602,19 +602,20 @@
 		SOAPFault faultPart = envelope.getBody().getFault();
 
 		if (faultPart != null) {
-			Transaction transaction = null;
-			
-			try {
-				transaction = storageManager.getTransaction();
-				// constructing the fault
-				AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart);
-				response = manageIncomingFault (axisFault, rmMsgCtx, faultPart);
+	    Transaction transaction = null;
 
-				if(transaction != null && transaction.isActive()) transaction.commit();
-				transaction = null;
-			} finally {
-				if (transaction != null && transaction.isActive())
-					transaction.rollback();
+	    try {
+	    	transaction = storageManager.getTransaction();
+
+	    	// constructing the fault
+	    	AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart);
+	    	response = manageIncomingFault (axisFault, rmMsgCtx, faultPart);
+	    	
+	    	if(transaction != null && transaction.isActive()) transaction.commit();
+	    	transaction = null;
+	    } finally {
+	    	if (transaction != null && transaction.isActive())
+	    		transaction.rollback();
 			}
 		}
 		return response;

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java Mon May 21 05:17:40 2007
@@ -95,11 +95,11 @@
 				//invoke each bean in turn. 
 				//NOTE: here we are breaking ordering
 				while(stMapIt.hasNext()){
-					transaction = storageManager.getTransaction();
-					InvokerBean invoker = (InvokerBean)stMapIt.next();
-					
 					//invoke the app
 					try{
+						transaction = storageManager.getTransaction();
+						InvokerBean invoker = (InvokerBean)stMapIt.next();
+						
 						// start a new worker thread and let it do the invocation.
 						String workId = sequenceID + "::" + invoker.getMsgNo(); //creating a workId to uniquely identify the
 					   //piece of work that will be assigned to the Worker.
@@ -149,15 +149,15 @@
 							rmdBeanMgr.update(rMDBean);
 						}
 						
+						if(transaction != null && transaction.isActive()) transaction.commit();
+						transaction = null;
 					}
 					catch(Exception e){
-						if(transaction != null) {
-							transaction.rollback();
-							transaction = null;
-						}
+						// Just log the error
+						if(log.isDebugEnabled()) log.debug("Exception", e);
 					} finally {
-						if(transaction != null) {
-							transaction.commit();
+						if(transaction != null && transaction.isActive()) {
+							transaction.rollback();
 							transaction = null;
 						}
 					}
@@ -327,30 +327,21 @@
 				
 				processedMessage = true;
 			}
+			
+			if(transaction != null && transaction.isActive()) transaction.commit();
+			transaction = null;
 		} catch (Exception e) {
-			if (transaction != null) {
-				try {
-					transaction.rollback();
-					transaction = null;
-				} catch (Exception e1) {
-					String message = SandeshaMessageHelper.getMessage(
-							SandeshaMessageKeys.rollbackError, e1
-									.toString());
-					log.debug(message, e1);
-				}
-			}
 			String message = SandeshaMessageHelper
 					.getMessage(SandeshaMessageKeys.invokeMsgError);
-			log.debug(message, e);
+			if(log.isDebugEnabled()) log.debug(message, e);
 		} finally {
-			if (transaction != null) {
+			if (transaction != null && transaction.isActive()) {
 				try {
-					transaction.commit();
-					transaction = null;
+					transaction.rollback();
 				} catch (Exception e) {
 					String message = SandeshaMessageHelper.getMessage(
-							SandeshaMessageKeys.commitError, e.toString());
-					log.debug(message, e);
+							SandeshaMessageKeys.rollbackError, e.toString());
+					if(log.isDebugEnabled()) log.debug(message, e);
 				}
 			}
 		}
@@ -359,5 +350,5 @@
 			log.debug("Exit: InOrderInvoker::internalRun");
 		return sleep;
 	}
-
+	
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java Mon May 21 05:17:40 2007
@@ -14,6 +14,7 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
@@ -105,17 +106,15 @@
 					transaction.commit();
 					transaction = storageManager.getTransaction();
 				}
-
 			} catch (Exception e) {
 				if (log.isDebugEnabled())
 					log.debug("Exception :", e);
-				if(transaction!=null){
-					transaction.rollback();
-					transaction = storageManager.getTransaction();
-				}
+
 				handleFault(rmMsg, e);
 			}
 
+
+			
 			if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
 				Sequence sequence = (Sequence) rmMsg
 						.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
@@ -158,18 +157,25 @@
 				rMDBean.setNextMsgNoToProcess(nextMsgNo);
 				storageManager.getRMDBeanMgr().update(rMDBean);
 			}
+			
+			if(transaction != null && transaction.isActive()) transaction.commit();
+			transaction = null;
+			
 		} catch (Exception e) {
 			if (log.isErrorEnabled())
 				log.error(e.toString(), e);
-			if(transaction != null) {
-				transaction.rollback();
-				transaction = null;
-			}
 		} finally {
-			if (transaction!=null) transaction.commit();
-			
 			if (workId !=null && lock!=null) {
 				lock.removeWork(workId);
+			}
+
+			if (transaction!=null && transaction.isActive()) {
+				try {
+					transaction.rollback();
+				} catch (SandeshaStorageException e) {
+					if (log.isWarnEnabled())
+						log.warn("Caught exception rolling back transaction", e);
+				}
 			}
 		}
 		

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Mon May 21 05:17:40 2007
@@ -174,6 +174,9 @@
 			// If we got to here then we found work to do on the sequence, so we should
 			// remember not to sleep at the end of the list of sequences.
 			processedMessage = true;
+			
+			if(transaction != null && transaction.isActive()) transaction.commit();
+			transaction = null;
 
 		} catch (Exception e) {
 
@@ -184,28 +187,16 @@
 			//TODO rollback only if a SandeshaStorageException.
 			//This allows the other Exceptions to be used within the Normal flow.
 			
-			if (transaction != null) {
-				try {
-					transaction.rollback();
-					transaction = null;
-				} catch (Exception e1) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
-							.toString());
-					log.debug(message, e1);
-				}
-			}
-
 			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
-
 			log.debug(message, e);
 		} finally {
-			if (transaction != null) {
+			if (transaction != null && transaction.isActive()) {
 				try {
-					transaction.commit();
+					transaction.rollback();
 					transaction = null;
 				} catch (Exception e) {
 					String message = SandeshaMessageHelper
-							.getMessage(SandeshaMessageKeys.commitError, e.toString());
+							.getMessage(SandeshaMessageKeys.rollbackError, e.toString());
 					log.debug(message, e);
 				}
 			}
@@ -228,9 +219,10 @@
 		RMSBean finderBean = new RMSBean();
 		finderBean.setTerminated(true);
 		
-		Transaction transaction = storageManager.getTransaction();
+		Transaction transaction = null;
 		
 		try {
+			transaction = storageManager.getTransaction();
 			
 			SandeshaPolicyBean propertyBean = 
 				SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());			
@@ -301,11 +293,21 @@
 		    }
 			} 	    
 	    
-    } catch (SandeshaException e) {
-    	if (log.isErrorEnabled())
-    		log.error(e);
-    } finally {
-			transaction.commit();
+			if(transaction != null && transaction.isActive()) transaction.commit();
+			transaction = null;
+			
+		} catch (SandeshaException e) {
+			if (log.isErrorEnabled())
+				log.error(e);
+		} finally {
+			if(transaction != null && transaction.isActive()) {
+				try {
+					transaction.rollback();
+				} catch (SandeshaStorageException e) {
+					if (log.isDebugEnabled())
+						log.debug("Caught exception rolling back transaction", e);
+				}
+			}
 		}
 		
 		if (log.isDebugEnabled()) 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Mon May 21 05:17:40 2007
@@ -32,11 +32,11 @@
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMSBean;
-import org.apache.sandesha2.storage.beans.RMSequenceBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
@@ -185,14 +185,6 @@
 			// sending the message
 			boolean successfullySent = false;
 
-			// have to commit the transaction before sending. This may
-			// get changed when WS-AT is available.
-			if(transaction != null) {
-				transaction.commit();
-				transaction = null;
-				transaction = storageManager.getTransaction();
-			}
-
 			// Although not actually sent yet, update the send count to indicate an attempt
 			if (senderBean.isReSend()) {
 				SenderBean bean2 = senderBeanMgr.retrieve(senderBean.getMessageID());
@@ -271,11 +263,10 @@
 				// Store the Exception as a sequence property to enable the client to lookup the last 
 				// exception time and timestamp.
 				
-				// Create a new Transaction
-				transaction = storageManager.getTransaction();
-			
 				try
 				{
+					// Create a new Transaction
+					transaction = storageManager.getTransaction();
 					
 					// Get the internal sequence id from the context
 					String internalSequenceId = (String)rmMsgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
@@ -300,7 +291,7 @@
 				{
 					if (log.isErrorEnabled())
 						log.error(e1);
-					
+				} finally {
 					if (transaction != null) {
 						transaction.rollback();
 						transaction = null;
@@ -340,34 +331,45 @@
 			if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
 					&&
 					 (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue()))) {
-				transaction = storageManager.getTransaction();
-				//terminate message sent using the SandeshaClient. Since the terminate message will simply get the
-				//InFlow of the reference message get called which could be zero sized (OutOnly operations).
-				
-				// terminate sending side if this is the WSRM 1.0 spec. 
-				// If the WSRM versoion is 1.1 termination will happen in the terminate sequence response message.
-				
-				TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx
-						.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
-				String sequenceID = terminateSequence.getIdentifier().getIdentifier();
-
-				RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
-				TerminateManager.terminateSendingSide(rmsBean, storageManager);
-				
-				transaction.commit();
+				try {
+					transaction = storageManager.getTransaction();
+					//terminate message sent using the SandeshaClient. Since the terminate message will simply get the
+					//InFlow of the reference message get called which could be zero sized (OutOnly operations).
+					
+					// terminate sending side if this is the WSRM 1.0 spec. 
+					// If the WSRM version is 1.1 termination will happen in the terminate sequence response message.
+					
+					TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx
+							.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+					String sequenceID = terminateSequence.getIdentifier().getIdentifier();
+	
+					RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
+					TerminateManager.terminateSendingSide(rmsBean, storageManager);
+					
+					if(transaction != null && transaction.isActive()) transaction.commit();
+					transaction = null;
+				} finally {
+					if(transaction != null && transaction.isActive()) {
+						transaction.rollback();
+						transaction = null;
+					}
+				}
 			}
 
 		} catch (Exception e) {
-			if (log.isErrorEnabled()) log.error("Caught exception", e);
-			if (transaction!=null) {
-				transaction.rollback();
-				transaction = null;
-			}
+			if (log.isDebugEnabled()) log.debug("Caught exception", e);
 		} finally {
-			if (transaction!=null) transaction.commit();
-			
 			if (lock!=null && workId!=null) {
 				lock.removeWork(workId);
+			}
+
+			if (transaction!=null && transaction.isActive()) {
+				try {
+					transaction.rollback();
+				} catch (SandeshaStorageException e) {
+					if (log.isWarnEnabled())
+						log.warn("Caught exception rolling back transaction", e);
+				}
 			}
 		}
 		

Modified: webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/scenarios/RMScenariosTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/scenarios/RMScenariosTest.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/scenarios/RMScenariosTest.java (original)
+++ webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/scenarios/RMScenariosTest.java Mon May 21 05:17:40 2007
@@ -116,6 +116,7 @@
 		// Test sync echo with no offer, and the 1.1 spec
 		clientOptions = new Options();
 		clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_1);
+		org.apache.log4j.BasicConfigurator.configure();
 		runEcho(clientOptions, false, false, true,true,true);
 	}
 



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org