You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ml...@apache.org on 2006/12/19 15:46:09 UTC

svn commit: r488693 - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2: ./ client/ handlers/ msgprocessors/ msgreceivers/ polling/ storage/inmemory/ util/ workers/

Author: mlovett
Date: Tue Dec 19 06:46:08 2006
New Revision: 488693

URL: http://svn.apache.org/viewvc?view=rev&rev=488693
Log:
Associate the Sandesha transaction with the thread that is running the code instead of the message context

Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java Tue Dec 19 06:46:08 2006
@@ -541,8 +541,6 @@
 	
 	String POLLING_MANAGER = "PollingManager";
 	
-	String WITHIN_TRANSACTION = "WithinTransaction";
-	
 	String STORAGE_MANAGER_PARAMETER  = "Sandesha2StorageManager";
 	
 	String POST_FAILURE_MESSAGE = "PostFailureMessage";

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java Tue Dec 19 06:46:08 2006
@@ -110,20 +110,10 @@
 		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
 		RMSBeanMgr createSeqMgr = storageManager.getRMSBeanMgr();
 
-		String withinTransactionStr = (String) configurationContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
-		boolean withinTransaction = false;
-		if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr))
-			withinTransaction = true;
-
 		Transaction reportTransaction = null;
-		if (!withinTransaction) {
-			reportTransaction = storageManager.getTransaction();
-			configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
-		}
-
-		boolean rolebacked = false;
 
 		try {
+			reportTransaction = storageManager.getTransaction();
 
 			sequenceReport.setInternalSequenceID(internalSequenceID);
 
@@ -177,16 +167,12 @@
 			fillOutgoingSequenceInfo(sequenceReport, internalSequenceID, outSequenceID, seqPropMgr);
 
 		} catch (Exception e) {
-			if (!withinTransaction && reportTransaction!=null) {
+			if (reportTransaction!=null) {
 				reportTransaction.rollback();
-				configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-				rolebacked = true;
+				reportTransaction = null;
 			}
 		} finally {
-			if (!withinTransaction && !rolebacked && reportTransaction!=null) {
-				reportTransaction.commit();
-				configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-			}
+			if (reportTransaction!=null) reportTransaction.commit();
 		}
 
 		return sequenceReport;
@@ -237,20 +223,10 @@
 		SandeshaReport sandeshaReport = new SandeshaReport();
 		SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean();
 
-		String withinTransactionStr = (String) configurationContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
-		boolean withinTransaction = false;
-		if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr))
-			withinTransaction = true;
-
 		Transaction reportTransaction = null;
-		if (!withinTransaction) {
-			reportTransaction = storageManager.getTransaction();
-			configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
-		}
-
-		boolean rolebacked = false;
 
 		try {
+			reportTransaction = storageManager.getTransaction();
 
 			internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
 			Collection collection = seqPropMgr.find(internalSequenceFindBean);
@@ -285,16 +261,12 @@
 			}
 
 		} catch (Exception e) {
-			if (!withinTransaction && reportTransaction!=null) {
+			if (reportTransaction!=null) {
 				reportTransaction.rollback();
-				configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-				rolebacked = true;
+				reportTransaction = null;
 			}
 		} finally {
-			if (!withinTransaction && !rolebacked && reportTransaction!=null) {
-				reportTransaction.commit();
-				configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-			}
+			if (reportTransaction!=null) reportTransaction.commit();
 		}
 
 		return sandeshaReport;
@@ -641,13 +613,14 @@
 		String sequenceID = null;
 		
 		try
-		{			
+		{
+			transaction = storageManager.getTransaction();
 			sequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(internalSequenceID, storageManager);		
 		}
 		finally
 		{
 			// Commit the transaction as it was only a retrieve
-			transaction.commit();
+			if(transaction != null) transaction.commit();
 		}
 		
 		if (sequenceID == null)
@@ -683,17 +656,18 @@
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
 
 		// Get a transaction to obtain sequence information
-		Transaction transaction = storageManager.getTransaction();
-		
+		Transaction transaction = null;
 		String sequenceID = null;
 		
 		try
 		{
+			transaction = storageManager.getTransaction();
 			sequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(internalSequenceID, storageManager);
 		}
 		finally
 		{
-			transaction.commit();
+			// Commit the tran whatever happened
+			if(transaction != null) transaction.commit();
 		}
 		
 		if (sequenceID == null)
@@ -802,21 +776,12 @@
 	public static void forceDispatchOfInboundMessages(ConfigurationContext configContext, 
 			String sequenceID,
 			boolean allowLaterDeliveryOfMissingMessages)throws SandeshaException{
-		String withinTransactionStr = (String) configContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
-		boolean withinTransaction = false;
-		if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr))
-			withinTransaction = true;
 
 		Transaction reportTransaction = null;
-		if (!withinTransaction) {
-			StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext, configContext.getAxisConfiguration());
-			reportTransaction = storageManager.getTransaction();
-			configContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
-		}
-
-		boolean rolledback = false;
 
 		try {
+			StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext, configContext.getAxisConfiguration());
+			reportTransaction = storageManager.getTransaction();
 
 			//only do this if we are running inOrder
 			if(SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration()).isInOrder()){
@@ -830,16 +795,12 @@
 			}
 			
 		} catch (Exception e) {
-			if (!withinTransaction && reportTransaction!=null) {
+			if (reportTransaction!=null) {
 				reportTransaction.rollback();
-				configContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-				rolledback = true;
+				reportTransaction = null;
 			}
 		} finally {
-			if (!withinTransaction && !rolledback && reportTransaction!=null) {
-				reportTransaction.commit();
-				configContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-			}
+			if(reportTransaction != null) reportTransaction.commit();
 		}
 	}
 
