You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by pa...@apache.org on 2009/04/03 20:27:57 UTC

svn commit: r761754 - in /webservices/sandesha/trunk/java/modules/core/src/main: java/org/apache/sandesha2/ java/org/apache/sandesha2/client/ java/org/apache/sandesha2/i18n/ java/org/apache/sandesha2/msgprocessors/ java/org/apache/sandesha2/storage/bea...

Author: parsonsd
Date: Fri Apr  3 18:27:57 2009
New Revision: 761754

URL: http://svn.apache.org/viewvc?rev=761754&view=rev
Log:
Fix to allow automatic reallocation of sequences that have timed out or been deleted.  The solution is to have a reallocated RMSBean point at the RMSBean created as part of the reallocation via a new RMSBean attribute that contains the internalSeqID of the newly created RMSBean.

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
    webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java Fri Apr  3 18:27:57 2009
@@ -245,6 +245,23 @@
 		String ENDPOINT = "Endpoint";
 		
 		String UNSUPPORTED_ELEMENT = "UnsupportedElement";
+		
+		//This is to identify an RMSBean that hasn't been reallocated
+		int NOT_REALLOCATED = 0;
+		
+		//This is to identify an RMSBean that is to be reallocated or has been reallocated
+		int REALLOCATED = 1;
+		
+		//This is to identify an RMSBean that was created for reallocation but then was reallocated itself
+		//That way we know it can be deleted
+		int ORIGINAL_REALLOCATED_BEAN_COMPLETE = 2;
+		
+		//This is to identify the RMS Bean that was created to reallocate another RMSBean 
+		int RMS_BEAN_USED_FOR_REALLOCATION = 3;
+		
+		//This is to identify an RMSBean that was attempted to be reallocated but for some reason the reallocation failed.
+		int REALLOCATION_FAILED = -1;
+		
 	}
 
 	public interface WSA {

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java Fri Apr  3 18:27:57 2009
@@ -443,8 +443,19 @@
 			if (terminatedSequence) {		
 				// Delete the rmsBean
 				storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+				
+				if(tran != null && tran.isActive()) tran.commit();
+				tran = storageManager.getTransaction();				
+				
+				//Need to check if it's an RMSBean created for reallocation.  If so we need to				
+				//delete the original RMSBean that was reallocated.				
+				RMSBean reallocatedRMSBean = SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, rmsBean.getInternalSequenceID());
+				if(reallocatedRMSBean != null){					
+					if (log.isDebugEnabled())
+						log.debug("Removing Reallocated RMSBean " + reallocatedRMSBean);
+					storageManager.getRMSBeanMgr().delete(reallocatedRMSBean.getCreateSeqMsgID());
+				}
 			}
-			
 			if(tran != null && tran.isActive()) tran.commit();
 			tran = null;
 		

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Fri Apr  3 18:27:57 2009
@@ -79,8 +79,8 @@
 	public static final String propertyInvalidValue="propertyInvalidValue";
 	public static final String invalidRange="invalidRange";
 	public static final String workAlreadyAssigned="workAlreadyAssigned";
-	public static final String reallocationFailed="reallocationFailed"; 
-
+	public static final String reallocationFailed="reallocationFailed";
+	public static final String reallocationForSyncRequestReplyNotSupported="reallocationForSyncRequestReplyNotSupported";
 
 	public static final String rmNamespaceNotMatchSequence="rmNamespaceNotMatchSequence";
 	public static final String unknownWSAVersion="unknownWSAVersion";

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Fri Apr  3 18:27:57 2009
@@ -177,8 +177,6 @@
 		if (msgContext.getMessageID() == null)
 			msgContext.setMessageID(SandeshaUtil.getUUID());
 
