You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2006/11/28 12:08:28 UTC
svn commit: r479987 - in
/webservices/sandesha/trunk/java/src/org/apache/sandesha2: msgprocessors/
polling/ transport/ util/
Author: mlovett
Date: Tue Nov 28 03:08:23 2006
New Revision: 479987
URL: http://svn.apache.org/viewvc?view=rev&rev=479987
Log:
Andy's patch to ensure each message is only stored once, instead of being stored and then updated. See SANDESHA2-52.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Tue Nov 28 03:08:23 2006
@@ -269,16 +269,15 @@
ackBean.setTimeToSend(timeToSend);
- storageManager.storeMessageContext(key, ackMsgCtx);
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
- // inserting the new ack.
- retransmitterBeanMgr.insert(ackBean);
-
// passing the message through sandesha2sender
SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+ // inserting the new ack.
+ retransmitterBeanMgr.insert(ackBean);
+
SandeshaUtil.startSenderForTheSequence(configurationContext, sequenceId);
msgContext.pause();
@@ -386,8 +385,6 @@
SenderBean ackRequestBean = new SenderBean();
ackRequestBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key, msgContext);
-
// Set a retransmitter lastSentTime so that terminate will be send with
// some delay.
// Otherwise this get send before return of the current request (ack).
@@ -413,9 +410,9 @@
ackRequestBean.setInternalSequenceID(internalSeqenceID);
ackRequestBean.setSequenceID(outSequenceID);
- retramsmitterMgr.insert(ackRequestBean);
-
SandeshaUtil.executeAndStore(ackRequestRMMsg, key);
+
+ retramsmitterMgr.insert(ackRequestBean);
if (log.isDebugEnabled())
log.debug("Exit: AckRequestedProcessor::processOutgoingAckRequestMessage " + Boolean.TRUE);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Tue Nov 28 03:08:23 2006
@@ -577,12 +577,11 @@
createSeqEntry.setToAddress(to.getAddress());
createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
- storageManager.storeMessageContext(createSequenceMessageStoreKey, createSeqMsg); // storing the message
+ SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey);
+
retransmitterMgr.insert(createSeqEntry);
- SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey);
-
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage");
}
@@ -752,16 +751,16 @@
appMsgEntry.setToAddress(to.getAddress());
appMsgEntry.setInternalSequenceID(internalSequenceId);
- storageManager.storeMessageContext(storageKey, msg);
msg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
- retransmitterMgr.insert(appMsgEntry);
// increasing the current handler index, so that the message will not be
// going throught the SandeshaOutHandler again.
msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
SandeshaUtil.executeAndStore(rmMsg, storageKey);
+
+ retransmitterMgr.insert(appMsgEntry);
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::processResponseMessage");
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Tue Nov 28 03:08:23 2006
@@ -249,8 +249,6 @@
SenderBean closeBean = new SenderBean();
closeBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key, msgContext);
-
closeBean.setTimeToSend(System.currentTimeMillis());
closeBean.setMessageID(msgContext.getMessageID());
@@ -272,9 +270,9 @@
closeBean.setSequenceID(outSequenceID);
closeBean.setInternalSequenceID(internalSeqenceID);
- retramsmitterMgr.insert(closeBean);
-
SandeshaUtil.executeAndStore(rmMsgCtx, key);
+
+ retramsmitterMgr.insert(closeBean);
if (log.isDebugEnabled())
log.debug("Exit: CloseSeqMsgProcessor::processOutMessage " + Boolean.TRUE);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Tue Nov 28 03:08:23 2006
@@ -394,12 +394,9 @@
}
ackBean.setTimeToSend(timeToSend);
- storageManager.storeMessageContext(key, ackMsgCtx);
ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
- // inserting the new ack.
- retransmitterBeanMgr.insert(ackBean);
// / asyncAckTransaction.commit();
// passing the message through sandesha2sender
@@ -407,6 +404,9 @@
ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+
+ // inserting the new ack.
+ retransmitterBeanMgr.insert(ackBean);
SandeshaUtil.startSenderForTheSequence(ackRMMsgCtx.getConfigurationContext(), sequenceId);
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Tue Nov 28 03:08:23 2006
@@ -393,8 +393,6 @@
SenderBean terminateBean = new SenderBean();
terminateBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key, msgContext);
-
// Set a retransmitter lastSentTime so that terminate will be send with
// some delay.
// Otherwise this get send before return of the current request (ack).
@@ -420,8 +418,6 @@
SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
- retramsmitterMgr.insert(terminateBean);
-
SequencePropertyBean terminateAdded = new SequencePropertyBean();
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
terminateAdded.setSequencePropertyKey(outSequenceID);
@@ -430,7 +426,9 @@
seqPropMgr.insert(terminateAdded);
SandeshaUtil.executeAndStore(rmMsgCtx, key);
-
+
+ retramsmitterMgr.insert(terminateBean);
+
// Pause the message context
rmMsgCtx.pause();
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Tue Nov 28 03:08:23 2006
@@ -127,8 +127,6 @@
makeConnectionRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY,
sequencePropertyKey);
- storageManager.storeMessageContext(makeConnectionMsgStoreKey,makeConnectionRMMessage.getMessageContext());
-
//add an entry for the MakeConnection message to the sender (with ,send=true, resend=false)
SenderBean makeConnectionSenderBean = new SenderBean ();
// makeConnectionSenderBean.setInternalSequenceID(internalSequenceId);
@@ -147,9 +145,9 @@
//this message should not be sent until it is qualified. I.e. till it is sent through the Sandesha2TransportSender.
makeConnectionRMMessage.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
- senderBeanMgr.insert(makeConnectionSenderBean);
-
SandeshaUtil.executeAndStore(makeConnectionRMMessage, makeConnectionMsgStoreKey);
+
+ senderBeanMgr.insert(makeConnectionSenderBean);
} catch (SandeshaStorageException e) {
e.printStackTrace();
} catch (SandeshaException e) {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java Tue Nov 28 03:08:23 2006
@@ -73,7 +73,7 @@
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,Sandesha2Constants.VALUE_TRUE);
- storageManager.updateMessageContext(key,msgContext);
+ storageManager.storeMessageContext(key,msgContext);
if (log.isDebugEnabled())
log.debug("Exit: Sandesha2TransportSender::invoke");
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Tue Nov 28 03:08:23 2006
@@ -1070,8 +1070,17 @@
if (msgContext.isPaused())
engine.resumeSend(msgContext);
- else
+ else {
+ //this invocation has to be a blocking one.
+
+ Boolean isTransportNonBlocking = (Boolean) msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
+ if (isTransportNonBlocking!=null && isTransportNonBlocking.booleanValue())
+ msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
+
engine.send(msgContext);
+
+ msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isTransportNonBlocking);
+ }
if (log.isDebugEnabled())
log.debug("Exit: SandeshaUtil::executeAndStore");
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java Tue Nov 28 03:08:23 2006
@@ -416,8 +416,6 @@
SenderBean terminateBean = new SenderBean();
terminateBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key, terminateRMMessage.getMessageContext());
-
// Set a retransmitter lastSentTime so that terminate will be send with
// some delay.
// Otherwise this get send before return of the current request (ack).
@@ -438,10 +436,6 @@
if (to!=null)
terminateBean.setToAddress(to.getAddress());
- SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
-
- retramsmitterMgr.insert(terminateBean);
-
SequencePropertyBean terminateAdded = new SequencePropertyBean();
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
terminateAdded.setSequencePropertyKey(outSequenceId);
@@ -456,6 +450,9 @@
// / addTerminateSeqTransaction.commit();
SandeshaUtil.executeAndStore(terminateRMMessage, key);
+
+ SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
+ retramsmitterMgr.insert(terminateBean);
if(log.isDebugEnabled())
log.debug("Exit: TerminateManager::addTerminateSequenceMessage");
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org