You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2007/12/14 11:55:52 UTC

svn commit: r604158 - in /webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util: SandeshaUtil.java TerminateManager.java

Author: mckierna
Date: Fri Dec 14 02:55:51 2007
New Revision: 604158

URL: http://svn.apache.org/viewvc?rev=604158&view=rev
Log:
fix some bugs in reallocate

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=604158&r1=604157&r2=604158&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java Fri Dec 14 02:55:51 2007
@@ -28,6 +28,7 @@
 import java.util.Map;
 
 import javax.xml.namespace.QName;
+import javax.xml.stream.events.Characters;
 
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.util.CopyUtils;
@@ -1078,14 +1079,17 @@
         //internal sequence ID is different
         String internalSequenceID = oldRMSBean.getInternalSequenceID();
         //we also need to obtain the sequenceKey from the internalSequenceID.
-        String sequenceKey = 
+        String oldSequenceKey = 
           SandeshaUtil.getSequenceKeyFromInternalSequenceID(internalSequenceID, oldRMSBean.getToEndpointReference().getAddress());
-        options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey); 
+        //remove the old sequence key from the internal sequence ID
+        internalSequenceID = internalSequenceID.substring(0, internalSequenceID.length()-oldSequenceKey.length());
+        options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, 
+        		SandeshaUtil.getUUID()); //using a new sequence Key to differentiate from the old sequence 
         options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
         options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, oldRMSBean.getRMVersion());
       	options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, Boolean.FALSE);
       	
-        //send the msgs
+        //send the msgs - this will setup a new sequence to the same endpoint
       	Iterator it = msgsToSend.iterator();
       	while(it.hasNext()){
       		MessageContext msgCtx = (MessageContext)it.next();

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=604158&r1=604157&r2=604158&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java Fri Dec 14 02:55:51 2007
@@ -35,6 +35,7 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
@@ -271,7 +272,7 @@
 		long lastAckedMsg = -1;
 		
 		if(ranges.length==1){
-			//a single contiguous acked range
+			//the sequence is a single contiguous acked range
 			lastAckedMsg = ranges[0].upperValue;
 		}
 		else{
@@ -286,20 +287,50 @@
 			if(retransmitterBean.getMessageType()!=Sandesha2Constants.MessageTypes.TERMINATE_SEQ || rmsBean.isTerminated()){
 				//remove all but terminate sequence messages
 				String messageStoreKey = retransmitterBean.getMessageContextRefKey();
+				//if we have been asked to reallocate we need to send all unacked messages to a new sequence.
+				//We must ensure that we rerieve these messages in the correct order 
 				if(reallocateIfPossible
-					&& retransmitterBean.getMessageType()!=Sandesha2Constants.MessageTypes.APPLICATION
+					&& retransmitterBean.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION
 					&& retransmitterBean.getMessageNumber()==lastAckedMsg+1){
 					
-					//try to reallocate application msgs
+					if(log.isDebugEnabled())
+						log.debug("adding message for reallocate: " + retransmitterBean.getMessageNumber());
+					
+					//try to reallocate application msgs that are next in the outgoing list to 
 					msgsToReallocate.add(storageManager.retrieveMessageContext(messageStoreKey, storageManager.getContext()));
+					retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+					storageManager.removeMessageContext(messageStoreKey);	
 					lastAckedMsg++;
 				}
-				retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
-				storageManager.removeMessageContext(messageStoreKey);				
+				else if(reallocateIfPossible){
+					//we are reallocating but this message does not fit the criteria. We should not delete it
+					if(log.isDebugEnabled())
+						log.debug("cannot reallocate: " + retransmitterBean.getMessageNumber());
+					if(msgsToReallocate.size()==0){
+						try{
+							//however we might need this message if there are no messages to reallocate but we still
+							//need a new sequence - we use a dummy message
+							MessageContext dummy = SandeshaUtil.cloneMessageContext(
+									storageManager.retrieveMessageContext(messageStoreKey, storageManager.getContext()));
+							dummy.getOptions().setProperty(SandeshaClientConstants.DUMMY_MESSAGE, Sandesha2Constants.VALUE_TRUE);	
+							msgsToReallocate.add(dummy);							
+						}
+						catch(Exception e){
+							if(log.isDebugEnabled())
+								log.debug("Exit: TerminateManager::cleanSendingSideData " + e);
+							throw new SandeshaStorageException(e);
+						}
+					}
+				}
+				else{
+					//we are not reallocating so just delete the messages
+					retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+					storageManager.removeMessageContext(messageStoreKey);						
+				}
 			}
 		}
 		
-		if(reallocateIfPossible && msgsToReallocate.size()>0){
+		if(reallocateIfPossible){
 			try{
 			      SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate);	
 			      reallocatedOK = true;



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org