-		
-
 		/*
 		 * Internal sequence id is the one used to refer to the sequence (since
 		 * actual sequence id is not available when first msg arrives) server
@@ -230,13 +228,47 @@
 
 		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
 
+		boolean autoStartNewSeqForReallocation = false;
 		//if this is an existing sequence then we need to do some checks first
 		if(rmsBean != null)
 		{
+			//If the sequence has been reallocated we need to find out the new internalSeqID.
+			//If the internalSeqID hasn't been set yet we should auto restart.  If it has a new
+			//internalSeqID we just send the message on the new reallocated sequence. 
+			int seqReallocated = rmsBean.isReallocated();
+			if(seqReallocated == Sandesha2Constants.WSRM_COMMON.REALLOCATED){
+				if (log.isDebugEnabled())
+					log.debug("ApplicationMsgProcessor: Reallocated Sequence: " + rmsBean.getSequenceID());
+				//Try and get the new internalSeqID
+				internalSequenceId = rmsBean.getInternalSeqIDOfSeqUsedForReallocation();
+				if(internalSequenceId != null){
+					if (log.isDebugEnabled())
+						log.debug("ApplicationMsgProcessor: InternalSeqID of new sequence: " + internalSequenceId);
+					rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceId);
+					rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+				} else {
+					autoStartNewSeqForReallocation = true;
+				}
+			} else if(seqReallocated == Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED){
+				//We can't do anymore as we have already tried to reallocate this sequence.
+				throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed, rmsBean.getSequenceID(),
+						"We have already attempted to reallocate this Sequence and we won't try again.  The sequance needs to be cleaned up manually."));
+			}
+
 			//see if the sequence is closed
-			if(rmsBean.isSequenceClosedClient() || rmsBean.isTerminateAdded() || rmsBean.isTimedOut()){
+			if(rmsBean.isSequenceClosedClient() || rmsBean.isTerminateAdded() || rmsBean.isTimedOut() || autoStartNewSeqForReallocation){
 				if(SandeshaUtil.isAutoStartNewSequence(msgContext)){
 					internalSequenceId = getSequenceID(rmMsgCtx, serverSide, true); //require a new sequence
+					if(autoStartNewSeqForReallocation){
+						if (log.isDebugEnabled())
+							log.debug("ApplicationMsgProcessor: autoStartNewSeqForReallocation: InternalSeqID of new sequence used for reallocation: " 
+										+ internalSequenceId);
+						rmsBean.setInternalSeqIDOfSeqUsedForReallocation(internalSequenceId);
+						storageManager.getRMSBeanMgr().update(rmsBean);
+						
+						if(tran != null && tran.isActive()) tran.commit();
+						tran = storageManager.getTransaction();		
+					}
 					if (log.isDebugEnabled())
 						log.debug("ApplicationMsgProcessor: auto start new sequence " + internalSequenceId + " :: " + rmsBean);
 					//set this new internal sequence ID on the msg
@@ -337,6 +369,11 @@
 					if (rmsBean == null) {
 						rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager);
 						rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
+
+						if(autoStartNewSeqForReallocation){
+							rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.RMS_BEAN_USED_FOR_REALLOCATION);
+						}
+
 						if(rmsBean != null) outSequenceID = rmsBean.getSequenceID();
 						
 						if (rmsBean == null && appMsgProcTran != null && appMsgProcTran.isActive()) {
@@ -348,7 +385,6 @@
 							appMsgProcTran = storageManager.getTransaction();
 						}
 					}
-
 				}
 	
 			} else {
@@ -554,6 +590,7 @@
 		
 		if (log.isDebugEnabled())
 			log.debug("Exit: ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
+
 		return true;
 	}
 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Fri Apr  3 18:27:57 2009
@@ -154,7 +154,7 @@
 		if(!rmsBeanMgr.update(rmsBean)){			
 			//Im not setting the createSeqBean sender bean to resend true as the reallocation of msgs will do this
 			try{
-				TerminateManager.terminateSendingSide(rmsBean, storageManager, true);
+				TerminateManager.terminateSendingSide(rmsBean, storageManager, true, transaction);
 			} catch(Exception e){
 				if (log.isDebugEnabled())
 					log.debug(e);					

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Fri Apr  3 18:27:57 2009
@@ -74,7 +74,7 @@
 			}
 		}
 
-		TerminateManager.terminateSendingSide (rmsBean, storageManager, false);
+		TerminateManager.terminateSendingSide (rmsBean, storageManager, false, null);
 		
 		// Stop this message travelling further through the Axis runtime
 		terminateResRMMsg.pause();

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java Fri Apr  3 18:27:57 2009
@@ -19,6 +19,7 @@
 
 package org.apache.sandesha2.storage.beans;
 
+import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.util.Range;
 import org.apache.sandesha2.util.RangeString;
 
@@ -152,6 +153,22 @@
 	 * be ignored within the match method.
 	 */
 	private int rmsFlags = 0;
