You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2006/09/28 16:49:19 UTC

svn commit: r450876 - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2: i18n/ msgprocessors/ polling/ storage/beans/ storage/inmemory/ util/ workers/

Author: chamikara
Date: Thu Sep 28 07:49:16 2006
New Revision: 450876

URL: http://svn.apache.org/viewvc?view=rev&rev=450876
Log:
Changed the polling manager to do the polling processing logic using the NextMsgBean manager.
This seems to be the correct way since NextMsgBean is the bean item that has details about an incoming sequence.


Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/NextMsgBean.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Thu Sep 28 07:49:16 2006
@@ -67,6 +67,7 @@
 	public static final String couldNotCopyParameters="couldNotCopyParameters";
 	public static final String senderBeanNotFound="senderBeanNotFound";
 	public static final String workAlreadyAssigned="workAlreadyAssigned";
+	public static final String cannotSendToTheAddress="cannotSendToTheAddress";
 	
 
 

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties Thu Sep 28 07:49:16 2006
@@ -60,7 +60,8 @@
 storedKeyNotPresent=Sandesha2 Internal Error: stored key not present in the retransmittable message
 invalidQName=Invalid QName string: {0}
 senderBeanNotFound=SenderBean was not found
-workAlreadyAssigned=Work '{0}' is already assigned to a different Worker. Will try the next one
+workAlreadyAssigned=Work ''{0}'' is already assigned to a different Worker. Will try the next one
+cannotSendToTheAddress=Message cannot be sent to the address ''{0}'' 
 
 
 dummyCallback=Sandesha2 Internal Error: dummy callback was called but this should never happen.

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Thu Sep 28 07:49:16 2006
@@ -234,6 +234,10 @@
 			ackBean.setMessageID(ackMsgCtx.getMessageID());
 			ackBean.setReSend(false);
 			ackBean.setSequenceID(sequenceId);
+			
+			EndpointReference to = ackMsgCtx.getTo();
+			if (to!=null)
+				ackBean.setToAddress(to.getAddress());
 
 			// this will be set to true in the sender.
 			ackBean.setSend(true);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Thu Sep 28 07:49:16 2006
@@ -840,21 +840,21 @@
 		
 		//TODO set the replyTo of CreateSeq (and others) to Anymomous if Application Msgs hv it as Anonymous.
 		
-		//checking weather the sequence is in polling mode.
-		boolean pollingMode = false;
-		EndpointReference replyTo = applicationRMMsg.getReplyTo();
-		if (replyTo!=null && SandeshaUtil.isWSRMAnonymousReplyTo(replyTo.getAddress()))
-			pollingMode = true;
-		else if (replyTo!=null && offer!=null && 
-				(AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(replyTo.getAddress()) || 
-						AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(replyTo.getAddress())))
-			pollingMode = true;
+//		//checking weather the sequence is in polling mode.
+//		boolean pollingMode = false;
+//		EndpointReference replyTo = applicationRMMsg.getReplyTo();
+//		if (replyTo!=null && SandeshaUtil.isWSRMAnonymousReplyTo(replyTo.getAddress()))
+//			pollingMode = true;
+//		else if (replyTo!=null && offer!=null && 
+//				(AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(replyTo.getAddress()) || 
+//						AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(replyTo.getAddress())))
+//			pollingMode = true;
+//		
+//		createSeqBean.setPollingMode(pollingMode);
 		
-		createSeqBean.setPollingMode(pollingMode);
-		
-		//if PollingMode is true, starting the pollingmanager.
-		if (pollingMode)
-			SandeshaUtil.startPollingManager(configCtx);
+//		//if PollingMode is true, starting the pollingmanager.
+//		if (pollingMode)
+//			SandeshaUtil.startPollingManager(configCtx);
 		
 		SecurityToken token = (SecurityToken) createSeqRMMessage.getProperty(Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
 		if(token != null) {
@@ -885,6 +885,9 @@
 		createSeqEntry.setInternalSequenceID(sequencePropertyKey);
 		// this will be set to true in the sender
 		createSeqEntry.setSend(true);
+		EndpointReference to = createSeqRMMessage.getTo();
+		if (to!=null)
+			createSeqEntry.setToAddress(to.getAddress());
 
 		createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
 		createSeqEntry.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
@@ -1085,7 +1088,10 @@
 			// Send will be set to true at the sender.
 			msg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
 		}
-
+		EndpointReference to = rmMsg.getTo();
+		if (to!=null)
+			appMsgEntry.setToAddress(to.getAddress());
+		
 		appMsgEntry.setInternalSequenceID(internalSequenceId);
 		storageManager.storeMessageContext(storageKey, msg);
 		retransmitterMgr.insert(appMsgEntry);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Thu Sep 28 07:49:16 2006
@@ -207,6 +207,9 @@
 				log.debug(message);
 				throw new AxisFault(message);
 			}
