You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2007/02/15 15:20:24 UTC

svn commit: r507936 - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2: ./ handlers/ msgprocessors/ polling/ storage/beans/ util/ workers/

Author: mlovett
Date: Thu Feb 15 06:20:22 2007
New Revision: 507936

URL: http://svn.apache.org/viewvc?view=rev&rev=507936
Log:
Improve performance of MakeConnection by polling less often, and paying attention to MessagePending headers

Added:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java   (with props)
Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java Thu Feb 15 06:20:22 2007
@@ -502,6 +502,7 @@
 		static final String INBOUND_SEQUENCE_ID    = "Sandesha2InboundSequenceId";
 		static final String INBOUND_MESSAGE_NUMBER = "Sandesha2InboundMessageNumber";
 		static final String INBOUND_LAST_MESSAGE   = "Sandesha2InboundLastMessage";
+		static final String MAKECONNECTION_ENTRY   = "Sandesha2MakeConnectionEntry";
 	}
     
     public interface Assertions {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Thu Feb 15 06:20:22 2007
@@ -34,6 +34,7 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.msgprocessors.AckRequestedProcessor;
 import org.apache.sandesha2.msgprocessors.AcknowledgementProcessor;
+import org.apache.sandesha2.msgprocessors.MessagePendingProcessor;
 import org.apache.sandesha2.msgprocessors.SequenceProcessor;
 import org.apache.sandesha2.policy.SandeshaPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
@@ -115,6 +116,10 @@
 			if(reqProcessor.processAckRequestedHeaders(rmMsgCtx)){
 				returnValue = InvocationResponse.SUSPEND;
 			}
+			
+			// Process MessagePending headers
+			MessagePendingProcessor pendingProcessor = new MessagePendingProcessor();
+			pendingProcessor.processMessagePendingHeaders(rmMsgCtx);
 
 			// Process the Sequence header, if there is one
 			SequenceProcessor seqProcessor = new SequenceProcessor();

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Thu Feb 15 06:20:22 2007
@@ -20,13 +20,11 @@
 import org.apache.axiom.soap.SOAPBody;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.addressing.RelatesTo;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.ServiceContext;
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.commons.logging.Log;
@@ -38,7 +36,6 @@
 import org.apache.sandesha2.client.SandeshaListener;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.policy.SandeshaPolicyBean;
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
@@ -90,15 +87,13 @@
 
 		// Re-write the WS-A anonymous URI, if we support the RM anonymous URI. We only
 		// need to rewrite the replyTo EPR if we have an out-in MEP.
-		SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration());
-		if(policy.isEnableRMAnonURI()) {
-			AxisOperation op = msgContext.getAxisOperation();
-			if(op != null) {
-				int mep = op.getAxisSpecifMEPConstant();
-				if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
-					EndpointReference replyTo = rewriteEPR(msgContext.getReplyTo(), msgContext);
-					msgContext.setReplyTo(replyTo);
-				}
+		AxisOperation op = msgContext.getAxisOperation();
+		int mep = WSDLConstants.MEP_CONSTANT_INVALID;
+		if(op != null) {
+			mep = op.getAxisSpecifMEPConstant();
+			if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
+				EndpointReference replyTo = SandeshaUtil.rewriteEPR(msgContext.getReplyTo(), msgContext);
+				msgContext.setReplyTo(replyTo);
 			}
 		}
 		
@@ -279,9 +274,14 @@
 		// set this as the response highest message.
 		rmsBean.setHighestOutMessageNumber(messageNumber);
 		