+	
+	/**
+	 * Indicates the reallocation state.  The states can be either:
+	 * notReallocated - The bean hasn't been reallocated
+	 * reallocated - The bean is to be reallocated
+	 * ReallocatedBeanComplete - The bean was created for reallocation but is no longer needed as itself has been reallocated
+	 * BeanUsedForReallocation - The bean was created for reallocation
+	 * ReallocationFailed - The reallocation of this bean failed
+	 */
+	private int reallocated = Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED;
+	
+	/**
+	 * Contains the internalSeqID of the seq that has sent the reallocated msgs
+	 */
+	private String internalSeqIDOfSeqUsedForReallocation = null;
+	
 	public static final int LAST_SEND_ERROR_TIME_FLAG = 0x00000001;
 	public static final int LAST_OUT_MSG_FLAG         = 0x00000010;
 	public static final int HIGHEST_OUT_MSG_FLAG      = 0x00000100;
@@ -195,7 +212,9 @@
 		 terminationPauserForCS = beanToCopy.isTerminationPauserForCS();
 		 timedOut = beanToCopy.isTimedOut();
 		 transportTo = beanToCopy.getTransportTo();
-		 avoidAutoTermination = beanToCopy.isAvoidAutoTermination();		
+		 avoidAutoTermination = beanToCopy.isAvoidAutoTermination();	
+		 reallocated = beanToCopy.isReallocated();
+		 internalSeqIDOfSeqUsedForReallocation = beanToCopy.getInternalSeqIDOfSeqUsedForReallocation();
 	}
 
 	public String getCreateSeqMsgID() {
@@ -434,6 +453,8 @@
 		result.append("\nClientCompletedMsgs: "); result.append(clientCompletedMessages);
 		result.append("\nAnonymous UUID     : "); result.append(anonymousUUID);
 		result.append("\nSOAPVersion  : "); result.append(soapVersion);
+		result.append("\nReallocated  : "); result.append(reallocated);
+		result.append("\nInternalSeqIDOfSeqUsedForReallocation  : "); result.append(internalSeqIDOfSeqUsedForReallocation);
 		return result.toString();
 	}
 	
@@ -478,6 +499,9 @@
 		else if(bean.getAnonymousUUID() != null && !bean.getAnonymousUUID().equals(this.getAnonymousUUID()))
 			match = false;
 		
+		else if((bean.getInternalSeqIDOfSeqUsedForReallocation() != null && !bean.getInternalSeqIDOfSeqUsedForReallocation().equals(this.getInternalSeqIDOfSeqUsedForReallocation())))
+			match = false;
+		
 // Avoid matching on the error information
 //		else if((bean.rmsFlags & LAST_SEND_ERROR_TIME_FLAG) != 0 && bean.getLastSendErrorTimestamp() != this.getLastSendErrorTimestamp())
 //			match = false;
@@ -511,8 +535,26 @@
 
 		else if((bean.rmsFlags & EXPECTED_REPLIES) != 0 && bean.getExpectedReplies() != this.getExpectedReplies())
 			match = false;
+		
+
 
 		return match;
 	}
 
+	public int isReallocated() {
+		return reallocated;
+	}
+
+	public void setReallocated(int reallocated) {
+		this.reallocated = reallocated;
+	}
+
+	public String getInternalSeqIDOfSeqUsedForReallocation() {
+		return internalSeqIDOfSeqUsedForReallocation;
+	}
+
+	public void setInternalSeqIDOfSeqUsedForReallocation(String internalSeqIDOfSeqUsedForReallocation) {
+		this.internalSeqIDOfSeqUsedForReallocation = internalSeqIDOfSeqUsedForReallocation;
+	}
+
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Fri Apr  3 18:27:57 2009
@@ -619,7 +619,7 @@
 					if (log.isDebugEnabled())
 						log.debug("Sending fault message " + faultMessageContext.getEnvelope().getHeader());
 	
-					// Sending the message
+					//Sending the message
 					//having a surrounded try block will make sure that the error is logged here 
 					//and that this does not disturb the processing of a carrier message.
 					try {
@@ -671,7 +671,7 @@
 		
 	}
 	
