You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2006/01/30 16:57:27 UTC

svn commit: r373537 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: ./ handlers/ msgprocessors/ storage/inmemory/ util/ workers/

Author: chamikara
Date: Mon Jan 30 07:55:20 2006
New Revision: 373537

URL: http://svn.apache.org/viewcvs?rev=373537&view=rev
Log:
Fixed SimpleAxisServer so that it stops correctly after an RM sequence ends.

Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java Mon Jan 30 07:55:20 2006
@@ -87,15 +87,14 @@
 		}
 
 		String internalSequenceId = (String) internalSequenceBean.getValue();
-		findBean.setInternalSequenceID(internalSequenceId);
 		findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
 		findBean.setSend(true);
 		findBean.setReSend(false);
 		Collection collection = retransmitterBeanMgr.find(findBean);
+		
 		Iterator it = collection.iterator();
 
 		if (it.hasNext()) {
-
 			SenderBean ackBean = (SenderBean) it.next();
 
 			long timeNow = System.currentTimeMillis();

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Mon Jan 30 07:55:20 2006
@@ -192,6 +192,8 @@
 		String NO_OF_OUTGOING_MSGS_ACKED = "NoOfOutGoingMessagesAcked";
 		
 		String TRANSPORT_TO = "TransportTo";
+		
+		String OUT_SEQ_ACKSTO = "OutSequenceAcksTo";
 	}
 
 	public interface SOAPVersion {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Mon Jan 30 07:55:20 2006
@@ -22,6 +22,9 @@
 import java.util.HashMap;
 import java.util.Iterator;
 
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.client.ListenerManager;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -176,7 +179,8 @@
 	 * @param sequenceID
 	 * @throws SandeshaException
 	 */
-	public static void terminateSendingSide (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+	public static void terminateSendingSide (ConfigurationContext configContext, String sequenceID,boolean serverSide) throws SandeshaException {
+		
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
 		
 		//TODO - remove folowing redundant transaction
@@ -186,6 +190,38 @@
 		SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
 		CreateSeqBeanMgr createSeqBeanMgr = storageManager.getCreateSeqBeanMgr();
 		
+		
+		if (!serverSide) {
+			//stopping the listener for the client side.
+			
+			//SequencePropertyBean outGoingAcksToBean  = sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQ_ACKSTO);
+			
+			boolean stopListnerForAsyncAcks = false;
+			SequencePropertyBean internalSequenceBean = sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+			if (internalSequenceBean!=null) {
+				String internalSequenceID = internalSequenceBean.getValue();
+				SequencePropertyBean acksToBean = sequencePropertyBeanMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+				
+				if (acksToBean!=null) {
+					String acksTo = acksToBean.getValue();
+					if (acksTo!=null && !Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo)) {
+						stopListnerForAsyncAcks = true;
+					}
+				}
+			}
+			
+			try {
+				//this removes the listener entry for receiving async acks.
+				if (stopListnerForAsyncAcks)
+					ListenerManager.stop(configContext,Constants.TRANSPORT_HTTP);
+				
+				//TODO stop listener for asyncControlMessages
+				
+			} catch (AxisFault e) {
+				throw new SandeshaException (e.getMessage());
+			}
+		}
+		
 		SequencePropertyBean internalSequenceBean = sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
 		if (internalSequenceBean==null)
 			throw new SandeshaException ("TempSequence entry not found");
@@ -228,14 +264,6 @@
 		}
 		
 		terminateSendingTransaction.commit();
-		
-		//asking the listner to stop.
-		//if (clientSide)
-//			try {
-//				ListenerManager.stop(configContext,Constants.TRANSPORT_HTTP);
-//			} catch (AxisFault e) {
-//				throw new SandeshaException (e.getMessage());
-//			}
 		
 		SandeshaUtil.stopSenderForTheSequence(internalSequenceId);
 		

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon Jan 30 07:55:20 2006
@@ -290,15 +290,6 @@
 							.getProperty(MessageContext.TRANSPORT_IN);
 					if (transportIn == null)
 						transportIn = org.apache.axis2.Constants.TRANSPORT_HTTP;
-
-					// For receiving async Ack messages.
-					// try {
-					ListenerManager.makeSureStarted(transportIn, context);
-					// } catch (AxisFault e) {
-					// log.debug("Could not start listener...");
-					// log.debug(e.getStackTrace());
-					// }
-
 				} else if (acksTo == null && serverSide) {
 					String incomingSequencId = SandeshaUtil
 							.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Jan 30 07:55:20 2006
@@ -123,7 +123,6 @@
 			rmMsgCtx.setRelatesTo(null);
 
 		SenderBean input = new SenderBean();
-		input.setInternalSequenceID(internalSequenceId);
 		input.setSend(true);
 		input.setReSend(true);
 		Collection retransmitterEntriesOfSequence = retransmitterMgr

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Jan 30 07:55:20 2006
@@ -411,7 +411,6 @@
 			//the internalSequenceId value of the retransmitter Table for the
 			// messages related to an incoming
 			//sequence is the actual sequence ID
-			ackBean.setInternalSequenceID(sequenceId);
 
 //			RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
 //					.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
@@ -442,7 +441,6 @@
 			//removing old acks.
 			SenderBean findBean = new SenderBean();
 			findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-			findBean.setInternalSequenceID(sequenceId);
 			findBean.setSend(true);
 			findBean.setReSend(false);
 			Collection coll = retransmitterBeanMgr.find(findBean);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Mon Jan 30 07:55:20 2006
@@ -84,7 +84,7 @@
 
 	public Collection find(SenderBean bean) {
 		ArrayList beans = new ArrayList();
-		Iterator iterator = table.values().iterator();
+		Iterator iterator = ((Hashtable) table).values().iterator();
 
 		SenderBean temp;
 		while (iterator.hasNext()) {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Mon Jan 30 07:55:20 2006
@@ -621,9 +621,9 @@
 			return Sandesha2Constants.INTERNAL_SEQUENCE_PREFIX + ":" + to + ":" +sequenceKey;
 	}
 	
-//	public static String getServerSideInternalSeqID (String incomingSeqId) {
-//		return (Sandesha2Constants.SANDESHA2_INTERNAL_SEQUENCE_ID + ":" + incomingSeqId);
-//	}
+	public static String getInternalSequenceID (String sequenceID) {   //builds an internal sequence ID: INTERNAL_SEQUENCE_PREFIX, a colon, then the given sequence ID.
+			return Sandesha2Constants.INTERNAL_SEQUENCE_PREFIX + ":" + sequenceID;
+	}
 	
 	public static String getSequenceIDFromInternalSequenceID (String internalSequenceID, ConfigurationContext configurationContext)  throws SandeshaException {
 		

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Mon Jan 30 07:55:20 2006
@@ -10,10 +10,13 @@
 import java.util.StringTokenizer;
 
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.ListenerManager;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.Parameter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -188,6 +191,50 @@
 
 		SandeshaUtil.startSenderForTheSequence(configurationContext,internalSequenceId);
 		
+		updateClientSideListnerIfNeeded (firstAplicationMsgCtx);
+		
+	}
+	
+	private static void updateClientSideListnerIfNeeded (MessageContext messageContext) throws SandeshaException {   //client side only: starts the transport listener when async acks or async control messages are expected.
+		if (messageContext.isServerSide())
+			return;   //listeners are updated only for the client side.
+		
+		String transportInProtocol = messageContext.getOptions().getTransportInProtocol();
+		
+		String acksTo = (String) messageContext.getProperty(Sandesha2ClientAPI.AcksTo);
+		String mep = messageContext.getAxisOperation().getMessageExchangePattern();
+		
+		boolean startListnerForAsyncAcks = false;
+		boolean startListnerForAsyncControlMsgs = false;   //For async createSeqRes & terminateSeq.
+		
+		if (acksTo!=null && !Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo)) {
+			//a non-anonymous acksTo was supplied, so a listener is needed for async acks.
+			startListnerForAsyncAcks = true;
+		}
+		
+		if (mep!=null && !AxisOperation.MEP_URI_OUT_ONLY.equals(mep)) {
+			//the MEP is not out-only, so a listener is needed for the async createSeqResponse & terminateSeq messages.
+			startListnerForAsyncControlMsgs = true;
+		}
+		
+		try {
+			if ((startListnerForAsyncAcks || startListnerForAsyncControlMsgs) && transportInProtocol==null)
+				throw new SandeshaException ("Cant start the listner since the TransportInProtocol is null");   //a listener cannot be started without a transport-in protocol.
+
+			if (startListnerForAsyncAcks)
+				ListenerManager.makeSureStarted(messageContext.getOptions().getTransportInProtocol(),messageContext.getConfigurationContext());
+		
+			if (startListnerForAsyncControlMsgs)
+				ListenerManager.makeSureStarted(messageContext.getOptions().getTransportInProtocol(),messageContext.getConfigurationContext());   //same call and arguments as for async acks.
+							
+			
+		} catch (AxisFault e) {
+			String message = "Cant start the listner for incoming messages";
+			log.error(e.getStackTrace());   //NOTE(review): this passes the StackTraceElement[] itself as the log message; log.error(message, e) may have been intended - confirm.
+			System.out.println(e.getStackTrace());   //NOTE(review): prints the array reference to stdout, not the trace text - confirm whether this is leftover debugging.
+			throw new SandeshaException (message);   //the AxisFault itself is not passed on to the SandeshaException.
+		}
+	
 	}
 	
 	/**

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Mon Jan 30 07:55:20 2006
@@ -197,8 +197,8 @@
 									.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
 							if (sequence.getLastMessage() != null) {
 								
-								TerminateManager.cleanReceivingSideAfterInvocation(
-										context, sequenceId);
+								TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId);
+								
 								//this sequence has no more invocations
 								stopInvokerForTheSequence(sequenceId);
 								

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Mon Jan 30 07:55:20 2006
@@ -172,7 +172,7 @@
 								//do time out processing.
 								
 								//TODO uncomment below line
-								//TerminateManager.terminateSendingSide(context,sequenceID);
+								TerminateManager.terminateSendingSide(context,sequenceID,msgCtx.isServerSide());
 								
 								String message = "Sequence timed out";
 								log.debug(message);
@@ -185,14 +185,8 @@
 									.piggybackAckIfPresent(rmMsgCtx);
 						}
 						
-
-						
 						preSendTransaction.commit();
 						
-						if (rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
-							int i =1;
-						}
-						
 						try {
 							//every message should be resumed (pause==false) when sending
 							boolean paused = msgCtx.isPaused();
@@ -258,8 +252,7 @@
 							ConfigurationContext configContext = msgCtx
 									.getConfigurationContext();
 
-							TerminateManager.terminateSendingSide(
-									configContext, sequenceID);
+							TerminateManager.terminateSendingSide(configContext, sequenceID,msgCtx.isServerSide());
 							
 							//removing a entry from the Listener
 							String transport = msgCtx.getTransportOut().getName().getLocalPart();



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org