-		// saving the used message number
-		if (!dummyMessage)
+		// saving the used message number, and the expected reply count
+		if (!dummyMessage) {
 			rmsBean.setNextMessageNumber(messageNumber);
+			if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
+				long expectedReplies = rmsBean.getExpectedReplies();
+				rmsBean.setExpectedReplies(expectedReplies + 1);
+			}
+		}
 		
 		RelatesTo relatesTo = msgContext.getRelatesTo();
 		if(relatesTo != null) {
@@ -491,45 +491,6 @@
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: ApplicationMsgProcessor::processResponseMessage");
-	}
-
-	private EndpointReference rewriteEPR(EndpointReference epr, MessageContext mc)
-	throws SandeshaException
-	{
-		if (log.isDebugEnabled())
-			log.debug("Exit: SandeshaOutHandler::rewriteEPR " + epr);
-
-		// Handle EPRs that have not yet been set. These are effectively WS-A anon, and therefore
-		// we can rewrite them.
-		if(epr == null) epr = new EndpointReference(null);
-		
-		String address = epr.getAddress();
-		if(address == null ||
-		   AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) ||
-		   AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address)) {
-			// We use the service context to co-ordinate the RM anon uuid, so that several
-			// invocations of the same target will yield stable replyTo addresses.
-			String uuid = null;
-			ServiceContext sc = mc.getServiceContext();
-			if(sc == null) {
-				String msg = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.serviceContextNotSet);
-				throw new SandeshaException(msg);
-			}
-			synchronized (sc) {
-				uuid = (String) sc.getProperty(Sandesha2Constants.RM_ANON_UUID);
-				if(uuid == null) {
-					uuid = SandeshaUtil.getUUID();
-					sc.setProperty(Sandesha2Constants.RM_ANON_UUID, uuid);
-				}
-			}
-			
-			if(log.isDebugEnabled()) log.debug("Rewriting EPR with UUID " + uuid);
-			epr.setAddress(Sandesha2Constants.SPEC_2006_08.ANONYMOUS_URI_PREFIX + uuid);
-		}
-		
-		if (log.isDebugEnabled())
-			log.debug("Exit: SandeshaOutHandler::rewriteEPR " + epr);
-		return epr;
 	}
 
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Thu Feb 15 06:20:22 2007
@@ -164,6 +164,7 @@
 			rMDBean.setAcksToEPR(acksToEPR.getAddress());
 			rMDBean.setSequenceID(rmsBean.getOfferedSequence());
 			rMDBean.setNextMsgNoToProcess(1);
+			rMDBean.setOutboundSequence(rmsBean.getSequenceID());
 
 			//Storing the referenceMessage of the sending side sequence as the reference message
 			//of the receiving side as well.

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Thu Feb 15 06:20:22 2007
@@ -10,7 +10,6 @@
 import org.apache.axis2.context.ContextFactory;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.OperationContextFactory;
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.TransportOutDescription;
 import org.apache.commons.logging.Log;

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java Thu Feb 15 06:20:22 2007
@@ -2,43 +2,35 @@
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.polling.PollingManager;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.SequenceEntry;
 import org.apache.sandesha2.wsrm.MessagePending;