-	private static InvocationResponse manageIncomingFault (AxisFault fault, RMMsgContext rmMsgCtx, SOAPFault faultPart) throws AxisFault {
+	private static InvocationResponse manageIncomingFault (AxisFault fault, RMMsgContext rmMsgCtx, SOAPFault faultPart, Transaction transaction) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: FaultManager::manageIncomingFault");
 		InvocationResponse response = InvocationResponse.CONTINUE;
@@ -743,7 +743,7 @@
 		} else if (Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equals(soapFaultSubcode) ||
 				Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equals(soapFaultSubcode) || 
 				Sandesha2Constants.SOAPFaults.Subcodes.MESSAGE_NUMBER_ROLEOVER.equals(soapFaultSubcode)) {
-			processSequenceUnknownFault(rmMsgCtx, fault, identifier);
+			processSequenceUnknownFault(rmMsgCtx, fault, identifier, transaction);
 		} 
 		
 		// If the operation is an Sandesha In Only operation, or the fault is a recognised fault,
@@ -783,7 +783,7 @@
 
 	    	// constructing the fault
 	    	AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart, rmMsgCtx);
-	    	response = manageIncomingFault (axisFault, rmMsgCtx, faultPart);
+	    	response = manageIncomingFault (axisFault, rmMsgCtx, faultPart, transaction);
 	    	
 	    	if(transaction != null && transaction.isActive()) transaction.commit();
 	    	transaction = null;
@@ -966,7 +966,7 @@
 		// Cleanup sending side.
 		if (log.isDebugEnabled())
 			log.debug("Terminating sending sequence " + rmsBean);
-		TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
+		TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: FaultManager::processCreateSequenceRefusedFault");
@@ -980,7 +980,7 @@
 	 * @param fault
 	 * @param identifier 
 	 */
-	private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, AxisFault fault, String sequenceID) throws AxisFault {
+	private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, AxisFault fault, String sequenceID, Transaction transaction) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: FaultManager::processSequenceUnknownFault " + sequenceID);
 
@@ -998,16 +998,16 @@
 			// Cleanup sending side.
 			if (log.isDebugEnabled())
 				log.debug("Terminating sending sequence " + rmsBean);
-			if(!TerminateManager.terminateSendingSide(rmsBean, storageManager, true)){
+			if(!TerminateManager.terminateSendingSide(rmsBean, storageManager, true, transaction)){
 				// We did not reallocate so we notify the clients of a failure
 				notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault);
+				
+				//Mark the RMSBean as reallocation failed and update last activation time
+				transaction = storageManager.getTransaction();
+				rmsBean.setLastActivatedTime(System.currentTimeMillis());
+				storageManager.getRMSBeanMgr().update(rmsBean);
+				if(transaction != null && transaction.isActive()) transaction.commit();
 			}
-			
-			// Update the last activated time.
-			rmsBean.setLastActivatedTime(System.currentTimeMillis());
-			
-			// Update the bean in the map
-			storageManager.getRMSBeanMgr().update(rmsBean);
 		}
 		else {
 			RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java Fri Apr  3 18:27:57 2009
@@ -44,6 +44,7 @@
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.client.Options;
 import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.client.async.Callback;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
@@ -59,6 +60,8 @@
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.engine.Handler;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.axis2.util.CallbackReceiver;
 import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -74,6 +77,7 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
@@ -1015,10 +1019,22 @@
 		            
 		return targetEnv;
 	}
