You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2007/02/28 12:23:00 UTC

svn commit: r512708 - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2: Sandesha2Constants.java msgprocessors/ApplicationMsgProcessor.java polling/PollingManager.java storage/beans/RMSBean.java util/SandeshaUtil.java util/SequenceManager.java

Author: mlovett
Date: Wed Feb 28 03:22:57 2007
New Revision: 512708

URL: http://svn.apache.org/viewvc?view=rev&rev=512708
Log:
Store the UUID associated with an anonymous endpoint in the RMSBean instead of on the ServiceContext, so that all messages assigned to a sequence use the same RM anonymous URI.

Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java Wed Feb 28 03:22:57 2007
@@ -511,8 +511,6 @@
 	
 	static final String RETRANSMITTABLE_PHASES = "RMRetransmittablePhases";
 	
-	static final String RM_ANON_UUID = "RMAnonymousUUID";
-	
 	static final String propertiesToCopyFromReferenceMessage = "propertiesToCopyFromReferenceMessage";
 	
 	static final String propertiesToCopyFromReferenceRequestMessage = "propertiesToCopyFromReferenceRequestMessage";

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Wed Feb 28 03:22:57 2007
@@ -86,18 +86,6 @@
 		MessageContext msgContext = rmMsgCtx.getMessageContext();
 		ConfigurationContext configContext = msgContext.getConfigurationContext();
 
-		// Re-write the WS-A anonymous URI, if we support the RM anonymous URI. We only
-		// need to rewrite the replyTo EPR if we have an out-in MEP.
-		AxisOperation op = msgContext.getAxisOperation();
-		int mep = WSDLConstants.MEP_CONSTANT_INVALID;
-		if(op != null) {
-			mep = op.getAxisSpecifMEPConstant();
-			if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
-				EndpointReference replyTo = SandeshaUtil.rewriteEPR(msgContext.getReplyTo(), msgContext);
-				msgContext.setReplyTo(replyTo);
-			}
-		}
-		
 		// setting the Fault callback
 		SandeshaListener faultCallback = (SandeshaListener) msgContext.getOptions().getProperty(
 				SandeshaClientConstants.SANDESHA_LISTENER);
@@ -273,11 +261,36 @@
 		rmsBean.setHighestOutMessageNumber(messageNumber);
 		
 		// saving the used message number, and the expected reply count
+		boolean startPolling = false;
 		if (!dummyMessage) {
 			rmsBean.setNextMessageNumber(messageNumber);
+
+			// Identify the MEP associated with the message.
+			AxisOperation op = msgContext.getAxisOperation();
+			int mep = WSDLConstants.MEP_CONSTANT_INVALID;
+			if(op != null) {
+				mep = op.getAxisSpecifMEPConstant();
+			}
+
 			if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
 				long expectedReplies = rmsBean.getExpectedReplies();
 				rmsBean.setExpectedReplies(expectedReplies + 1);
+
+				// If we support the RM anonymous URI then rewrite the ws-a anon to use the RM equivalent.
+				EndpointReference oldEndpoint = msgContext.getReplyTo();
+				String oldAddress = (oldEndpoint == null) ? null : oldEndpoint.getAddress(); 
+				EndpointReference newReplyTo = SandeshaUtil.rewriteEPR(rmsBean, msgContext.getReplyTo(), configContext);
+				String newAddress = (newReplyTo == null) ? null : newReplyTo.getAddress();
+				if(newAddress != null && !newAddress.equals(oldAddress)) {
+					// We have rewritten the replyTo. If this is the first message that we have needed to
+					// rewrite then we should set the sequence up for polling, and once we have saved the
+					// changes to the sequence then we can start the polling thread.
+					msgContext.setReplyTo(newReplyTo);
+					if(!rmsBean.isPollingMode()) {
+						rmsBean.setPollingMode(true);
+						startPolling = true;
+					}
+				}
 			}
 		}
 		
@@ -295,6 +308,10 @@
 
 		// Update the rmsBean
 		storageManager.getRMSBeanMgr().update(rmsBean);
