You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2006/11/28 12:08:28 UTC

svn commit: r479987 - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2: msgprocessors/ polling/ transport/ util/

Author: mlovett
Date: Tue Nov 28 03:08:23 2006
New Revision: 479987

URL: http://svn.apache.org/viewvc?view=rev&rev=479987
Log:
Andy's patch to ensure each message is stored only once, rather than being stored and then updated. See SANDESHA2-52.

Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Tue Nov 28 03:08:23 2006
@@ -269,16 +269,15 @@
 
 			ackBean.setTimeToSend(timeToSend);
 
-			storageManager.storeMessageContext(key, ackMsgCtx);
 			msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
 			
-			// inserting the new ack.
-			retransmitterBeanMgr.insert(ackBean);
-
 			// passing the message through sandesha2sender
 
 			SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
 
+			// inserting the new ack.
+			retransmitterBeanMgr.insert(ackBean);
+
 			SandeshaUtil.startSenderForTheSequence(configurationContext, sequenceId);
 
 			msgContext.pause();
@@ -386,8 +385,6 @@
 		SenderBean ackRequestBean = new SenderBean();
 		ackRequestBean.setMessageContextRefKey(key);
 
-		storageManager.storeMessageContext(key, msgContext);
-
 		// Set a retransmitter lastSentTime so that terminate will be send with
 		// some delay.
 		// Otherwise this get send before return of the current request (ack).
@@ -413,9 +410,9 @@
 		ackRequestBean.setInternalSequenceID(internalSeqenceID);
 		ackRequestBean.setSequenceID(outSequenceID);
 		
-		retramsmitterMgr.insert(ackRequestBean);
-
 		SandeshaUtil.executeAndStore(ackRequestRMMsg, key);
+
+		retramsmitterMgr.insert(ackRequestBean);
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: AckRequestedProcessor::processOutgoingAckRequestMessage " + Boolean.TRUE);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Tue Nov 28 03:08:23 2006
@@ -577,12 +577,11 @@
 			createSeqEntry.setToAddress(to.getAddress());
 
 		createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-		storageManager.storeMessageContext(createSequenceMessageStoreKey, createSeqMsg); // storing the message
 		
+		SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey);
+
 		retransmitterMgr.insert(createSeqEntry);
 
-		SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey);
-		
 		if (log.isDebugEnabled())
 			log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage");
 	}
@@ -752,16 +751,16 @@
 			appMsgEntry.setToAddress(to.getAddress());
 		
 		appMsgEntry.setInternalSequenceID(internalSequenceId);
-		storageManager.storeMessageContext(storageKey, msg);
 
 		msg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-		retransmitterMgr.insert(appMsgEntry);
 
 		// increasing the current handler index, so that the message will not be
 		// going throught the SandeshaOutHandler again.
 		msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
 
 		SandeshaUtil.executeAndStore(rmMsg, storageKey);
+
+		retransmitterMgr.insert(appMsgEntry);
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: ApplicationMsgProcessor::processResponseMessage");

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Tue Nov 28 03:08:23 2006
@@ -249,8 +249,6 @@
 		SenderBean closeBean = new SenderBean();
 		closeBean.setMessageContextRefKey(key);
 
-		storageManager.storeMessageContext(key, msgContext);
-
 		closeBean.setTimeToSend(System.currentTimeMillis());
 
 		closeBean.setMessageID(msgContext.getMessageID());
@@ -272,9 +270,9 @@
 		closeBean.setSequenceID(outSequenceID);
 		closeBean.setInternalSequenceID(internalSeqenceID);
 
-		retramsmitterMgr.insert(closeBean);
-
 		SandeshaUtil.executeAndStore(rmMsgCtx, key);
+
+		retramsmitterMgr.insert(closeBean);
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: CloseSeqMsgProcessor::processOutMessage " + Boolean.TRUE);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Tue Nov 28 03:08:23 2006
@@ -394,12 +394,9 @@
 			}
 
 			ackBean.setTimeToSend(timeToSend);
-			storageManager.storeMessageContext(key, ackMsgCtx);
 
 			ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
 			
-			// inserting the new ack.
-			retransmitterBeanMgr.insert(ackBean);
 			// / asyncAckTransaction.commit();
 
 			// passing the message through sandesha2sender
@@ -407,6 +404,9 @@
 			ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
 			
 			SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+