+			
+			//TODO add createSequenceResponse message as the referenceMessage to the NextMsgBean.
+			
 
 			SequencePropertyBean acksToBean = new SequencePropertyBean(newSequenceId,
 					Sandesha2Constants.SequenceProperties.ACKS_TO_EPR, acksTo.getAddress());

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Thu Sep 28 07:49:16 2006
@@ -22,6 +22,7 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.soap.SOAPFactory;
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.addressing.RelatesTo;
 import org.apache.axis2.context.ConfigurationContext;
@@ -186,7 +187,39 @@
 			NextMsgBean nextMsgBean = new NextMsgBean();
 			nextMsgBean.setSequenceID(offeredSequenceId);
 			nextMsgBean.setNextMsgNoToProcess(1);
+			
 
+			boolean pollingMode = false;
+			if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
+				String replyToAddress = SandeshaUtil.getSequenceProperty(sequencePropertyKey, 
+								Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, storageManager);
+				if (replyToAddress!=null) {
+					if (AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(replyToAddress))
+						pollingMode = true;
+					else if (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(replyToAddress))
+						pollingMode = true;
+					else if (replyToAddress.startsWith(Sandesha2Constants.WSRM_ANONYMOUS_URI_PREFIX))
+						pollingMode = true;
+				}
+			}
+			
+			//Storing the createSequence of the sending side sequence as the reference message.
+			//This can be used when creating new outgoing messages.
+			
+			String createSequenceMsgStoreKey = createSeqBean.getCreateSequenceMsgStoreKey();
+			MessageContext createSequenceMsg = storageManager.retrieveMessageContext(createSequenceMsgStoreKey, configCtx);
+			
+			String newMessageStoreKey = SandeshaUtil.getUUID();
+			storageManager.storeMessageContext(newMessageStoreKey,createSequenceMsg);
+			
+			nextMsgBean.setReferenceMessageKey(newMessageStoreKey);
+			
+			nextMsgBean.setPollingMode(pollingMode);
+			
+			//if PollingMode is true, starting the pollingmanager.
+			if (pollingMode)
+				SandeshaUtil.startPollingManager(configCtx);
+			
 			NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
 			nextMsgMgr.insert(nextMsgBean);
 

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Thu Sep 28 07:49:16 2006
@@ -427,7 +427,11 @@
 		terminateBean.setTimeToSend(System.currentTimeMillis() + Sandesha2Constants.TERMINATE_DELAY);
 
 		terminateBean.setMessageID(msgContext.getMessageID());
-
+		
+		EndpointReference to = msgContext.getTo();
+		if (to!=null)
+			terminateBean.setToAddress(to.getAddress());
+		
 		// this will be set to true at the sender.
 		terminateBean.setSend(true);
 

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Thu Sep 28 07:49:16 2006
@@ -22,6 +22,7 @@
 import java.util.Random;
 
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.TransportOutDescription;
@@ -32,8 +33,10 @@
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
 import org.apache.sandesha2.util.MsgInitializer;
@@ -68,58 +71,59 @@
 					e.printStackTrace();
 				}
 				
-				CreateSeqBeanMgr createSeqBeanMgr = storageManager.getCreateSeqBeanMgr();
+				NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
+				
 				//geting the sequences to be polled.
 				//if shedule contains any requests, do the earliest one.
 				//else pick one randomly.
 				
-				String internalSequenceId = null;
+				String sequenceId = null;
 				if (sheduledPollingRequests.size()>0) {
-					internalSequenceId = (String )sheduledPollingRequests.get(0);
+					sequenceId = (String )sheduledPollingRequests.get(0);
 					sheduledPollingRequests.remove(0);
 				}
 
-				CreateSeqBean createSeqBean = null;
+				NextMsgBean nextMsgBean = null;
 				