-	
-	public static void reallocateMessagesToNewSequence(StorageManager storageManager, RMSBean oldRMSBean, List<MessageContext> msgsToSend)throws AxisFault{
-	    if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
-	        log.debug("Enter: SandeshaUtil::reallocateMessagesToNewSequence");
+
+
+	/** 
+	* ReallocateMessages to a new sequence
+	* @param storageManager
+	* @param oldRMSBean
+	* @param msgsToSend
+	* @param transaction
+	* 
+	*/
+	public static void reallocateMessagesToNewSequence(StorageManager storageManager, RMSBean oldRMSBean, 
+		List<MessageContext> msgsToSend, Transaction transaction)
+		throws AxisFault, SandeshaException{
+
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Enter: SandeshaUtil::reallocateMessagesToNewSequence");
 	    
 		ConfigurationContext ctx = storageManager.getContext();
 		ServiceClient client = new ServiceClient(ctx,  null);
@@ -1027,30 +1043,68 @@
 		Options options = client.getOptions();
 		options.setTo(oldRMSBean.getToEndpointReference());
 		options.setReplyTo(oldRMSBean.getReplyToEndpointReference());
-		
-        //internal sequence ID is different
-        String internalSequenceID = oldRMSBean.getInternalSequenceID();
-        //we also need to obtain the sequenceKey from the internalSequenceID.
-        String oldSequenceKey = 
-          SandeshaUtil.getSequenceKeyFromInternalSequenceID(internalSequenceID, oldRMSBean.getToEndpointReference().getAddress());
-        //remove the old sequence key from the internal sequence ID
-        internalSequenceID = internalSequenceID.substring(0, internalSequenceID.length()-oldSequenceKey.length());
-        options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, 
-        		SandeshaUtil.getUUID()); //using a new sequence Key to differentiate from the old sequence 
-        options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
-        options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, oldRMSBean.getRMVersion());
-      	options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, Boolean.FALSE);
-      	
-        //send the msgs - this will setup a new sequence to the same endpoint
-      	Iterator<MessageContext> it = msgsToSend.iterator();
-      	while(it.hasNext()){
-      		MessageContext msgCtx = (MessageContext)it.next();
-      		client.getOptions().setAction(msgCtx.getWSAAction());
-      		client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement());
-      	}
-      	
-	    if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
-	        log.debug("Exit: SandeshaUtil::reallocateMessagesToNewSequence");
+
+		//internal sequence ID is different
+		String internalSequenceID = oldRMSBean.getInternalSequenceID();
+
+		options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
+		options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, oldRMSBean.getRMVersion());
+		options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, Boolean.FALSE);
+
+		//Update the RMSBean so as to mark it as reallocated if it isn't an RMSbean created for a previous reallocation
+		RMSBean originallyReallocatedRMSBean = SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, oldRMSBean.getInternalSequenceID());
+		if(originallyReallocatedRMSBean == null){
+			oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATED);
+			storageManager.getRMSBeanMgr().update(oldRMSBean);
+		} else {
+			options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, originallyReallocatedRMSBean.getInternalSequenceID());
+			originallyReallocatedRMSBean.setInternalSeqIDOfSeqUsedForReallocation(null);	
+			storageManager.getRMSBeanMgr().update(originallyReallocatedRMSBean);
+
+			//Setting this property so that the bean can be deleted
+			oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE);
+			oldRMSBean.setInternalSeqIDOfSeqUsedForReallocation(originallyReallocatedRMSBean.getInternalSequenceID());
+			storageManager.getRMSBeanMgr().update(oldRMSBean);
+		}
+
+		//Commit current transaction that wraps the manageFaultMsg as we are about to start
+		//resending msgs on a new seq and they will need to get a transaction on the 
+		//current thread
+		if(transaction != null && transaction.isActive()) transaction.commit();
+
+		//send the msgs - this will setup a new sequence to the same endpoint
+		Iterator<MessageContext> it = msgsToSend.iterator();
+
+		while(it.hasNext()){
+			MessageContext msgCtx = (MessageContext)it.next();
+
+			//Set the action
+			client.getOptions().setAction(msgCtx.getWSAAction());
+
+			//Set the message ID
+			client.getOptions().setMessageId(msgCtx.getMessageID());
+
+			//Get the AxisOperation
+			AxisOperation axisOperation = msgCtx.getAxisOperation();
+
+			//If it's oneway or async, reallocate
+			//Fail if replyTo is annonymous as this is currently not supported because in twoway we can't get responses back to th eold something
+			if(axisOperation.getAxisSpecificMEPConstant() == WSDLConstants.MEP_CONSTANT_OUT_ONLY){
+				client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement());
+			} else if (client.getOptions().getReplyTo().hasAnonymousAddress()){
+				oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+				storageManager.getRMSBeanMgr().update(oldRMSBean);
+				throw new SandeshaException(SandeshaMessageKeys.reallocationForSyncRequestReplyNotSupported);
+			} else {
+				MessageReceiver msgReceiver = axisOperation.getMessageReceiver();
+				Object callback = ((CallbackReceiver)msgReceiver).lookupCallback(msgCtx.getMessageID());	
+				client.setAxisService(msgCtx.getAxisService());
+				client.sendReceiveNonBlocking(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement(), (Callback)callback);
+			}
+		}
+
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Exit: SandeshaUtil::reallocateMessagesToNewSequence");
 	}
 
   /**
@@ -1276,4 +1330,16 @@
 		return result;
 	}
 
+	public static RMSBean isLinkedToReallocatedRMSBean(StorageManager storageManager, String internalSeqID) throws SandeshaException {
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: SandeshaUtil::isLinkedToReallocatedRMSBean");
+ 
+		//Need to check if it's an RMSBean created for reallocation.
+		RMSBean finderBean = new RMSBean();
+		finderBean.setInternalSeqIDOfSeqUsedForReallocation(internalSeqID);
+		RMSBean reallocatedRMSBean = storageManager.getRMSBeanMgr().findUnique(finderBean);
+	
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: SandeshaUtil::isLinkedToReallocatedRMSBean, ReallocatedRMSBean: " + reallocatedRMSBean);
+		return reallocatedRMSBean;
+	}
+
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java Fri Apr  3 18:27:57 2009
@@ -25,8 +25,11 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFault;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
@@ -40,12 +43,13 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.InvokerBean;
 import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 
 /**
@@ -231,14 +235,14 @@
 	 * @return true if the reallocation happened sucessfully
 	 */
 	public static boolean terminateSendingSide(RMSBean rmsBean, 
-			StorageManager storageManager, boolean reallocate) throws SandeshaException {
+			StorageManager storageManager, boolean reallocate, Transaction transaction) throws SandeshaException {
 
 		// Indicate that the sequence is terminated
 		rmsBean.setTerminated(true);
 		rmsBean.setTerminateAdded(true);
 		storageManager.getRMSBeanMgr().update(rmsBean);
 		
-		return cleanSendingSideData (rmsBean.getInternalSequenceID(), storageManager, rmsBean, reallocate);
+		return cleanSendingSideData (rmsBean.getInternalSequenceID(), storageManager, rmsBean, reallocate, transaction);
 	}
 
 	public static void timeOutSendingSideSequence(String internalSequenceId,
@@ -249,11 +253,11 @@
 		rmsBean.setLastActivatedTime(System.currentTimeMillis());
 		storageManager.getRMSBeanMgr().update(rmsBean);
 
-		cleanSendingSideData(internalSequenceId, storageManager, rmsBean, false);
+		cleanSendingSideData(internalSequenceId, storageManager, rmsBean, false, null);
 	}
 
 	private static boolean cleanSendingSideData(String internalSequenceId, StorageManager storageManager, 
-			RMSBean rmsBean, boolean reallocateIfPossible) throws SandeshaException {
+			RMSBean rmsBean, boolean reallocateIfPossible, Transaction transaction) throws SandeshaException {
 
 		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
 			log.debug("Enter: TerminateManager::cleanSendingSideData " + internalSequenceId + ", " + reallocateIfPossible);
@@ -274,12 +278,15 @@
 		if(ranges.length==1){
 			//the sequence is a single contiguous acked range
 			lastAckedMsg = ranges[0].upperValue;
-		}
-		else{
-			//cannot reallocate as there are gaps
-			reallocateIfPossible=false;
-			if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
-				log.debug("cannot reallocate sequence as there are gaps");
+		} else{
+			if(reallocateIfPossible){
+				//cannot reallocate as there are gaps
+				rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+				storageManager.getRMSBeanMgr().update(rmsBean);
+				reallocateIfPossible=false;
+				if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+					log.debug("cannot reallocate sequence as there are gaps");
+			}
 		}
 		
 		while (iterator.hasNext()) {
@@ -332,14 +339,48 @@
 		
 		if(reallocateIfPossible){
 			try{
-			      SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate);	
-			      reallocatedOK = true;
-			}
-			catch(Exception e){
-				//want that the reallocation failed
+				SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate, transaction);	
+				reallocatedOK = true;
+			
+				//If the reallocation was successful and the RMSBean being reallocated was originally created for reallocation
+				//the RMSBean can be deleted.
+				transaction = storageManager.getTransaction();
+				if(rmsBean.isReallocated() == Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE){
+					rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED);
+					storageManager.getRMSBeanMgr().update(rmsBean);
+				}
+				
+				if(transaction != null && transaction.isActive()) transaction.commit();
+				transaction = null;
+			} catch(Exception e){
+				
 				if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
 					log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed, rmsBean.getSequenceID(), e.toString()));				