-import org.apache.sandesha2.wsrm.Sequence;
 
 public class MessagePendingProcessor {
 
 	private static final Log log = LogFactory.getLog(MessagePendingProcessor.class);
 	
-	public boolean processMessagePendingHeaders (MessageContext message) throws AxisFault {
+	public void processMessagePendingHeaders(RMMsgContext message) throws AxisFault {
 		
 		if (log.isDebugEnabled())
 			log.debug("Enter: MessagePendingProcessor::processMessagePendingHeaders");
 
-		boolean messagePaused = false;
-		
-		RMMsgContext rmMsgContext = MsgInitializer.initializeMessage(message);
-		Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-		MessagePending messagePending = (MessagePending) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING);
-		
-		if (sequence!=null) {
-			String sequenceId = sequence.getIdentifier().getIdentifier();
-			
-			if (messagePending!=null) {
-				boolean pending = messagePending.isPending();
-				if (pending) {
-					ConfigurationContext context = rmMsgContext.getConfigurationContext();
+		MessagePending messagePending = (MessagePending) message.getMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING);
+		if (messagePending!=null) {
+			boolean pending = messagePending.isPending();
+			if (pending) {
+				SequenceEntry entry = (SequenceEntry) message.getProperty(Sandesha2Constants.MessageContextProperties.MAKECONNECTION_ENTRY);
+				if(entry != null) {
+					ConfigurationContext context = message.getConfigurationContext();
 					StorageManager storage = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
 					PollingManager pollingManager = storage.getPollingManager();
-					if(pollingManager != null) pollingManager.schedulePollingRequest(sequenceId, false);
+					if(pollingManager != null) pollingManager.schedulePollingRequest(entry.getSequenceId(), entry.isRmSource());
 				}
 			}
 		}
@@ -47,8 +39,6 @@
 		
 		if (log.isDebugEnabled())
 			log.debug("Exit: MessagePendingProcessor::processMessagePendingHeaders");
-
-		return messagePaused;
 	}
 
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Thu Feb 15 06:20:22 2007
@@ -43,6 +43,7 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.InvokerBean;
 import org.apache.sandesha2.storage.beans.RMDBean;
@@ -263,6 +264,18 @@
 		if (!msgNoPresentInList)
 		{
 			serverCompletedMessageRanges.addRange(new Range(msgNo));
+		}
+		
+		// If the message is a reply to an outbound message then we can update the RMSBean that
+		// matches.
+		String outboundSequence = bean.getOutboundSequence();
+		if(outboundSequence != null) {
+			RMSBean outBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outboundSequence);
+			if(outBean != null && outBean.getExpectedReplies() > 0 ) {
+				outBean.setExpectedReplies(outBean.getExpectedReplies() - 1);
+				RMSBeanMgr outMgr = storageManager.getRMSBeanMgr();
+				outMgr.update(outBean);
+			}
 		}
 		
 		// Update the RMD bean

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Thu Feb 15 06:20:22 2007
@@ -23,6 +23,7 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
@@ -37,10 +38,12 @@
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.RMSequenceBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SequenceEntry;
 
 /**
  * This class is responsible for sending MakeConnection requests. This is a seperate thread that
@@ -68,11 +71,14 @@
 		Transaction t = null;
 		try {
 			// If we have request scheduled, handle them first, and then pick
-			// pick a sequence using a round-robin approach.
+			// pick a sequence using a round-robin approach. Scheduled polls
+			// bypass the normal polling checks, to make sure that they happen
+			boolean forcePoll = false;
 			SequenceEntry entry = null;
 			synchronized (this) {
 				if(!scheduledPollingRequests.isEmpty()) {
 					entry = (SequenceEntry) scheduledPollingRequests.removeFirst();
+					forcePoll = true;
 				}
 			}
 			if(entry == null) {
@@ -93,9 +99,9 @@
 
 			t = storageManager.getTransaction();
 			if(entry.isRmSource()) {
-				pollRMSSide(entry);
+				pollRMSSide(entry, forcePoll);
 			} else {
-				pollRMDSide(entry);
+				pollRMDSide(entry, forcePoll);
 			}
 			if(t != null) t.commit();
 			t = null;
@@ -115,8 +121,8 @@
 		return false;
 	}
 	
-	private void pollRMSSide(SequenceEntry entry) throws AxisFault {
-		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMSSide");
+	private void pollRMSSide(SequenceEntry entry, boolean force) throws AxisFault {
+		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMSSide, force: " + force);
 		
 		RMSBeanMgr rmsBeanManager = storageManager.getRMSBeanMgr();
 		RMSBean findRMS = new RMSBean();
@@ -129,14 +135,19 @@
 			// This sequence must have been terminated, or deleted
 			stopThreadForSequence(entry.getSequenceId(), true);
 		} else {
-			pollForSequence(beanToPoll.getSequenceID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll);
+			// The sequence is there, but we still only poll if we are expecting reply messages,
+			// or if we don't have clean ack state.
+			boolean cleanAcks = AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(), beanToPoll.getNextMessageNumber());
+			long  repliesExpected = beanToPoll.getExpectedReplies();
+			if(force ||	!cleanAcks || repliesExpected > 0)
+				pollForSequence(beanToPoll.getSequenceID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
 		}
 
 		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMSSide");
 	}
 
-	private void pollRMDSide(SequenceEntry entry) throws AxisFault {
-		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMDSide");
+	private void pollRMDSide(SequenceEntry entry, boolean force) throws AxisFault {
+		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMDSide, force: " + force);
 		RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
 		RMDBean findBean = new RMDBean();
 		findBean.setPollingMode(true);
@@ -148,7 +159,22 @@
 			// This sequence must have been terminated, or deleted
 			stopThreadForSequence(entry.getSequenceId(), false);
 		} else {
-			pollForSequence(nextMsgBean.getSequenceID(), nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean);
+			// The sequence is still there, but if we have a running related sequence
+			// that is not expecting replies then there is no need to poll.
+			boolean doPoll = true;
+			String outboundSequence = nextMsgBean.getOutboundSequence();
+			if(outboundSequence != null) {
+				RMSBean findRMS = new RMSBean();
+				findRMS.setSequenceID(outboundSequence);
+				findRMS.setTerminated(false);
+				RMSBeanMgr mgr = storageManager.getRMSBeanMgr();
+				RMSBean outbound = mgr.findUnique(findRMS);
+				if(outbound != null && outbound.getExpectedReplies() == 0) {
+					doPoll = false;
+				}
+			}
+			if(force || doPoll)
+				pollForSequence(nextMsgBean.getSequenceID(), nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean, entry);
 		}
 
 		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMDSide");
@@ -157,25 +183,34 @@
 	private void pollForSequence(String sequenceId,
 								 String sequencePropertyKey,
 								 String referenceMsgKey,
-								 RMSequenceBean rmBean)
+								 RMSequenceBean rmBean,
+								 SequenceEntry entry)
 	throws SandeshaException, SandeshaStorageException, AxisFault
 	{
 		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollForSequence, " + sequenceId + ", " + sequencePropertyKey + ", " + referenceMsgKey + ", " + rmBean);
 		
 		//create a MakeConnection message  
 		String replyTo = rmBean.getReplyToEPR();
-		String WSRMAnonReplyToURI = null;
+		String wireSeqId = null;
+		String wireAddress = null;
 		if (SandeshaUtil.isWSRMAnonymous(replyTo)) {
 			// If we are polling on a RM anon URI then we don't want to include the sequence id
 			// in the MakeConnection message.
-			sequenceId = null;
-			WSRMAnonReplyToURI = replyTo;
+			wireAddress = replyTo;
+		} else {
+			wireSeqId = sequenceId;
 		}
 		
 		MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,context);
 		RMMsgContext referenceRMMessage = MsgInitializer.initializeMessage(referenceMessage);
 		RMMsgContext makeConnectionRMMessage = RMMsgCreator.createMakeConnectionMessage(referenceRMMessage,
-				rmBean, sequenceId, WSRMAnonReplyToURI, storageManager);
+				rmBean, wireSeqId, wireAddress, storageManager);
+		
+		// Store properties so that we know which sequence we are polling for. This can be used
+		// to match reply sequences up to requests, as well as to help process messagePending
+		// headers.
+		OperationContext ctx = makeConnectionRMMessage.getMessageContext().getOperationContext();
+		ctx.setProperty(Sandesha2Constants.MessageContextProperties.MAKECONNECTION_ENTRY, entry);
 		
 		makeConnectionRMMessage.setProperty(MessageContext.TRANSPORT_IN,null);
 		//storing the MakeConnection message.

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java Thu Feb 15 06:20:22 2007
@@ -58,6 +58,12 @@
 	 * To Address of the messages that will be received for this sequence.
 	 */
 	private String toAddress;
+	
+	/**
+	 * Client side, we keep track of inbound and outbound sequence pairs. Each
+	 * inbound sequence has the identifier of the associated outbound sequence.
+	 */
+	private String outboundSequence;
 
 	/**
 	 * Comment for <code>nextMsgNoToProcess</code>
@@ -157,6 +163,15 @@
 		this.toAddress = toAddress;
 	}
 
+	public String getOutboundSequence() {
+		return outboundSequence;
+	}
+
+	public void setOutboundSequence(String outboundSequence) {
+		this.outboundSequence = outboundSequence;
+	}
+
+
 	public String toString() {
 		StringBuffer result = new StringBuffer();
 		result.append(this.getClass().getName());
@@ -195,6 +210,9 @@
 			equal = false;
 
 		else if(bean.getToAddress() != null && !bean.getToAddress().equals(this.getToAddress()))
+			equal = false;
+		
+		else if(bean.getOutboundSequence() != null && !bean.getOutboundSequence().equals(this.getOutboundSequence()))
 			equal = false;
 		
 		else if ((bean.rmdFlags & NEXT_MSG_NO_FLAG) != 0 && bean.getNextMsgNoToProcess() != this.getNextMsgNoToProcess())

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java Thu Feb 15 06:20:22 2007
@@ -122,6 +122,11 @@
 	private long numberOfMessagesAcked = 0;
 
 	/**
+	 * The number of reply messages that we expect
+	 */
+	private long expectedReplies = 0;
+	
+	/**
 	 * Flags that are used to check if the primitive types on this bean
 	 * have been set. If a primitive type has not been set then it will
 	 * be ignored within the match method.
@@ -136,6 +141,7 @@
 	private static final int SEQ_CLOSED_CLIENT_FLAG    = 0x01000000;
 	private static final int ACKED_MESSAGES_FLAG       = 0x10000000;
 	private static final int TERM_PAUSER_FOR_CS        = 0x00000002;
+	private static final int EXPECTED_REPLIES          = 0x00000020;
 
   /**
    * In WSRM Anon URI scenario, we may not want to terminate a perticular sequence until the CreateSequence has been received
@@ -310,6 +316,15 @@
 	}
 
 
+	public long getExpectedReplies() {
+		return expectedReplies;
+	}
+
+	public void setExpectedReplies(long expectedReplies) {
+		this.expectedReplies = expectedReplies;
+		this.rmsFlags |= EXPECTED_REPLIES;
+	}
+
 	public String toString() {
 		StringBuffer result = new StringBuffer();
 		result.append(this.getClass().getName());
@@ -327,6 +342,7 @@
 		result.append("\nTimedOut         : "); result.append(timedOut);
 		result.append("\nClosedClient     : "); result.append(sequenceClosedClient);
 		result.append("\nNumAckedMsgs     : "); result.append(numberOfMessagesAcked);
+		result.append("\nExpectedReplies  : "); result.append(expectedReplies);
 		result.append("\nTransportTo      : "); result.append(transportTo);
 		result.append("\nOfferedEndPoint  : "); result.append(offeredEndPoint);
 		result.append("\nOfferedSequence  : "); result.append(offeredSequence);
@@ -403,7 +419,11 @@
 		
 		else if((bean.rmsFlags & TERM_PAUSER_FOR_CS) != 0 && bean.isTerminationPauserForCS() != this.isTerminationPauserForCS())
 			match = false;
-		
+
+		else if((bean.rmsFlags & EXPECTED_REPLIES) != 0 && bean.getExpectedReplies() != this.getExpectedReplies())
+			match = false;
+
 		return match;
 	}
+
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Thu Feb 15 06:20:22 2007
@@ -941,4 +941,53 @@
     String stackTrace = baos.toString();
     return stackTrace;
 	}
+
+	public static EndpointReference rewriteEPR(EndpointReference epr, MessageContext mc)
+	throws SandeshaException
+	{
+		if (log.isDebugEnabled())
+			log.debug("Enter: SandeshaUtil::rewriteEPR " + epr);
+
+		ConfigurationContext configContext = mc.getConfigurationContext();
+		SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration());
+		if(!policy.isEnableRMAnonURI()) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: SandeshaUtil::rewriteEPR, anon uri is disabled");
+			return epr;
+		}
+
+		// Handle EPRs that have not yet been set. These are effectively WS-A anon, and therefore
+		// we can rewrite them.
+		if(epr == null) epr = new EndpointReference(null);
+		
+		String address = epr.getAddress();
+		if(address == null ||
+		   AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) ||
+		   AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address)) {
+			// We use the service context to co-ordinate the RM anon uuid, so that several
+			// invocations of the same target will yield stable replyTo addresses.
+			String uuid = null;
+			ServiceContext sc = mc.getServiceContext();
+			if(sc == null) {
+				String msg = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.serviceContextNotSet);
+				throw new SandeshaException(msg);
+			}
+			synchronized (sc) {
+				uuid = (String) sc.getProperty(Sandesha2Constants.RM_ANON_UUID);
+				if(uuid == null) {
+					uuid = SandeshaUtil.getUUID();
+					sc.setProperty(Sandesha2Constants.RM_ANON_UUID, uuid);
+				}
+			}
+			
+			if(log.isDebugEnabled()) log.debug("Rewriting EPR with UUID " + uuid);
+			epr.setAddress(Sandesha2Constants.SPEC_2006_08.ANONYMOUS_URI_PREFIX + uuid);
+		}
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: SandeshaUtil::rewriteEPR " + epr);
+		return epr;
+	}
+
+
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java Thu Feb 15 06:20:22 2007
@@ -31,6 +31,7 @@
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.workers.SequenceEntry;
 import org.apache.sandesha2.wsrm.CreateSequence;
 
 /**
@@ -82,6 +83,13 @@
 		}
 
 		MessageContext createSeqContext = createSequenceMsg.getMessageContext();
+		
+		// If this create is the result of a MakeConnection, then we must have a related
+		// outbound sequence.
+		SequenceEntry entry = (SequenceEntry) createSeqContext.getProperty(Sandesha2Constants.MessageContextProperties.MAKECONNECTION_ENTRY);
+		if(entry != null && entry.isRmSource()) {
+			rmdBean.setOutboundSequence(entry.getSequenceId());
+		}
 
 		rmdBean.setServerCompletedMessages(new RangeString());
 		
@@ -260,7 +268,11 @@
 				}
 			}
 		}
-		// Store both the acksTo and replyTo
+		// In case either of the replyTo or AcksTo is anonymous, rewrite them using the AnonURI template
+		replyToEPR = SandeshaUtil.rewriteEPR(replyToEPR, firstAplicationMsgCtx);
+		acksToEPR = SandeshaUtil.rewriteEPR(acksToEPR, firstAplicationMsgCtx);
+		
+		// Store both the acksTo and replyTo 
 		if(replyToEPR != null) rmsBean.setReplyToEPR(replyToEPR.getAddress());
 		if(acksToEPR  != null) rmsBean.setAcksToEPR(acksToEPR.getAddress());
 		

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java Thu Feb 15 06:20:22 2007
@@ -299,42 +299,4 @@
 			}
 		}
 	}
-	
-	protected class SequenceEntry {
-		private String  sequenceId;
-		private boolean rmSource;
-		
-		public SequenceEntry(String sequenceId, boolean rmSource) {
-			this.sequenceId = sequenceId;
-			this.rmSource = rmSource;
-		}
-		public boolean isRmSource() {
-			return rmSource;
-		}
-		public String getSequenceId() {
-			return sequenceId;
-		}
-
-
-		public boolean equals(Object o) {
-			if(o == null) return false;
-			if(o == this) return true;
-			if(o.getClass() != getClass()) return false;
-			
-			SequenceEntry other = (SequenceEntry) o;
-			if(sequenceId != null) {
-				if(!sequenceId.equals(other.sequenceId)) return false;
-			} else {
-				if(other.sequenceId != null) return false;
-			}
-			
-			return rmSource == other.rmSource;
-		}
-		public int hashCode() {
-			int result = 1;
-			if(sequenceId != null) result = sequenceId.hashCode();
-			if(rmSource) result = -result;
-			return result;
-		}
-	}
 }

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java?view=auto&rev=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java Thu Feb 15 06:20:22 2007
@@ -0,0 +1,59 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+package org.apache.sandesha2.workers;
+
+import java.io.Serializable;
+
+public class SequenceEntry implements Serializable {
+	private static final long serialVersionUID = -6823171634616402792L;
+
+	private String  sequenceId;
+	private boolean rmSource;
+	
+	public SequenceEntry(String sequenceId, boolean rmSource) {
+		this.sequenceId = sequenceId;
+		this.rmSource = rmSource;
+	}
+	public boolean isRmSource() {
+		return rmSource;
+	}
+	public String getSequenceId() {
+		return sequenceId;
+	}
+
+
+	public boolean equals(Object o) {
+		if(o == null) return false;
+		if(o == this) return true;
+		if(o.getClass() != getClass()) return false;
+		
+		SequenceEntry other = (SequenceEntry) o;
+		if(sequenceId != null) {
+			if(!sequenceId.equals(other.sequenceId)) return false;
+		} else {
+			if(other.sequenceId != null) return false;
+		}
+		
+		return rmSource == other.rmSource;
+	}
+	public int hashCode() {
+		int result = 1;
+		if(sequenceId != null) result = sequenceId.hashCode();
+		if(rmSource) result = -result;
+		return result;
+	}
+}

Propchange: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org