-				if (internalSequenceId==null) {
-					CreateSeqBean findBean = new CreateSeqBean ();
+				if (sequenceId==null) {
+					NextMsgBean findBean = new NextMsgBean ();
 					findBean.setPollingMode(true);
 					
-					List results = createSeqBeanMgr.find(findBean);
+					List results = nextMsgMgr.find(findBean);
 					int size = results.size();
 					if (size>0) {
 						Random random = new Random ();
 						int item = random.nextInt(size);
-						createSeqBean = (CreateSeqBean) results.get(item);
+						nextMsgBean = (NextMsgBean) results.get(item);
 					}
+					
+					sequenceId = nextMsgBean.getSequenceID();
 				} else {
-					CreateSeqBean findBean = new CreateSeqBean ();
+					NextMsgBean findBean = new NextMsgBean ();
 					findBean.setPollingMode(true);
-					findBean.setInternalSequenceID(internalSequenceId);
+					findBean.setSequenceID(sequenceId);
 					
-					createSeqBean = createSeqBeanMgr.findUnique(findBean);
+					nextMsgBean = nextMsgMgr.findUnique(findBean);
 				}
 				
 				//If not valid entry is found, try again later.
-				if (createSeqBean==null)
+				if (nextMsgBean==null)
 					continue;
 				
 				//create a MakeConnection message  
-				String createSeqMessageStoreKey = createSeqBean.getCreateSequenceMsgStoreKey();
-				internalSequenceId = createSeqBean.getInternalSequenceID();
+				String referenceMsgKey = nextMsgBean.getReferenceMessageKey();
 				
-				String incomingSequenceId = SandeshaUtil.getSequenceProperty(internalSequenceId,
-						Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE,storageManager);
-				String replyTo = SandeshaUtil.getSequenceProperty(internalSequenceId,
+				String sequencePropertyKey = sequenceId;
+				String replyTo = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
 						Sandesha2Constants.SequenceProperties.REPLY_TO_EPR,storageManager);
 				String WSRMAnonReplyToURI = null;
 				if (SandeshaUtil.isWSRMAnonymousReplyTo(replyTo))
 					WSRMAnonReplyToURI = replyTo;
 				
-				MessageContext referenceMessage = storageManager.retrieveMessageContext(createSeqMessageStoreKey,configurationContext);
+				MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,configurationContext);
 				RMMsgContext referenceRMMessage = MsgInitializer.initializeMessage(referenceMessage);
 				RMMsgContext makeConnectionRMMessage = RMMsgCreator.createMakeConnectionMessage(referenceRMMessage,
-						internalSequenceId, incomingSequenceId, WSRMAnonReplyToURI,storageManager);
+						sequenceId , WSRMAnonReplyToURI,storageManager);
 				
 				makeConnectionRMMessage.setProperty(MessageContext.TRANSPORT_IN,null);
 				//storing the MakeConnection message.
@@ -128,13 +132,16 @@
 				
 				//add an entry for the MakeConnection message to the sender (with ,send=true, resend=false)
 				SenderBean makeConnectionSenderBean = new SenderBean ();
-				makeConnectionSenderBean.setInternalSequenceID(internalSequenceId);
+//				makeConnectionSenderBean.setInternalSequenceID(internalSequenceId);
 				makeConnectionSenderBean.setMessageContextRefKey(makeConnectionMsgStoreKey);
 				makeConnectionSenderBean.setMessageID(makeConnectionRMMessage.getMessageId());
 				makeConnectionSenderBean.setMessageType(Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG);
 				makeConnectionSenderBean.setReSend(false);
 				makeConnectionSenderBean.setSend(true);
-				makeConnectionSenderBean.setSequenceID(createSeqBean.getSequenceID());
+				makeConnectionSenderBean.setSequenceID(sequenceId);
+				EndpointReference to = makeConnectionRMMessage.getTo();
+				if (to!=null)
+					makeConnectionSenderBean.setToAddress(to.getAddress());
 
 				SenderBeanMgr senderBeanMgr = storageManager.getRetransmitterBeanMgr();
 				

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java Thu Sep 28 07:49:16 2006
@@ -51,12 +51,8 @@
 	 */
 	private String securityTokenData;
 	