-			}			
+			
+				//Reallocation Failed
+				//Need to mark any RMSBeans involved as failed so that we don't attempt to send
+				//anymore messages on these seq's.  The client will have to manually reallocate and
+				//administer the sequences.
+				transaction = storageManager.getTransaction();
+				
+				rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+				storageManager.getRMSBeanMgr().update(rmsBean);
+				
+				String intSeqIDOfOriginallyReallocatedSeq = rmsBean.getInternalSeqIDOfSeqUsedForReallocation();
+				if(intSeqIDOfOriginallyReallocatedSeq != null){
+					RMSBean origRMSBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, intSeqIDOfOriginallyReallocatedSeq);
+					origRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+					storageManager.getRMSBeanMgr().update(origRMSBean);
+				}
+				
+				 if(transaction != null && transaction.isActive()) transaction.commit();
+					transaction = null;
+				
+			} finally {
+				if (transaction != null && transaction.isActive()) {
+					transaction.rollback();
+				}
+			}		
 		}
 		
 		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Fri Apr  3 18:27:57 2009
@@ -427,7 +427,7 @@
 
 	private void deleteRMSBeans(List<RMSBean> rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime)
 
-	throws SandeshaStorageException {
+	throws SandeshaStorageException, SandeshaException {
 		if (log.isDebugEnabled())
 			log.debug("Enter: Sender::deleteRMSBeans");
 
@@ -437,12 +437,24 @@
 			RMSBean rmsBean = (RMSBean) beans.next();
 			long timeNow = System.currentTimeMillis();
 			long lastActivated = rmsBean.getLastActivatedTime();
+
 			// delete sequences that have been timedout or deleted for more than
 			// the SequenceRemovalTimeoutInterval
-
-			if ((lastActivated + deleteTime) < timeNow) {
+			if (((lastActivated + deleteTime) < timeNow) &&
+				(rmsBean.isReallocated() == Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED)) {
 				if (log.isDebugEnabled())
 					log.debug("Removing RMSBean " + rmsBean);
+
+				//Need to check if it's an RMSBean created for reallocation.  If so we need to
+				//delete the original RMSBean that was reallocated.
+				RMSBean reallocatedRMSBean = SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, rmsBean.getInternalSequenceID());
+				
+				if(reallocatedRMSBean != null){
+					if (log.isDebugEnabled())
+						log.debug("Removing Reallocated RMSBean " + reallocatedRMSBean);
+					storageManager.getRMSBeanMgr().delete(reallocatedRMSBean.getCreateSeqMsgID());
+				}
+
 				storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
 				storageManager.removeMessageContext(rmsBean.getReferenceMessageStoreKey());
 			}
@@ -616,7 +628,7 @@
 					
 					// Mark the sequence as terminated
 					RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(manager, id);
-					TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
+					TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
 					
 					if(log.isDebugEnabled()) log.debug("Sender::checkForOrphanMessages.  Orphaned message of type TERMINATE_SEQ or TERMINATE_SEQ_RESPONSE found.  Deleting this message with a sequence ID of : " + id);
 					// Delete the terminate sender bean.

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Fri Apr  3 18:27:57 2009
@@ -418,7 +418,7 @@
 					String sequenceID = terminateSequence.getIdentifier().getIdentifier();
 	
 					RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
-					TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
+					TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
 					
 					if(transaction != null && transaction.isActive()) transaction.commit();
 					transaction = null;

Modified: webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties Fri Apr  3 18:27:57 2009
@@ -82,7 +82,8 @@
 msgContextNotSet=Sandesha2 Internal Error: ''MessageContext'' is null.
 transportOutNotPresent=Sandesha2 Internal Error: original transport sender is not present.
 workAlreadyAssigned=Work ''{0}'' is already assigned to a different Worker. Will try the next one.
-reallocationFailed=The sequence ''{0}'' could not be reallocated due to the error ''{1}''.
+reallocationFailed=Reallocation of msgs from sequence ''{0}'' failed, ''{1}''.
+reallocationForSyncRequestReplyNotSupported=Reallocation for sync requestReply not supported.
 couldNotFindOperation=Could not find operation for message type {0} and spec level {1}.
 cannotChooseAcksTo=Could not find an appropriate acksTo for the reply sequence, given inbound sequence {0} and bean info {1}.
 cannotChooseSpecLevel=Could not find an appropriate specification level for the reply sequence, given inbound sequence {0} and bean info {1}.



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org