+		
+		if(startPolling) {
+			SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), rmsBean);
+		}
 		
 		SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
 		if (env == null) {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Wed Feb 28 03:22:57 2007
@@ -140,7 +140,7 @@
 			boolean cleanAcks = AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(), beanToPoll.getNextMessageNumber());
 			long  repliesExpected = beanToPoll.getExpectedReplies();
 			if(force ||	!cleanAcks || repliesExpected > 0)
-				pollForSequence(beanToPoll.getSequenceID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
+				pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
 		}
 
 		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMSSide");
@@ -174,31 +174,35 @@
 				}
 			}
 			if(force || doPoll)
-				pollForSequence(nextMsgBean.getSequenceID(), nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean, entry);
+				pollForSequence(null, null, nextMsgBean.getReferenceMessageKey(), nextMsgBean, entry);
 		}
 
 		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMDSide");
 	}
 
-	private void pollForSequence(String sequenceId,
-								 String sequencePropertyKey,
+	private void pollForSequence(String anonUUID,       // Only set for RMS polling
+								 String internalSeqId,  // Only set for RMS polling
 								 String referenceMsgKey,
 								 RMSequenceBean rmBean,
 								 SequenceEntry entry)
 	throws SandeshaException, SandeshaStorageException, AxisFault
 	{
-		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollForSequence, " + sequenceId + ", " + sequencePropertyKey + ", " + referenceMsgKey + ", " + rmBean);
+		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollForSequence, rmBean: " + rmBean);
 		
 		//create a MakeConnection message  
 		String replyTo = rmBean.getReplyToEPR();
 		String wireSeqId = null;
 		String wireAddress = null;
-		if (SandeshaUtil.isWSRMAnonymous(replyTo)) {
+		if(anonUUID != null) {
+			// RMS-side polling: the caller passes the RM anon URI stored on the RMSBean, so we
+			// poll on that address and leave the sequence id out of the MakeConnection message.
+			wireAddress = anonUUID;
+		} else if(SandeshaUtil.isWSRMAnonymous(replyTo)) {
 			// If we are polling on a RM anon URI then we don't want to include the sequence id
 			// in the MakeConnection message.
 			wireAddress = replyTo;
 		} else {
-			wireSeqId = sequenceId;
+			wireSeqId = rmBean.getSequenceID();
 		}
 		
 		MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,context);
@@ -218,7 +222,7 @@
 		
 		//add an entry for the MakeConnection message to the sender (with ,send=true, resend=false)
 		SenderBean makeConnectionSenderBean = new SenderBean ();
-		makeConnectionSenderBean.setInternalSequenceID((rmBean instanceof RMSBean) ? sequencePropertyKey : null); // We only have internal ids for the RMS-side
+		makeConnectionSenderBean.setInternalSequenceID(internalSeqId);
 		makeConnectionSenderBean.setMessageContextRefKey(makeConnectionMsgStoreKey);
 		makeConnectionSenderBean.setMessageID(makeConnectionRMMessage.getMessageId());
 		makeConnectionSenderBean.setMessageType(Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java Wed Feb 28 03:22:57 2007
@@ -80,6 +80,8 @@
 
 	private String offeredSequence = null;
 	
+	private String anonymousUUID = null;
+	
 	/**
 	 * This is the timestamp of when the last error occured when sending
 	 */
@@ -325,6 +327,14 @@
 		this.rmsFlags |= EXPECTED_REPLIES;
 	}
 
+	public String getAnonymousUUID() {
+		return anonymousUUID;
+	}
+
+	public void setAnonymousUUID(String anonymousUUID) {
+		this.anonymousUUID = anonymousUUID;
+	}
+
 	public String toString() {
 		StringBuffer result = new StringBuffer();
 		result.append(this.getClass().getName());
@@ -351,6 +361,7 @@
 			result.append("\nLastErrorTime    : "); result.append(lastSendErrorTimestamp);
 		}
 		result.append("\nClientCompletedMsgs: "); result.append(clientCompletedMessages);
+		result.append("\nAnonymous UUID     : "); result.append(anonymousUUID);
 		return result.toString();
 	}
 	
@@ -392,6 +403,9 @@
 		else if(bean.getOfferedSequence() != null && !bean.getOfferedSequence().equals(this.getOfferedSequence()))
 			match = false;
 
+		else if(bean.getAnonymousUUID() != null && !bean.getAnonymousUUID().equals(this.getAnonymousUUID()))
+			match = false;
+		
 // Avoid matching on the error information
 //		else if((bean.rmsFlags & LAST_SEND_ERROR_TIME_FLAG) != 0 && bean.getLastSendErrorTimestamp() != this.getLastSendErrorTimestamp())
 //			match = false;

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Wed Feb 28 03:22:57 2007
@@ -947,13 +947,12 @@
     return stackTrace;
 	}
 