-	/**
-	 * This tells weather this sequence is in the polling mode or not.
-	 * PollingManager will use this property decide the sequences that need
-	 * polling and will do MakeConnections on them.
-	 */
-	private boolean pollingMode;
+
+//	private boolean pollingMode;
 
 	
 	/**
@@ -100,14 +96,6 @@
 
 	public void setSecurityTokenData(String securityTokenData) {
 		this.securityTokenData = securityTokenData;
-	}
-
-	public boolean isPollingMode() {
-		return pollingMode;
-	}
-
-	public void setPollingMode(boolean pollingMode) {
-		this.pollingMode = pollingMode;
 	}
 
 	public String getCreateSequenceMsgStoreKey() {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/NextMsgBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/NextMsgBean.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/NextMsgBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/NextMsgBean.java Thu Sep 28 07:49:16 2006
@@ -35,6 +35,19 @@
 	 * The next message to be invoked of the representing sequence.
 	 */
 	private long nextMsgNoToProcess;
+	
+	/**
+	 * This tells weather this sequence is in the polling mode or not.
+	 * PollingManager will use this property decide the sequences that need
+	 * polling and will do MakeConnections on them.
+	 */
+	private boolean pollingMode=false;
+	
+	/**
+	 * This will be used as a referenced 
+	 */
+	private String referenceMessageKey;
+	
 
 	public NextMsgBean() {
 
@@ -74,4 +87,21 @@
 	public void setSequenceID(String sequenceID) {
 		this.sequenceID = sequenceID;
 	}
+
+	public boolean isPollingMode() {
+		return pollingMode;
+	}
+
+	public void setPollingMode(boolean pollingMode) {
+		this.pollingMode = pollingMode;
+	}
+
+	public String getReferenceMessageKey() {
+		return referenceMessageKey;
+	}
+
+	public void setReferenceMessageKey(String referenceMessageKey) {
+		this.referenceMessageKey = referenceMessageKey;
+	}
+	
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java Thu Sep 28 07:49:16 2006
@@ -92,6 +92,12 @@
 	 */
 	private String wsrmAnonURI;
 	
+	/**
+	 * Destination URL of the message to be sent. This can be used to decide weather the message cannot be sent,
+	 * before actyally reading the message from the storage.
+	 */
+	private String toAddress;
+	
 	public SenderBean() {
 
 	}
@@ -194,6 +200,14 @@
 
 	public void setWsrmAnonURI(String wsrmAnonURI) {
 		this.wsrmAnonURI = wsrmAnonURI;
+	}
+
+	public String getToAddress() {
+		return toAddress;
+	}
+
+	public void setToAddress(String toAddress) {
+		this.toAddress = toAddress;
 	}
 	
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Thu Sep 28 07:49:16 2006
@@ -31,8 +31,10 @@
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.SandeshaUtil;
 
 public class InMemorySenderBeanMgr implements SenderBeanMgr {
 	
@@ -91,6 +93,7 @@
 
 			temp = (SenderBean) iterator.next();
 
+			
 			boolean add = true;
 
 			if (bean.getMessageContextRefKey() != null && !bean.getMessageContextRefKey().equals(temp.getMessageContextRefKey()))
@@ -120,8 +123,8 @@
 			if (bean.isSend() != temp.isSend())
 				add = false;
 
-			if (bean.isReSend() != temp.isReSend())
-				add = false;
+//			if (bean.isReSend() != temp.isReSend())
+//				add = false;
 			
 			if (add)
 				beans.add(temp);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java Thu Sep 28 07:49:16 2006
@@ -328,6 +328,9 @@
 			ackBean.setMessageID(ackMsgCtx.getMessageID());
 			ackBean.setReSend(false);
 			ackBean.setSequenceID(sequencePropertyKey);
+			EndpointReference to = ackMsgCtx.getTo();
+			if (to!=null)
+				ackBean.setToAddress(to.getAddress());
 
 			// this will be set to true in the sender.
 			ackBean.setSend(true);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java Thu Sep 28 07:49:16 2006
@@ -274,10 +274,10 @@
 		if (referenceReplyTo!=null && SandeshaUtil.isAnonymousURI (referenceReplyTo.getAddress()))
 			replyTo = new EndpointReference (referenceReplyTo.getAddress());
 		else
-			replyTo = context.getListenerManager().getEPRforService(
-					createSeqmsgContext.getAxisService().getName(), 
-					axisOperationName!=null?axisOperationName.getLocalPart():null, 
-        				transportInName!=null?transportInName.getLocalPart():null);
+			replyTo = context.getListenerManager().getEPRforService (
+					createSeqmsgContext.getAxisService().getName(), null, null 
+					/*axisOperationName!=null?axisOperationName.getLocalPart():null, 
+        				transportInName!=null?transportInName.getLocalPart():null*/);
         
         createSeqmsgContext.setReplyTo(replyTo);
         
@@ -711,22 +711,16 @@
 		applicationMsg.setMessageId(SandeshaUtil.getUUID());
 	}
 	
-	public static RMMsgContext createMakeConnectionMessage (RMMsgContext referenceRMMessage, String internalSequenceId, String makeConnectionSeqId,
+	public static RMMsgContext createMakeConnectionMessage (RMMsgContext referenceRMMessage,  String makeConnectionSeqId,
 			String makeConnectionAnonURI, StorageManager storageManager) throws AxisFault {
 		
 		MessageContext referenceMessage = referenceRMMessage.getMessageContext();
-		ConfigurationContext configurationContext = referenceMessage.getConfigurationContext();
-		
-		String rmVersion = SandeshaUtil.getSequenceProperty(internalSequenceId, 
-				Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION, storageManager);
-		String addressingNamespace = SandeshaUtil.getSequenceProperty(internalSequenceId, 
-				Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, storageManager);
-		String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
+		String rmNamespaceValue = referenceRMMessage.getRMNamespaceValue();
+		String rmVersion = referenceRMMessage.getRMSpecVersion();
 		
 		AxisOperation makeConnectionOperation = AxisOperationFactory.getAxisOperation(
 				WSDL20_2004Constants.MEP_CONSTANT_OUT_IN);
-
-
+		
 		MessageContext makeConnectionMessageCtx = SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage,makeConnectionOperation);
 		RMMsgContext makeConnectionRMMessageCtx = MsgInitializer.initializeMessage(makeConnectionMessageCtx);
 		

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java Thu Sep 28 07:49:16 2006
@@ -263,7 +263,18 @@
 				log.error(e.getStackTrace());
 				throw new SandeshaException(message);
 			}
+		} else {
+			
+			EndpointReference replyToEPR = firstAplicationMsgCtx.getReplyTo();
+			//setting replyTo and acksTo beans.
+					
+			if (replyToEPR!=null)		
+				replyToBean = new SequencePropertyBean(sequencePropertyKey,
+						Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, replyToEPR.getAddress());
+		
+			//TODO set AcksToBean.
 		}