@@ -869,17 +830,18 @@
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
 		
 		// Get a transaction for getting the sequence properties
-		Transaction transaction = storageManager.getTransaction();
-
+		Transaction transaction = null;
 		String sequenceID = null;
 		
 		try
 		{
+			transaction = storageManager.getTransaction();
 			sequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(internalSequenceID, storageManager);
 		}
 		finally
 		{
-			transaction.commit();
+			// Commit the tran whatever happened
+			if(transaction != null) transaction.commit();
 		}
 		
 		if (sequenceID == null)
@@ -1061,20 +1023,10 @@
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx,configCtx.getAxisConfiguration());
 		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
 
-		String withinTransactionStr = (String) configCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
-		boolean withinTransaction = false;
-		if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr))
-			withinTransaction = true;
-
 		Transaction reportTransaction = null;
-		if (!withinTransaction) {
-			reportTransaction = storageManager.getTransaction();
-			configCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
-		}
-		
-		boolean rolebacked = false;
 
 		try {
+			reportTransaction = storageManager.getTransaction();
 
 			SequenceReport sequenceReport = new SequenceReport();
 
@@ -1083,7 +1035,6 @@
 
 			Iterator iter = completedMessageList.iterator();
 			while (iter.hasNext()) {
-				;
 				sequenceReport.addCompletedMessage((Long) iter.next());
 			}
 
@@ -1101,16 +1052,12 @@
 			return sequenceReport;
 
 		} catch (Exception e) {
-			if (!withinTransaction && reportTransaction!=null) {
+			if (reportTransaction!=null) {
 				reportTransaction.rollback();
-				configCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-				rolebacked = true;
+				reportTransaction = null;
 			}
 		} finally {
-			if (!withinTransaction && !rolebacked && reportTransaction!=null) {
-				reportTransaction.commit();
-				configCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-			}
+			if (reportTransaction!=null) reportTransaction.commit();
 		}
 
 		return null;
@@ -1142,17 +1089,18 @@
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
 
 		// Get a transaction to obtain sequence information
-		Transaction transaction = storageManager.getTransaction();
-		
+		Transaction transaction = null;
 		String sequenceID = null;
 		
 		try
 		{
+			transaction = storageManager.getTransaction();
 			sequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(internalSequenceID, storageManager);
 		}
 		finally
 		{
-			transaction.commit();
+			// Commit the tran whatever happened
+			if(transaction != null) transaction.commit();
 		}
 		
 		if (sequenceID == null)
@@ -1328,15 +1276,14 @@
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
 		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
 		
-		Transaction transaction = storageManager.getTransaction();
-
+		Transaction transaction = null;
 		String resultString = null;
     
 		try 
-		{		
+		{
+			transaction = storageManager.getTransaction();
 			// Lookup the last failed to send error
 			SequencePropertyBean errorBean = seqPropMgr.retrieve(internalSequenceId, Sandesha2Constants.SequenceProperties.LAST_FAILED_TO_SEND_ERROR);
-		
 			
 			// Get the value from the 
 			if (errorBean != null)
@@ -1344,7 +1291,8 @@
 		}
 		finally
 		{
-			transaction.commit();
+			// Commit the tran whatever happened
+			if(transaction != null) transaction.commit();
 		}
 		
 		if (log.isDebugEnabled())
@@ -1387,12 +1335,13 @@
 		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
 		
 		// Create a transaction for the retrieve operation
-		Transaction transaction = storageManager.getTransaction();
-	
+		Transaction transaction = null;
 		long resultTime = -1;
 
 		try
-		{		
+		{
+			transaction = storageManager.getTransaction();
+			
 			// Lookup the last failed to send error
 			SequencePropertyBean errorTSBean = 
 				seqPropMgr.retrieve(internalSequenceId, Sandesha2Constants.SequenceProperties.LAST_FAILED_TO_SEND_ERROR_TIMESTAMP);
@@ -1404,7 +1353,7 @@
 		finally
 		{
 			// commit the transaction as it was only a retrieve
-			transaction.commit();
+			if(transaction != null) transaction.commit();
 		}
 		
 		if (log.isDebugEnabled())

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Tue Dec 19 06:46:08 2006
@@ -75,9 +75,7 @@
 			return returnValue; // Reinjected messages are not processed by Sandesha2 inflow
 													// handlers
 
-		boolean withinTransaction = false;
 		Transaction transaction = null;
-		boolean rolebacked = false;
 
 		try {
 			
@@ -90,11 +88,6 @@
 				return returnValue;
 			}
 
-			String withinTransactionStr = (String) msgContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
-			if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
-				withinTransaction = true;
-			}
-
 			StorageManager storageManager = null;
 			try {
 				storageManager = SandeshaUtil
@@ -109,10 +102,7 @@
 				return returnValue;
 			}
 
-			if (!withinTransaction) {
-				transaction = storageManager.getTransaction();
-				msgContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
-			}
+			transaction = storageManager.getTransaction();
 
 			RMMsgContext rmMessageContext = MsgInitializer.initializeMessage(msgContext);
 
@@ -143,11 +133,10 @@
 			msgContext.pause();
 			returnValue = InvocationResponse.SUSPEND;
 
-			if (!withinTransaction && transaction != null) {
+			if (transaction != null) {
 				try {
 					transaction.rollback();
-					msgContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-					rolebacked = true;
+					transaction = null;
 				} catch (Exception e1) {
 					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
 					log.debug(message, e);
@@ -159,10 +148,9 @@
 				log.debug("Exit: SandeshaGlobalInHandler::invoke ", e);
 			throw new AxisFault(message, e);
 		} finally {
-			if (!withinTransaction && !rolebacked && transaction != null) {
+			if (transaction != null) {
 				try {
 					transaction.commit();
-					msgContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
 				} catch (Exception e) {
 					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
 					if (log.isDebugEnabled())

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Tue Dec 19 06:46:08 2006
@@ -85,20 +85,10 @@
 		
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
 
-		boolean withinTransaction = false;
-		String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
-		if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
-			withinTransaction = true;
-		}
-
 		Transaction transaction = null;
-		if (!withinTransaction) {
-			transaction = storageManager.getTransaction();
-			msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
-		}
-		boolean rolebacked = false;
 
 		try {
+			transaction = storageManager.getTransaction();
 
 			AxisService axisService = msgCtx.getAxisService();
 			if (axisService == null) {
@@ -146,11 +136,10 @@
 			msgCtx.pause();
 			returnValue = InvocationResponse.SUSPEND;
 			
-			if (!withinTransaction) {
+			if (transaction != null) {
 				try {
 					transaction.rollback();
-					msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-					rolebacked = true;
+					transaction = null;
 				} catch (Exception e1) {
 					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
 					log.debug(message, e);
@@ -161,10 +150,9 @@
 			throw new AxisFault(message, e);
 		} 
 		finally {
-			if (!withinTransaction && !rolebacked) {
+			if (transaction != null) {
 				try {
 					transaction.commit();
-					msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
 				} catch (Exception e) {
 					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
 					log.debug(message, e);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Tue Dec 19 06:46:08 2006
@@ -113,20 +113,11 @@
 		msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
 
-		boolean withinTransaction = false;
-		String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
-		if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
-			withinTransaction = true;
-		}
-
 		Transaction transaction = null;
-		if (!withinTransaction) {
-			transaction = storageManager.getTransaction();
-			msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
-		}
-		boolean rolebacked = false;
 
 		try {
+			transaction = storageManager.getTransaction();
+			
 			// getting rm message
 			RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
 
@@ -172,11 +163,10 @@
 			returnValue = InvocationResponse.SUSPEND;
 
 			// rolling back the transaction
-			if (!withinTransaction) {
+			if (transaction != null) {
 				try {
 					transaction.rollback();
-					msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-					rolebacked = true;
+					transaction = null;
 				} catch (Exception e1) {
 					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
 					log.debug(message, e);
@@ -186,10 +176,9 @@
 			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outMsgError, e.toString());
 			throw new AxisFault(message, e);
 		} finally {
-			if (!withinTransaction && !rolebacked) {
+			if (transaction != null) {
 				try {
 					transaction.commit();
-					msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
 				} catch (Exception e) {
 					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
 					log.debug(message, e);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Tue Dec 19 06:46:08 2006
@@ -505,9 +505,6 @@
 		createSeqMsg.setRelationships(null); // create seq msg does not
 												// relateTo anything
 		
-		// Set that the create sequence message is part of a transaction.
-		createSeqMsg.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
-		
 		String createSequenceMessageStoreKey = SandeshaUtil.getUUID(); // the key that will be used to store 
 																	   //the create sequence message.
 		

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java Tue Dec 19 06:46:08 2006
@@ -25,7 +25,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.msgprocessors.MsgProcessor;
@@ -57,23 +56,12 @@
 		// that we will have a MsgProcessor every time.
 		MsgProcessor msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
 		if(msgProcessor != null) {
-			boolean withinTransaction = false;
-			String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
-			if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
-				withinTransaction = true;
-			}
-
 			Transaction transaction = null;
-			if (!withinTransaction) {
+			
+			try {
 				ConfigurationContext context = msgCtx.getConfigurationContext();
 				StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());				
 				transaction = storageManager.getTransaction();
-				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
-			}
-			
-			boolean rolledBack = false;
-			
-			try {
 
 				msgProcessor.processInMessage(rmMsgCtx);
 
@@ -83,11 +71,10 @@
 				// message should not be sent in a exception situation.
 				msgCtx.pause();
 	
-				if (!withinTransaction) {
+				if (transaction != null) {
 					try {
 						transaction.rollback();
-						msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-						rolledBack = true;
+						transaction = null;
 					} catch (Exception e1) {
 						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
 						log.debug(message, e);
@@ -97,10 +84,9 @@
 				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.inMsgError, e.toString());
 				throw new AxisFault(message, e);
 			} finally {
-				if (!withinTransaction && !rolledBack) {
+				if (transaction != null) {
 					try {
 						transaction.commit();
-						msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
 					} catch (Exception e) {
 						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
 						log.debug(message, e);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Tue Dec 19 06:46:08 2006
@@ -70,12 +70,12 @@
 			try {
 				t = storageManager.getTransaction();
 				pollRMDSide();
-				t.commit();
+				if(t != null) t.commit();
+				t = null;
 
 				t = storageManager.getTransaction();
 				pollRMSSide();
-				t.commit();
-
+				if(t != null) t.commit();
 				t = null;
 			} catch (Exception e) {
 				if(log.isDebugEnabled()) log.debug("Exception", e);
@@ -85,7 +85,6 @@
 					} catch(Exception e2) {
 						if(log.isDebugEnabled()) log.debug("Exception during rollback", e);
 					}
-					t = null;
 				}
 			}
 			try {
@@ -164,9 +163,6 @@
 		RMMsgContext referenceRMMessage = MsgInitializer.initializeMessage(referenceMessage);
 		RMMsgContext makeConnectionRMMessage = RMMsgCreator.createMakeConnectionMessage(referenceRMMessage,
 				sequenceId, WSRMAnonReplyToURI, storageManager);
-		
-		// Put our transaction onto the message context
-		makeConnectionRMMessage.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
 		
 		makeConnectionRMMessage.setProperty(MessageContext.TRANSPORT_IN,null);
 		//storing the MakeConnection message.

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Tue Dec 19 06:46:08 2006
@@ -88,9 +88,11 @@
 				result = new InMemoryTransaction(this, name, id);
 				transactions.put(key, result);
 			} else {
-				// We just returned an existing transaction. That might be ok, but it
-				// might be an indication of a real problem.
+				// We don't want to return an existing transaction, as someone else should
+				// decide if we commit it or not. If we get here then we probably have a
+				// bug.
 				if(log.isDebugEnabled()) log.debug("Possible re-used transaction: " + result);
+				result = null;
 			}
 		}
 		return result;

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Tue Dec 19 06:46:08 2006
@@ -137,8 +137,6 @@
 			StorageManager storageManager) throws SandeshaException {
 		ConfigurationContext configurationContext = messageContext.getConfigurationContext();
 
-		configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, messageContext
-				.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
 		SequenceReport report = SandeshaClient.getOutgoingSequenceReport(internalSequenceID, configurationContext);
 		TerminateManager.timeOutSendingSideSequence(configurationContext,sequencePropertyKey ,internalSequenceID, false, storageManager);
 

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java Tue Dec 19 06:46:08 2006
@@ -156,10 +156,17 @@
 							}
 						}
 						
-						transaction.commit();
 					}
 					catch(Exception e){
-						transaction.rollback();
+						if(transaction != null) {
+							transaction.rollback();
+							transaction = null;
+						}
+					} finally {
+						if(transaction != null) {
+							transaction.commit();
+							transaction = null;
+						}
 					}
 		
 				}//end while
@@ -228,7 +235,6 @@
 			doPauseIfNeeded();
 
 			Transaction transaction = null;
-			boolean rolebacked = false;
 
 			try {
 				StorageManager storageManager = SandeshaUtil
@@ -321,7 +327,10 @@
 
 					String messageContextKey = bean.getMessageContextRefKey();
 					
-					transaction.commit();
+					if(transaction != null) {
+						transaction.commit();
+						transaction = null;
+					}
 
 					// start a new worker thread and let it do the invocation.
 					InvokerWorker worker = new InvokerWorker(context,
@@ -343,7 +352,7 @@
 				if (transaction != null) {
 					try {
 						transaction.rollback();
-						rolebacked = true;
+						transaction = null;
 					} catch (Exception e1) {
 						String message = SandeshaMessageHelper.getMessage(
 								SandeshaMessageKeys.rollbackError, e1
@@ -355,9 +364,10 @@
 						.getMessage(SandeshaMessageKeys.invokeMsgError);
 				log.debug(message, e);
 			} finally {
-				if (!rolebacked && transaction != null) {
+				if (transaction != null) {
 					try {
 						transaction.commit();
+						transaction = null;
 					} catch (Exception e) {
 						String message = SandeshaMessageHelper.getMessage(
 								SandeshaMessageKeys.commitError, e.toString());

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java Tue Dec 19 06:46:08 2006
@@ -9,7 +9,6 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
@@ -60,20 +59,16 @@
 
 			String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsg);
 			
-			//endint the transaction before invocation.
-			transaction.commit();
+			// ending the transaction before invocation.
+			if(transaction != null) {
+				transaction.commit();
+				transaction = null;
+			}
 				
 			boolean invoked = false;
 			
 			try {
 
-				// Invocation is not done within a transation. This
-				// may get changed when WS-AT is available.
-				
-				// Invoking the message.
-				msgToInvoke.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
-						Sandesha2Constants.VALUE_TRUE);
-
 				boolean postFailureInvocation = false;
 
 				// StorageManagers should st following property to
@@ -154,15 +149,14 @@
 					nextMsgMgr.update(rMDBean);
 				}				
 			}
-		} catch (SandeshaStorageException e) {
-			transaction.rollback();
-		} catch (SandeshaException e) {
-			log.error(e.toString(), e);
 		} catch (Exception e) {
 			log.error(e.toString(), e);
+			if(transaction != null) {
+				transaction.rollback();
+				transaction = null;
+			}
 		} finally {
-			if (transaction!=null && transaction.isActive())
-				transaction.commit();
+			if (transaction!=null) transaction.commit();
 			
 			if (workId !=null && lock!=null) {
 				lock.removeWork(workId);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java Tue Dec 19 06:46:08 2006
@@ -73,7 +73,6 @@
 			doPauseIfNeeded();
 
 			Transaction transaction = null;
-			boolean rolebacked = false;
 
 			try {
 				if (context == null) {
@@ -85,7 +84,6 @@
 					throw new SandeshaException(message);
 				}
 				
-				// TODO make sure this locks on reads.
 				transaction = storageManager.getTransaction();
 
 				SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
@@ -116,7 +114,10 @@
 					continue;
 				}
 
-				transaction.commit();
+				if(transaction != null) {
+					transaction.commit();
+					transaction = null;
+				}
 
 				// start a worker which will work on this messages.
 				SenderWorker worker = new SenderWorker(context, senderBean);
@@ -141,7 +142,7 @@
 				if (transaction != null) {
 					try {
 						transaction.rollback();
-						rolebacked = true;
+						transaction = null;
 					} catch (Exception e1) {
 						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
 								.toString());
@@ -153,9 +154,10 @@
 
 				log.debug(message, e);
 			} finally {
-				if (transaction != null && !rolebacked) {
+				if (transaction != null) {
 					try {
 						transaction.commit();
+						transaction = null;
 					} catch (Exception e) {
 						String message = SandeshaMessageHelper
 								.getMessage(SandeshaMessageKeys.commitError, e.toString());

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=488693&r1=488692&r2=488693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Tue Dec 19 06:46:08 2006
@@ -71,13 +71,11 @@
 			String key = senderBean.getMessageContextRefKey();
 			MessageContext msgCtx = storageManager.retrieveMessageContext(key, configurationContext);
       
-      if (msgCtx == null) {
-        // This sender bean has already been processed
-        return;
-      }
+			if (msgCtx == null) {
+				// This sender bean has already been processed
+				return;
+			}
       
-			msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
-
 			RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
 
 			boolean continueSending = MessageRetransmissionAdjuster.adjustRetransmittion(rmMsgCtx, senderBean, configurationContext,
@@ -168,9 +166,10 @@
 
 			// have to commit the transaction before sending. This may
 			// get changed when WS-AT is available.
-			transaction.commit();
-			msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
-					Sandesha2Constants.VALUE_FALSE);
+			if(transaction != null) {
+				transaction.commit();
+				transaction = null;
+			}
 
 			try {
 
@@ -252,22 +251,25 @@
 					seqPropMgr.insert(etsBean);
 					
 					// Commit the properties
-					transaction.commit();
+					if(transaction != null) {
+						transaction.commit();
+						transaction = null;
+					}
 				}
 				catch (Exception e1)
 				{
 					if (log.isErrorEnabled())
 						log.error(e1);
 					
-					if (transaction != null && transaction.isActive())
+					if (transaction != null) {
 						transaction.rollback();
+						transaction = null;
+					}
 				}
 				
-			} finally {
-				transaction = storageManager.getTransaction();
-				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
-						Sandesha2Constants.VALUE_TRUE);
 			}
+			// Establish the transaction for post-send processing
+			transaction = storageManager.getTransaction();
 
 			// update or delete only if the object is still present.
 			SenderBean bean1 = senderBeanMgr
@@ -288,12 +290,13 @@
 
 			if (successfullySent) {
 				if (!msgCtx.isServerSide())
-        {
-          // Commit the transaction to release the SenderBean
-          transaction.commit();
-          transaction = storageManager.getTransaction();
+				{
+					// Commit the transaction to release the SenderBean
+					transaction.commit();
+					transaction = null;
+					transaction = storageManager.getTransaction();
 					checkForSyncResponses(msgCtx);
-        }
+				}
 			}
 
 			if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
@@ -318,35 +321,14 @@
 						storageManager);
 			}
 
-			msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-		} catch (SandeshaStorageException e) { 
-			if (log.isDebugEnabled())
-				log.debug("Caught exception", e);
-			if (transaction!=null && transaction.isActive())
-				transaction.rollback();
-		} catch (SandeshaException e) {
-			if (log.isDebugEnabled())
-				log.debug("Caught exception", e);
-			if (transaction!=null && transaction.isActive())
-				transaction.rollback();
-		} catch (MissingResourceException e) {
-			if (log.isFatalEnabled())
-			  log.fatal("Unable to load message bundle", e);
-			if (transaction!=null && transaction.isActive())
-				transaction.rollback();
-		} catch (AxisFault e) {
-			if (log.isDebugEnabled())
-				log.debug("Caught exception", e);
-			if (transaction!=null && transaction.isActive())
-				transaction.rollback();
 		} catch (Exception e) {
-			if (log.isDebugEnabled())
-				log.debug("Caught exception", e);
-			if (transaction!=null && transaction.isActive())
+			if (log.isDebugEnabled()) log.debug("Caught exception", e);
+			if (transaction!=null) {
 				transaction.rollback();
+				transaction = null;
+			}
 		} finally {
-			if (transaction!=null && transaction.isActive())
-				transaction.commit();
+			if (transaction!=null) transaction.commit();
 			
 			if (lock!=null && workId!=null) {
 				lock.removeWork(workId);
@@ -439,12 +421,6 @@
 				
 				return;
 			}
-
-			// if the request msg ctx is withina a transaction, processing if
-			// the response should also happen
-			// withing the same transaction
-			responseMessageContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, msgCtx
-					.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
 
 			if (resenvelope != null) {
 				responseMessageContext.setEnvelope(resenvelope);



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org