-	public static EndpointReference rewriteEPR(EndpointReference epr, MessageContext mc)
+	public static EndpointReference rewriteEPR(RMSBean sourceBean, EndpointReference epr, ConfigurationContext configContext)
 	throws SandeshaException
 	{
 		if (log.isDebugEnabled())
 			log.debug("Enter: SandeshaUtil::rewriteEPR " + epr);
 
-		ConfigurationContext configContext = mc.getConfigurationContext();
 		SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration());
 		if(!policy.isEnableRMAnonURI()) {
 			if (log.isDebugEnabled())
@@ -969,24 +968,16 @@
 		if(address == null ||
 		   AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) ||
 		   AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address)) {
-			// We use the service context to co-ordinate the RM anon uuid, so that several
-			// invocations of the same target will yield stable replyTo addresses.
-			String uuid = null;
-			ServiceContext sc = mc.getServiceContext();
-			if(sc == null) {
-				String msg = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.serviceContextNotSet);
-				throw new SandeshaException(msg);
-			}
-			synchronized (sc) {
-				uuid = (String) sc.getProperty(Sandesha2Constants.RM_ANON_UUID);
-				if(uuid == null) {
-					uuid = SandeshaUtil.getUUID();
-					sc.setProperty(Sandesha2Constants.RM_ANON_UUID, uuid);
-				}
+			// We use the sequence to hold the anonymous uuid, so that messages assigned to the
+			// sequence will use the same UUID to identify themselves
+			String uuid = sourceBean.getAnonymousUUID();
+			if(uuid == null) {
+				uuid = Sandesha2Constants.SPEC_2007_02.ANONYMOUS_URI_PREFIX + SandeshaUtil.getUUID();
+				sourceBean.setAnonymousUUID(uuid);
 			}
 			
-			if(log.isDebugEnabled()) log.debug("Rewriting EPR with UUID " + uuid);
-			epr.setAddress(Sandesha2Constants.SPEC_2007_02.ANONYMOUS_URI_PREFIX + uuid);
+			if(log.isDebugEnabled()) log.debug("Rewriting EPR with anon URI " + uuid);
+			epr.setAddress(uuid);
 		}
 		
 		if (log.isDebugEnabled())

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java Wed Feb 28 03:22:57 2007
@@ -20,6 +20,7 @@
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.Parameter;
 import org.apache.commons.logging.Log;
@@ -253,8 +254,9 @@
 			}
 		}
 		// In case either of the replyTo or AcksTo is anonymous, rewrite them using the AnonURI template
-		replyToEPR = SandeshaUtil.rewriteEPR(replyToEPR, firstAplicationMsgCtx);
-		acksToEPR = SandeshaUtil.rewriteEPR(acksToEPR, firstAplicationMsgCtx);
+		ConfigurationContext config = firstAplicationMsgCtx.getConfigurationContext();
+		replyToEPR = SandeshaUtil.rewriteEPR(rmsBean, replyToEPR, config);
+		acksToEPR = SandeshaUtil.rewriteEPR(rmsBean, acksToEPR, config);
 		
 		// Store both the acksTo and replyTo 
 		if(replyToEPR != null) rmsBean.setReplyToEPR(replyToEPR.getAddress());



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org