+			// inserting the new ack.
+			retransmitterBeanMgr.insert(ackBean);
 
 			SandeshaUtil.startSenderForTheSequence(ackRMMsgCtx.getConfigurationContext(), sequenceId);
 		}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Tue Nov 28 03:08:23 2006
@@ -393,8 +393,6 @@
 		SenderBean terminateBean = new SenderBean();
 		terminateBean.setMessageContextRefKey(key);
 
-		storageManager.storeMessageContext(key, msgContext);
-
 		// Set a retransmitter lastSentTime so that terminate will be send with
 		// some delay.
 		// Otherwise this get send before return of the current request (ack).
@@ -420,8 +418,6 @@
 
 		SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
 
-		retramsmitterMgr.insert(terminateBean);
-
 		SequencePropertyBean terminateAdded = new SequencePropertyBean();
 		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
 		terminateAdded.setSequencePropertyKey(outSequenceID);
@@ -430,7 +426,9 @@
 		seqPropMgr.insert(terminateAdded);
 		
 		SandeshaUtil.executeAndStore(rmMsgCtx, key);
-		
+	
+		retramsmitterMgr.insert(terminateBean);
+
 		// Pause the message context
 		rmMsgCtx.pause();
 

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Tue Nov 28 03:08:23 2006
@@ -127,8 +127,6 @@
 				makeConnectionRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY,
 						sequencePropertyKey);
 				
-				storageManager.storeMessageContext(makeConnectionMsgStoreKey,makeConnectionRMMessage.getMessageContext());
-				
 				//add an entry for the MakeConnection message to the sender (with ,send=true, resend=false)
 				SenderBean makeConnectionSenderBean = new SenderBean ();
 //				makeConnectionSenderBean.setInternalSequenceID(internalSequenceId);
@@ -147,9 +145,9 @@
 				//this message should not be sent until it is qualified. I.e. till it is sent through the Sandesha2TransportSender.
 				makeConnectionRMMessage.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
 				
-				senderBeanMgr.insert(makeConnectionSenderBean);
-				
 				SandeshaUtil.executeAndStore(makeConnectionRMMessage, makeConnectionMsgStoreKey);
+				
+				senderBeanMgr.insert(makeConnectionSenderBean);				
 			} catch (SandeshaStorageException e) {
 				e.printStackTrace();
 			} catch (SandeshaException e) {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java Tue Nov 28 03:08:23 2006
@@ -73,7 +73,7 @@
 		
 		msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,Sandesha2Constants.VALUE_TRUE);
 		
-		storageManager.updateMessageContext(key,msgContext);
+		storageManager.storeMessageContext(key,msgContext);
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: Sandesha2TransportSender::invoke");

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Tue Nov 28 03:08:23 2006
@@ -1070,8 +1070,17 @@
 
 		if (msgContext.isPaused())
 			engine.resumeSend(msgContext);
-		else
+		else {
+			//this invocation has to be a blocking one.
+			
+			Boolean isTransportNonBlocking = (Boolean) msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
+			if (isTransportNonBlocking!=null && isTransportNonBlocking.booleanValue())
+				msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
+			
 			engine.send(msgContext);
+			
+			msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isTransportNonBlocking);
+		}
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: SandeshaUtil::executeAndStore");

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java Tue Nov 28 03:08:23 2006
@@ -416,8 +416,6 @@
 		SenderBean terminateBean = new SenderBean();
 		terminateBean.setMessageContextRefKey(key);
 
-		storageManager.storeMessageContext(key, terminateRMMessage.getMessageContext());
-
 		// Set a retransmitter lastSentTime so that terminate will be send with
 		// some delay.
 		// Otherwise this get send before return of the current request (ack).
@@ -438,10 +436,6 @@
 		if (to!=null)
 			terminateBean.setToAddress(to.getAddress());
 
-		SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
-
-		retramsmitterMgr.insert(terminateBean);
-
 		SequencePropertyBean terminateAdded = new SequencePropertyBean();
 		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
 		terminateAdded.setSequencePropertyKey(outSequenceId);
@@ -456,6 +450,9 @@
 		
 		// / addTerminateSeqTransaction.commit();
 		SandeshaUtil.executeAndStore(terminateRMMessage, key);
+		
+		SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
+		retramsmitterMgr.insert(terminateBean);
 
 		if(log.isDebugEnabled())
 			log.debug("Exit: TerminateManager::addTerminateSequenceMessage");



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org