You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/05/14 12:52:41 UTC

svn commit: r406310 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: handlers/ msgprocessors/ storage/ storage/inmemory/ util/ workers/

Author: chamikara
Date: Sun May 14 03:52:36 2006
New Revision: 406310

URL: http://svn.apache.org/viewcvs?rev=406310&view=rev
Log:
Pausing the message context from proceeding when an exception occurs.
Correction to the cleaning logic.
All the data (including message contexts) should be cleared at the end
of the sequence except for the data needed by reports. (This will become
especially important when a permanent-storage-based storage manager is
present.)
Added the removeMessageContext method to the storage manager.

Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/SandeshaStorageException.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Sun May 14 03:52:36 2006
@@ -163,11 +163,17 @@
 			}
 
 		} catch (Exception e) {
+			//message should not be sent in a exception situation.
+			msgContext.pause();
+			
 			if (!withinTransaction) {
 				transaction.rollback();
 				msgContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
 				rolebacked = true;
 			}
+			
+			String message = "Sandesha2 got an exception when processing the in message";
+			throw new AxisFault (message,e);
 		} finally {
 			if (!withinTransaction && !rolebacked) {
 				transaction.commit();

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Sun May 14 03:52:36 2006
@@ -109,11 +109,17 @@
 			}
 
 		} catch (Exception e) {
+			//message should not be sent in a exception situation.
+			msgCtx.pause();
+			
 			if (!withinTransaction) {
 				transaction.rollback();
 				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
 				rolebacked = true;
 			}
+			
+			String message = "Sandesha2 got an exception when processing the in message";
+			throw new AxisFault (message,e);
 		} finally {
 			if (!withinTransaction && !rolebacked) {
 				transaction.commit();

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Sun May 14 03:52:36 2006
@@ -119,11 +119,18 @@
 				msgProcessor.processOutMessage(rmMsgCtx);
 
 		} catch (Exception e) {
+			//message should not be sent in a exception situation.
+			msgCtx.pause();
+			
+			//rolling back the transaction
 			if (!withinTransaction) {
 				transaction.rollback();
 				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
 				rolebacked = true;
 			}
+			
+			String message = "Sandesha2 got an exception when processing the out message";
+			throw new AxisFault (message,e);
 		} finally {
 			if (!withinTransaction && !rolebacked) {
 				transaction.commit();

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Sun May 14 03:52:36 2006
@@ -166,8 +166,13 @@
 			for (long messageNo = lower; messageNo <= upper; messageNo++) {
 				SenderBean retransmitterBean = getRetransmitterEntry(
 						retransmitterEntriesOfSequence, messageNo);
-				if (retransmitterBean != null)
+				if (retransmitterBean != null) {
 					retransmitterMgr.delete(retransmitterBean.getMessageID());
+					
+					//removing the application message from the storage.
+					String storageKey = retransmitterBean.getMessageContextRefKey();
+					storageManager.removeMessageContext(storageKey);
+				}
 				
 				ackedMessagesList.add(new Long (messageNo));
 			}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Sun May 14 03:52:36 2006
@@ -208,6 +208,8 @@
 		//updating the Highest_In_Msg_No property which gives the highest message number retrieved from this sequence.
 		String highetsInMsgNoStr = SandeshaUtil.getSequenceProperty(sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER,configCtx);
 		String highetsInMsgKey = SandeshaUtil.getSequenceProperty(sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,configCtx);
+		if (highetsInMsgKey==null)
+			highetsInMsgKey = SandeshaUtil.getUUID();
 		
 		long highestInMsgNo=0;
 		if (highetsInMsgNoStr!=null) {
@@ -219,7 +221,11 @@
 			
 			String str = new Long(msgNo).toString();
 			SequencePropertyBean highestMsgNoBean = new SequencePropertyBean (sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER,str);
-			SequencePropertyBean highestMsgKeyBean = new SequencePropertyBean (sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,key);
+			SequencePropertyBean highestMsgKeyBean = new SequencePropertyBean (sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,highetsInMsgKey);
+			
+			//storing the new message as the highest in message.
+			storageManager.removeMessageContext(highetsInMsgKey);
+			storageManager.storeMessageContext(highetsInMsgKey,msgCtx);
 			
 			if (highetsInMsgNoStr!=null) {
 				seqPropMgr.update(highestMsgNoBean);
@@ -415,6 +421,7 @@
 		// find internal sequence id
 		String internalSequenceId = null;
 
+
 		String storageKey = SandeshaUtil.getUUID();  //the key which will be used to store this message.
 		
 		/* Internal sequence id is the one used to refer to the sequence (since
@@ -811,7 +818,7 @@
 		// sending the message once through Sandesha2TransportSender.
 		AxisEngine engine = new AxisEngine(createSeqMsg.getConfigurationContext());
 		 try {
-			 engine.send(createSeqMsg);
+			 engine.resumeSend(createSeqMsg);
 		 } catch (AxisFault e) {
 			 throw new SandeshaException (e.getMessage());
 		 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Sun May 14 03:52:36 2006
@@ -31,7 +31,6 @@
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -75,7 +74,6 @@
 			.getSandeshaStorageManager(configCtx);
 		
 		//Processing for ack if available
-///		Transaction ackProcessTransaction = storageManager.getTransaction();
 		
 		SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) createSeqResponseRMMsgCtx
 				.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
@@ -84,12 +82,8 @@
 			ackProcessor.processInMessage(createSeqResponseRMMsgCtx);
 		}
 
-///		ackProcessTransaction.commit();
-		
 		//Processing the create sequence response.
 		
-///		Transaction createSeqResponseTransaction = storageManager.getTransaction();
-		
 		CreateSequenceResponse createSeqResponsePart = (CreateSequenceResponse) createSeqResponseRMMsgCtx
 				.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
 		if (createSeqResponsePart == null) {
@@ -137,6 +131,14 @@
 		createSeqBean.setSequenceID(newOutSequenceId);
 		createSeqMgr.update(createSeqBean);
 
+		SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
+		if (createSequenceSenderBean==null)
+			throw new SandeshaException ("Create sequence entry is not found");
+		
+		//removing the Create Sequence Message from the storage
+		String createSeqStorageKey = createSequenceSenderBean.getMessageContextRefKey();
+		storageManager.removeMessageContext(createSeqStorageKey);
+		
 		//deleting the create sequence entry.
 		retransmitterMgr.delete(createSeqMsgId);
 
@@ -154,11 +156,6 @@
 		sequencePropMgr.insert(outSequenceBean);
 		sequencePropMgr.insert(internalSequenceBean);
 
-///		createSeqResponseTransaction.commit();
-		
-		
-///		Transaction offerProcessTransaction = storageManager.getTransaction();
-		
 		//processing for accept (offer has been sent)
 		Accept accept = createSeqResponsePart.getAccept();
 		if (accept != null) {
@@ -217,10 +214,6 @@
 			
 		}
 
-///		offerProcessTransaction.commit();
-		
-///		Transaction updateAppMessagesTransaction = storageManager.getTransaction();
-		
 		SenderBean target = new SenderBean();
 		target.setInternalSequenceID(internalSequenceId);
 		target.setSend(false);
@@ -281,12 +274,8 @@
 			//updating the message. this will correct the SOAP envelope string.
 			storageManager.updateMessageContext(key,applicationMsg);
 		}
-
-///		updateAppMessagesTransaction.commit();
 		
-///		Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
 		SequenceManager.updateLastActivatedTime(internalSequenceId,configCtx);
-///		lastUpdatedTimeTransaction.commit();
 		
 		createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()
 				.setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Sun May 14 03:52:36 2006
@@ -175,6 +175,8 @@
 				seqPropMgr.insert(lastInMsgBean);
 				
 				MessageContext highestInMsg = storageManager.retrieveMessageContext(highestImMsgKey,configCtx);
+				
+				//TODO get the out message in a storage friendly manner.
 				MessageContext highestOutMessage = highestInMsg.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_OUT_VALUE);
 				
 				if (highestOutMessage!=null) {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/SandeshaStorageException.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/SandeshaStorageException.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/SandeshaStorageException.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/SandeshaStorageException.java Sun May 14 03:52:36 2006
@@ -33,4 +33,8 @@
 		super (e);
 	}
 	
+	public SandeshaStorageException (String m,Exception e) {
+		super (m,e);
+	}
+	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java Sun May 14 03:52:36 2006
@@ -69,4 +69,7 @@
 	public abstract void updateMessageContext (String storageKey,MessageContext msgContext) throws SandeshaStorageException;
 
 	public abstract MessageContext retrieveMessageContext (String storageKey, ConfigurationContext configContext) throws SandeshaStorageException;
+
+	public abstract void removeMessageContext (String storageKey) throws SandeshaStorageException;
+
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Sun May 14 03:52:36 2006
@@ -131,6 +131,18 @@
 		storeMessageContext(key,msgContext);
 	}
 	
+	public void removeMessageContext(String key) throws SandeshaStorageException { 
+		HashMap storageMap = (HashMap) getContext().getProperty(MESSAGE_MAP_KEY);
+		
+		if (storageMap==null) {
+			return;
+		}
+		
+		Object entry = storageMap.get(key);
+		if (entry!=null)
+			storageMap.remove(key);
+	}
+	
 	public void  initStorage (AxisModule moduleDesc) {
 		
 	}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java Sun May 14 03:52:36 2006
@@ -330,7 +330,12 @@
 			if (it.hasNext()) {
 				SenderBean oldAckBean = (SenderBean) it.next();
 				timeToSend = oldAckBean.getTimeToSend();		//If there is an old ack. This ack will be sent in the old timeToSend.
+				
+				//removing the retransmitted entry for the oldAck
 				retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+				
+				//removing the message store entry for the old ack
+				storageManager.removeMessageContext(oldAckBean.getMessageContextRefKey());
 			}
 			
 			ackBean.setTimeToSend(timeToSend);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Sun May 14 03:52:36 2006
@@ -312,6 +312,8 @@
 		
 		SandeshaUtil.startSenderForTheSequence(configurationContext,internalSequenceId);
 		
+		
+		
 		updateClientSideListnerIfNeeded (firstAplicationMsgCtx,anonymousURI);
 		
 	}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java Sun May 14 03:52:36 2006
@@ -27,6 +27,7 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.TransportOutDescription;
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.commons.logging.Log;
@@ -112,6 +113,12 @@
 		while (iterator.hasNext()) {
 			InvokerBean storageMapBean = (InvokerBean) iterator.next();
 			storageMapBeanMgr.delete(storageMapBean.getMessageContextRefKey());
+			
+			//removing the respective message context from the message store. If this is an in-only message.
+			//In-out message will be deleted when a ack is retrieved for the out message.
+			String messageStoreKey = storageMapBean.getMessageContextRefKey();
+			storageManager.removeMessageContext(messageStoreKey);
+			
 		}
 		
 		String cleanStatus = (String) receivingSideCleanMap.get(sequenceID);
@@ -138,7 +145,13 @@
 		Iterator iterator = collection.iterator();
 		while (iterator.hasNext()) {
 			NextMsgBean nextMsgBean = (NextMsgBean) iterator.next();
-			//nextMsgBeanMgr.delete(nextMsgBean.getSequenceID());
+//			nextMsgBeanMgr.delete(nextMsgBean.getSequenceID());
+		}
+		
+		//removing the HighestInMessage entry.
+		String highestInMessageKey = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,configContext);
+		if (highestInMessageKey!=null) {
+			storageManager.removeMessageContext(highestInMessageKey);
 		}
 		
 		removeReceivingSideProperties(configContext,sequenceID);
@@ -289,12 +302,15 @@
 			}
 		}
 		
-		//removing retransmitterMgr entries
+		//removing retransmitterMgr entries and corresponding message contexts.
 		Collection collection = retransmitterBeanMgr.find(internalSequenceID);
 		Iterator iterator = collection.iterator();
 		while (iterator.hasNext()) {
 			SenderBean retransmitterBean = (SenderBean) iterator.next();
 			retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+			
+			String messageStoreKey = retransmitterBean.getMessageContextRefKey();
+			storageManager.removeMessageContext(messageStoreKey);
 		}
 		
 		//removing the createSeqMgrEntry

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Sun May 14 03:52:36 2006
@@ -178,6 +178,12 @@
 									.resume(msgToInvoke);
 							invoked = true;
 							storageMapMgr.delete(key);
+							
+							//removing the corresponding message context as well.
+							MessageContext msgCtx = storageManager.retrieveMessageContext(key,context);
+							if (msgCtx!=null) {
+								storageManager.removeMessageContext(key);
+							}
 						} catch (AxisFault e) {
 							throw new SandeshaException(e);
 						} finally {
@@ -205,7 +211,9 @@
 				}
 				
 			} catch (Exception e1) {
-				e1.printStackTrace();
+				String message = "Sandesha2 got an exception when trying to invoke the message";
+				log.debug(message,e1);
+				
 				if (transaction!=null) {
 					transaction.rollback();
 					rolebacked = true;

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Sun May 14 03:52:36 2006
@@ -118,15 +118,16 @@
 				if (senderBean==null) {
 					continue;
 			    }
-
+				
+				String key = (String) senderBean.getMessageContextRefKey();
+				MessageContext msgCtx = storageManager.retrieveMessageContext(key, context);
+				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
+				
 				MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
 				boolean continueSending = retransmitterAdjuster.adjustRetransmittion(senderBean, context);
 				if (!continueSending) {
 					continue;
 				}
-				
-				String key = (String) senderBean.getMessageContextRefKey();
-				MessageContext msgCtx = storageManager.retrieveMessageContext(key, context);
 
 				if (msgCtx == null) {
 					String message = "Message context is not present in the storage";
@@ -177,21 +178,23 @@
 				TransportOutDescription transportOutDescription = msgCtx.getTransportOut();
 				TransportSender transportSender = transportOutDescription.getSender();
 					
-				//have to commit the transaction before sending. This may get changed when WS-AT is available.
-				transaction.commit();
-				
 				boolean successfullySent = false;
 				if (transportSender != null) {
+					
+					//have to commit the transaction before sending. This may get changed when WS-AT is available.
+					transaction.commit();
+					msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_FALSE);
 					try {
 						//TODO change this to cater for security.
 						transportSender.invoke(msgCtx);
 						successfullySent = true;
 					} catch (AxisFault e) {
 						// TODO Auto-generated catch block
-					    log.debug("Could not send message");
-						log.debug(e.getStackTrace().toString());
+					    String message = "Sandesha2 got an exception when trying to send the message";
+						log.debug(message,e);
 					} finally {
 						transaction =  storageManager.getTransaction();
+						msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
 					}
 				}
 
@@ -202,11 +205,14 @@
 						bean1.setSentCount(senderBean.getSentCount());
 						bean1.setTimeToSend(senderBean.getTimeToSend());
 						mgr.update(bean1);
-					} else
+					} else {
 						mgr.delete(bean1.getMessageID());
+						
+						//removing the message from the storage.
+						String messageStoredKey = bean1.getMessageContextRefKey();
+						storageManager.removeMessageContext(messageStoredKey);
+					}
 				}
-
-				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
 				
 				if (successfullySent) {
 					if (!msgCtx.isServerSide())



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org