+		
 		// Default value for acksTo is anonymous (this happens only for the
 		// client side)
 		if (acksTo == null) {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java Thu Sep 28 07:49:16 2006
@@ -434,6 +434,10 @@
 				Sandesha2Constants.VALUE_FALSE);
 
 		terminateBean.setReSend(false);
+		
+		EndpointReference to = terminateRMMessage.getTo();
+		if (to!=null)
+			terminateBean.setToAddress(to.getAddress());
 
 		SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
 

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java Thu Sep 28 07:49:16 2006
@@ -19,6 +19,7 @@
 
 import java.util.ArrayList;
 
+import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.util.threadpool.ThreadFactory;
 import org.apache.axis2.util.threadpool.ThreadPool;
@@ -176,6 +177,26 @@
 				}
 				
 				String messageId = senderBean.getMessageID();
+				
+				String toAddress = senderBean.getToAddress();
+				if (toAddress!=null) {
+					boolean unsendableAddress = false;
+					
+					if (toAddress.equals(AddressingConstants.Submission.WSA_ANONYMOUS_URL))
+						unsendableAddress = true;
+					else if (toAddress.equals(AddressingConstants.Final.WSA_ANONYMOUS_URL))
+						unsendableAddress = true;
+					else if (toAddress.startsWith(Sandesha2Constants.WSRM_ANONYMOUS_URI_PREFIX))
+						unsendableAddress = true;
+					
+					if (unsendableAddress) {
+						if (log.isDebugEnabled()) {
+							String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendToTheAddress,toAddress);
+							log.debug(message);
+						}
+						continue;
+					}
+				}
 				
 				//work Id is used to define the piece of work that will be assigned to the Worker thread,
 				//to handle this Sender bean.

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=450876&r1=450875&r2=450876
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Thu Sep 28 07:49:16 2006
@@ -61,6 +61,10 @@
 			transaction = storageManager.getTransaction();
 
 			SenderBean senderBean = senderBeanMgr.retrieve(messageId);
+			if (senderBean==null) {
+				String message = "SenderWorker has been assigned an unexisting work";
+				throw new SandeshaException (message);
+			}
 			String key = senderBean.getMessageContextRefKey();
 			MessageContext msgCtx = storageManager.retrieveMessageContext(key, configurationContext);
 			msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org