You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2007/12/14 11:55:52 UTC
svn commit: r604158 - in
/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util:
SandeshaUtil.java TerminateManager.java
Author: mckierna
Date: Fri Dec 14 02:55:51 2007
New Revision: 604158
URL: http://svn.apache.org/viewvc?rev=604158&view=rev
Log:
fix some bugs in reallocate
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=604158&r1=604157&r2=604158&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java Fri Dec 14 02:55:51 2007
@@ -28,6 +28,7 @@
import java.util.Map;
import javax.xml.namespace.QName;
+import javax.xml.stream.events.Characters;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.CopyUtils;
@@ -1078,14 +1079,17 @@
//internal sequence ID is different
String internalSequenceID = oldRMSBean.getInternalSequenceID();
//we also need to obtain the sequenceKey from the internalSequenceID.
- String sequenceKey =
+ String oldSequenceKey =
SandeshaUtil.getSequenceKeyFromInternalSequenceID(internalSequenceID, oldRMSBean.getToEndpointReference().getAddress());
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
+ //remove the old sequence key from the internal sequence ID
+ internalSequenceID = internalSequenceID.substring(0, internalSequenceID.length()-oldSequenceKey.length());
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,
+ SandeshaUtil.getUUID()); //using a new sequence Key to differentiate from the old sequence
options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, oldRMSBean.getRMVersion());
options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, Boolean.FALSE);
- //send the msgs
+ //send the msgs - this will setup a new sequence to the same endpoint
Iterator it = msgsToSend.iterator();
while(it.hasNext()){
MessageContext msgCtx = (MessageContext)it.next();
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=604158&r1=604157&r2=604158&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java Fri Dec 14 02:55:51 2007
@@ -35,6 +35,7 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.SandeshaStorageException;
@@ -271,7 +272,7 @@
long lastAckedMsg = -1;
if(ranges.length==1){
- //a single contiguous acked range
+ //the sequence is a single contiguous acked range
lastAckedMsg = ranges[0].upperValue;
}
else{
@@ -286,20 +287,50 @@
if(retransmitterBean.getMessageType()!=Sandesha2Constants.MessageTypes.TERMINATE_SEQ || rmsBean.isTerminated()){
//remove all but terminate sequence messages
String messageStoreKey = retransmitterBean.getMessageContextRefKey();
+ //if we have been asked to reallocate we need to send all unacked messages to a new sequence.
+ //We must ensure that we retrieve these messages in the correct order
if(reallocateIfPossible
- && retransmitterBean.getMessageType()!=Sandesha2Constants.MessageTypes.APPLICATION
+ && retransmitterBean.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION
&& retransmitterBean.getMessageNumber()==lastAckedMsg+1){
- //try to reallocate application msgs
+ if(log.isDebugEnabled())
+ log.debug("adding message for reallocate: " + retransmitterBean.getMessageNumber());
+
+ //try to reallocate application msgs that are next in the outgoing list to a new sequence
msgsToReallocate.add(storageManager.retrieveMessageContext(messageStoreKey, storageManager.getContext()));
+ retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+ storageManager.removeMessageContext(messageStoreKey);
lastAckedMsg++;
}
- retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
- storageManager.removeMessageContext(messageStoreKey);
+ else if(reallocateIfPossible){
+ //we are reallocating but this message does not fit the criteria. We should not delete it
+ if(log.isDebugEnabled())
+ log.debug("cannot reallocate: " + retransmitterBean.getMessageNumber());
+ if(msgsToReallocate.size()==0){
+ try{
+ //however we might need this message if there are no messages to reallocate but we still
+ //need a new sequence - we use a dummy message
+ MessageContext dummy = SandeshaUtil.cloneMessageContext(
+ storageManager.retrieveMessageContext(messageStoreKey, storageManager.getContext()));
+ dummy.getOptions().setProperty(SandeshaClientConstants.DUMMY_MESSAGE, Sandesha2Constants.VALUE_TRUE);
+ msgsToReallocate.add(dummy);
+ }
+ catch(Exception e){
+ if(log.isDebugEnabled())
+ log.debug("Exit: TerminateManager::cleanSendingSideData " + e);
+ throw new SandeshaStorageException(e);
+ }
+ }
+ }
+ else{
+ //we are not reallocating so just delete the messages
+ retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+ storageManager.removeMessageContext(messageStoreKey);
+ }
}
}
- if(reallocateIfPossible && msgsToReallocate.size()>0){
+ if(reallocateIfPossible){
try{
SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate);
reallocatedOK = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org