You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2005/10/04 08:38:58 UTC

svn commit: r293551 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: ./ msgprocessors/ storage/beans/ util/

Author: chamikara
Date: Mon Oct  3 23:38:26 2005
New Revision: 293551

URL: http://svn.apache.org/viewcvs?rev=293551&view=rev
Log:
InOrder delivery assurance - Done
A new thread is introduced for invoking in the correct order (InOrderInvoker)

Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/InOrderInvoker.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java?rev=293551&r1=293550&r2=293551&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java Mon Oct  3 23:38:26 2005
@@ -137,6 +137,9 @@
 	}
 
 	public interface SequenceProperties {
+		
+		String ALL_SEQUENCES = "AllSequences";  //this is not a sequence property. This is used as the sequenceId to share data b/w sequences
+
 		String RECEIVED_MESSAGES = "SeqMsgListProperty";
 
 		String TO_EPR = "ToEPR";
@@ -158,6 +161,8 @@
 		String NEXT_MESSAGE_NUMBER = "NextMsgNo";
 		
 		String LAST_OUT_MESSAGE = "LastOutMessage";
+		
+		String INCOMING_SEQUENCE_LIST = "IncomingSequenceList";
 	}
 
 	public interface SOAPVersion {
@@ -168,19 +173,28 @@
 		int DEFAULT = v1_1;
 	}
 
-	public interface DeliveryAssurance {
-		String IN_ORDER = "InOrder";
-
-		String NOT_IN_ORDER = "NotInOrder";
+	public interface QOS {
+		
+		public interface DeliveryAssurance {
+		
+			String IN_ORDER = "InOrder";
 
-		String DEFAULT_DELIVERY_ASSURANCE = NOT_IN_ORDER;
+			String NOT_IN_ORDER = "NotInOrder";
 
-		//invocation types
-		String EXACTLY_ONCE = "ExactlyOnce";
+			String DEFAULT_DELIVERY_ASSURANCE = NOT_IN_ORDER;
+		}
+		
+		public interface InvocationType {
+			
+			//invocation types
+			String EXACTLY_ONCE = "ExactlyOnce";
 
-		String MORE_THAN_ONCE = "MoreThanOnce";
+			String MORE_THAN_ONCE = "MoreThanOnce";
 
-		String DEFAULT_INVOCATION_TYPE = EXACTLY_ONCE;
+			String DEFAULT_INVOCATION_TYPE = EXACTLY_ONCE;
+		}
+		
+		
 	}
 
 	public interface BeanMAPs {
@@ -215,4 +229,5 @@
 
 	int SENDER_SLEEP_TIME = 1000;
 
+	int TERMINATE_DELAY = 1000;
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/InOrderInvoker.java?rev=293551&r1=293550&r2=293551&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/InOrderInvoker.java Mon Oct  3 23:38:26 2005
@@ -17,6 +17,7 @@
 
 package org.apache.sandesha2;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 
@@ -28,10 +29,13 @@
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.sandesha2.storage.AbstractBeanMgrFactory;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.StorageMapBeanMgr;
 import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.storage.beans.StorageMapBean;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.Sequence;
 import org.ietf.jgss.MessageProp;
 
 /**
@@ -40,106 +44,114 @@
  * @author Jaliya
  */
 
+//TODO rename to Invoker
 public class InOrderInvoker extends Thread {
-	boolean stopInvoker = false;
+	boolean invokerStarted = false;
 
 	ConfigurationContext context = null;
 
-	public synchronized void stopWork() {
-		stopInvoker = true;
+	public synchronized void stopInvoker() {
+		invokerStarted = false;
 	}
 
-	public synchronized boolean isStopped() {
-		return stopInvoker;
+	public synchronized boolean isInvokerStarted() {
+		return invokerStarted;
 	}
 
 	public void setConfugurationContext(ConfigurationContext context) {
 		this.context = context;
 	}
 
+	public void start (ConfigurationContext context) {
+		System.out.println ("Starting the invoker......");
+		invokerStarted = true;
+		this.context = context;
+		super.start();
+	}
+	
 	public void run() {
 
-		while (!isStopped()) {
-
-			System.out.print("|");
-			NextMsgBeanMgr nextMsgMgr = AbstractBeanMgrFactory.getInstance(
-					context).getNextMsgBeanMgr();
-
-			StorageMapBeanMgr storageMapMgr = AbstractBeanMgrFactory
-					.getInstance(context).getStorageMapBeanMgr();
-
-			Collection coll = nextMsgMgr.retrieveAll();
-
-			Iterator it = coll.iterator();
-
-			while (it.hasNext()) {
-				Object obj = it.next();
-				NextMsgBean nextMsgBean = (NextMsgBean) obj;
-				long msgNo = nextMsgBean.getNextMsgNoToProcess();
-				boolean tryNext = true;
-
-				while (tryNext) {
-					String seqId = nextMsgBean.getSequenceId();
-					Collection coll1 = storageMapMgr.find(new StorageMapBean(
-							null, msgNo, seqId));
-					if (coll1 == null || coll1.isEmpty()) {
-						tryNext = false;
-						continue;
-					}
-
-					StorageMapBean stMapBean = (StorageMapBean) coll1
-							.iterator().next();
-					if (stMapBean == null) {
+		while (isInvokerStarted()) {
 
-						tryNext = false;
-						continue;
-					}
+			//System.out.print("~~");
+			
+			try {
+				Thread.sleep(1000);
+			} catch (InterruptedException ex) {
+				ex.printStackTrace();
+			}
+			
 
-					String key = stMapBean.getKey();
+			try {
+				NextMsgBeanMgr nextMsgMgr = AbstractBeanMgrFactory.getInstance(
+						context).getNextMsgBeanMgr();
 
-					try {
-						boolean done = resumeMessageContext(key);
-						System.out.println("Resumed");
-						if (!done) {
-							tryNext = false;
-							continue;
+				StorageMapBeanMgr storageMapMgr = AbstractBeanMgrFactory
+						.getInstance(context).getStorageMapBeanMgr();
+				
+				SequencePropertyBeanMgr sequencePropMgr = AbstractBeanMgrFactory
+						.getInstance(context).getSequencePropretyBeanMgr();
+
+				
+				
+				//Getting the incomingSequenceIdList
+				SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) sequencePropMgr.retrieve(Constants.SequenceProperties.ALL_SEQUENCES,Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+				if (sequencePropertyBean==null)
+					continue;
+				
+				
+				ArrayList seqPropList = (ArrayList) sequencePropertyBean.getValue();
+				Iterator seqPropIt = seqPropList.iterator();
+				
+				while (seqPropIt.hasNext()){
+					
+					//FIXME - Invoke multiple messages of the same sequence within one iteration.
+					
+					String sequenceId = (String) seqPropIt.next();
+					
+					NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
+					if (nextMsgBean==null) 
+						throw new SandeshaException ("Next message not set correctly");
+					
+					long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
+					if (nextMsgno<=0)
+						throw new SandeshaException ("Invalid messaage number for the nextMsgNo");
+					
+					Iterator stMapIt = storageMapMgr.find(new StorageMapBean (null,nextMsgno,sequenceId)).iterator();
+					while (stMapIt.hasNext()){
+						StorageMapBean stMapBean = (StorageMapBean) stMapIt.next();
+						String key = stMapBean.getKey();
+						
+						MessageContext msgToInvoke = SandeshaUtil.getStoredMessageContext(key);
+						
+						//removing the storage map entry.
+						storageMapMgr.delete(key);
+						
+						RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke);
+						Sequence seq = (Sequence) rmMsg.getMessagePart(Constants.MessageParts.SEQUENCE);
+						long msgNo = seq.getMessageNumber().getMessageNumber();
+						
+						try {
+							//Invoking the message.
+							new AxisEngine (msgToInvoke.getSystemContext()).receive(msgToInvoke);
+						} catch (AxisFault e) {
+							throw new SandeshaException (e.getMessage());
 						}
-					} catch (SandeshaException ex) {
-						ex.printStackTrace();
-						tryNext = false;
-						continue;
-					}
+						
 
-					msgNo++;
+						if (msgNo==3)
+							return;
+						
+						//undating the next mst to invoke
+						nextMsgno++;
+						nextMsgMgr.update(new NextMsgBean (sequenceId,nextMsgno));
+					}
+					
 				}
-
-				nextMsgBean.setNextMsgNoToProcess(msgNo);
-				nextMsgMgr.update(nextMsgBean);
+			} catch (SandeshaException e1) {
+				// TODO Auto-generated catch block
+				e1.printStackTrace();
 			}
-
-			try {
-				Thread.sleep(20000);
-			} catch (InterruptedException ex) {
-				ex.printStackTrace();
-			}
-		}
-	}
-
-	private boolean resumeMessageContext(String key) throws SandeshaException {
-		MessageContext ctx = SandeshaUtil.getStoredMessageContext(key);
-		if (ctx == null)
-			return false;
-
-		ctx.setPausedTrue(new QName(Constants.IN_HANDLER_NAME)); //in case the
-																 // pause is not
-																 // set
-
-		//resuming.
-		try {
-			new AxisEngine(ctx.getSystemContext()).receive(ctx);
-		} catch (AxisFault ex) {
-			throw new SandeshaException(ex.getMessage());
 		}
-		return true;
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java?rev=293551&r1=293550&r2=293551&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java Mon Oct  3 23:38:26 2005
@@ -29,7 +29,7 @@
 
 public class SandeshaModule implements Module{
 
-	private InOrderInvoker inorderInvoker = new InOrderInvoker ();
+	//private InOrderInvoker inorderInvoker = new InOrderInvoker ();
 	
     // initialize the module
     public void init(AxisConfiguration axisSystem) throws AxisFault {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java?rev=293551&r1=293550&r2=293551&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java Mon Oct  3 23:38:26 2005
@@ -55,7 +55,7 @@
 	public void run () {
 		
 		while (senderStarted) {
-			System.out.println ("|-|");
+			//System.out.println ("|-|");
 			try {
 				if (context==null)
 					throw new SandeshaException ("Can't continue the Sender. Context is null");
@@ -73,9 +73,10 @@
 				 MessageContext msgCtx = SandeshaUtil.getStoredMessageContext(key);
 					
 				 try {	 
-				 	updateMessage (msgCtx);
+				 	MessageContext copiedMsgCtx = SandeshaUtil.deepCopy(msgCtx);
+				 	updateMessage (copiedMsgCtx);
 				 	
-				 	Object obj = msgCtx.getProperty(
+				 	Object obj = copiedMsgCtx.getProperty(
                             MessageContext.CHARACTER_SET_ENCODING);
 				 	System.out.println("CHAR SET ENCODING:" + obj);
 					new AxisEngine(context).send(msgCtx);
@@ -88,9 +89,13 @@
 				//changing the values of the sent bean.
 			    bean.setLastSentTime(System.currentTimeMillis());
 			    bean.setSentCount(bean.getSentCount()+1);
-			    mgr.update(bean);
-				
-				
+			    
+			    //update if resend=true otherwise delete. (reSend=false means send only once).
+			    if (bean.isReSend())
+			    	mgr.update(bean);
+			    else
+			    	mgr.delete(bean.getMessageId());
+					
 			}
 			
 			try {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=293551&r1=293550&r2=293551&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Oct  3 23:38:26 2005
@@ -140,11 +140,13 @@
 			boolean complete = SandeshaUtil.verifySequenceCompletion(
 					sequenceAck.getAcknowledgementRanges().iterator(),
 					lastOutMessageNo);
+			
 			if (complete) {
 				addTerminateSequenceMessage(rmMsgCtx, outSequenceId,incomingSequenceId);
 			}
 		}
 
+		int i = 1;
 	}
 
 	private RetransmitterBean getRetransmitterEntry(Collection collection,
@@ -190,27 +192,20 @@
 				.getMessageContext());
 		RetransmitterBean terminateBean = new RetransmitterBean();
 		terminateBean.setKey(key);
-		terminateBean.setLastSentTime(0);
+		
+		//Set a retransmitter lastSentTime so that terminate will be send with some delay.
+		//Otherwise this get send before return of the current request (ack).
+		//TODO verify that the time given is correct
+		terminateBean.setLastSentTime(System.currentTimeMillis()+Constants.TERMINATE_DELAY);
+		
 		terminateBean.setMessageId(terminateRMMessage.getMessageId());
 		terminateBean.setSend(true);
+		terminateBean.setReSend(false);
 
 		RetransmitterBeanMgr retramsmitterMgr = AbstractBeanMgrFactory.getInstance(
 				incomingAckRMMsg.getContext()).getRetransmitterBeanMgr();
 		retramsmitterMgr.insert(terminateBean);
 		
-		
-		try {
-			System.out.println("SERIALIZING TERMINATE MSG");
-			SOAPEnvelope envel = terminateRMMessage.getSOAPEnvelope();
-			XMLStreamWriter writer = XMLOutputFactory.newInstance().createXMLStreamWriter(System.out);
-			envel.serialize(writer);
-		} catch (XMLStreamException e1) {
-			// TODO Auto-generated catch block
-			e1.printStackTrace();
-		} catch (FactoryConfigurationError e1) {
-			// TODO Auto-generated catch block
-			e1.printStackTrace();
-		}
 	}
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=293551&r1=293550&r2=293551&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Oct  3 23:38:26 2005
@@ -34,6 +34,7 @@
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.soap.SOAPEnvelope;
 import org.apache.sandesha2.Constants;
+import org.apache.sandesha2.InOrderInvoker;
 import org.apache.sandesha2.MsgInitializer;
 import org.apache.sandesha2.MsgValidator;
 import org.apache.sandesha2.RMMsgContext;
@@ -60,7 +61,7 @@
 public class ApplicationMsgProcessor implements MsgProcessor {
 
 	private boolean letInvoke = false;
-	
+
 	public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
 
 		System.out.println("Application msg processor called");
@@ -96,11 +97,13 @@
 		String messagesStr = (String) msgsBean.getValue();
 
 		if (msgNoPresentInList(messagesStr, msgNo)
-				&& (Constants.DeliveryAssurance.DEFAULT_INVOCATION_TYPE == Constants.DeliveryAssurance.EXACTLY_ONCE)) {
+				&& (Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Constants.QOS.InvocationType.EXACTLY_ONCE)) {
 			//this is a duplicate message and the invocation type is
 			// EXACTLY_ONCE.
 			throw new SandeshaException(
 					"Duplicate message - Invocation type is EXACTLY_ONCE");
+
+			//FIXME - return instead of sending a fault.
 		}
 
 		if (messagesStr != "" && messagesStr != null)
@@ -119,7 +122,6 @@
 		EndpointReference acksTo = (EndpointReference) acksToBean.getValue();
 		String acksToStr = acksTo.getAddress();
 
-
 		//TODO: remove folowing 2.
 		System.out.println("Messages received:" + messagesStr);
 		System.out.println("Acks To:" + acksToStr);
@@ -165,17 +167,17 @@
 
 			AxisEngine engine = new AxisEngine(ackRMMsgCtx.getMessageContext()
 					.getSystemContext());
-		
-			
+
 			//set CONTEXT_WRITTEN since acksto is anonymous
-			rmMsgCtx.getMessageContext().getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,"true");
-			rmMsgCtx.getMessageContext().setProperty(Constants.ACK_WRITTEN,"true");
+			rmMsgCtx.getMessageContext().getOperationContext().setProperty(
+					org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+			rmMsgCtx.getMessageContext().setProperty(Constants.ACK_WRITTEN,
+					"true");
 			try {
 				engine.send(ackRMMsgCtx.getMessageContext());
 			} catch (AxisFault e1) {
 				throw new SandeshaException(e1.getMessage());
 			}
-			
 
 		} else {
 			//TODO Add async Ack
@@ -194,81 +196,61 @@
 
 		long nextMsgno = bean.getNextMsgNoToProcess();
 
-		if (nextMsgno < msgNo) {
-
-			//pause and store the message (since it is not the next message of
-			// the order)
-			//rmMsgCtx.getMessageContext().setPausedTrue(new QName
-			// (Constants.IN_HANDLER_NAME));
-
-			try {
-				String key = SandeshaUtil.storeMessageContext(rmMsgCtx
-						.getMessageContext());
-				storageMapMgr
-						.insert(new StorageMapBean(key, msgNo, sequenceId));
-
-				//This will avoid performing application processing more than
-				// once.
-				rmMsgCtx.setProperty(Constants.APPLICATION_PROCESSING_DONE,
-						"true");
+		//Have to pause the message anyway
+		msgCtx.setPausedTrue(new QName(Constants.IN_HANDLER_NAME));
+		
+		
+		//Adding an entry in the SequencesToInvoke List  TODO - add this to a module init kind of place.
+		SequencePropertyBean incomingSequenceListBean =  (SequencePropertyBean) seqPropMgr.retrieve(sequenceId,Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+		
+		if (incomingSequenceListBean==null) { 
+			ArrayList incomingSequenceList = new ArrayList ();
+			incomingSequenceListBean = new SequencePropertyBean ();
+			incomingSequenceListBean.setSequenceId(Constants.SequenceProperties.ALL_SEQUENCES);
+			incomingSequenceListBean.setName(Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+			incomingSequenceListBean.setValue(incomingSequenceList);
+			
+			seqPropMgr.insert(incomingSequenceListBean);
+		}
+		
+		//This must be a List :D
+		ArrayList incomingSequenceList = (ArrayList) incomingSequenceListBean.getValue();
+		
+		//Adding current sequence to the incoming sequence List.
+		if (!incomingSequenceList.contains(sequenceId)){
+			incomingSequenceList.add(sequenceId);
+		}
+		
+		//saving the message.
+		try {
+			String key = SandeshaUtil.storeMessageContext(rmMsgCtx
+					.getMessageContext());
+			storageMapMgr
+					.insert(new StorageMapBean(key, msgNo, sequenceId));
+
+			//This will avoid performing application processing more than
+			// once.
+			rmMsgCtx.setProperty(Constants.APPLICATION_PROCESSING_DONE,
+					"true");
 
-			} catch (Exception ex) {
-				throw new SandeshaException(ex.getMessage());
-			}
-		} else {
-			//OK this is a correct message.
-			//(nextMsgNo>msgNo can not happen if EXCTLY_ONCE is enabled. This
-			// should have been
-			//		detected previously)
-
-			if (Constants.DeliveryAssurance.DEFAULT_DELIVERY_ASSURANCE == Constants.DeliveryAssurance.IN_ORDER) {
-				//store and let invoker handle for IN_ORDER invocation
-				//rmMsgCtx.getMessageContext().setPausedTrue(new QName
-				// (Constants.IN_HANDLER_NAME));
-
-				try {
-					String key = SandeshaUtil.storeMessageContext(rmMsgCtx
-							.getMessageContext());
-					storageMapMgr.insert(new StorageMapBean(key, msgNo,
-							sequenceId));
-//					rmMsgCtx.setProperty(Constants.APPLICATION_PROCESSING_DONE,"true");
-
-					
-					SequencePropertyBean msgProcessorListBean = seqPropMgr.retrieve(sequenceId,Constants.SequenceProperties.APP_MSG_PROCESSOR_LIST);
-					if (msgProcessorListBean == null){
-						ArrayList msgProcessorList = new ArrayList ();
-						msgProcessorListBean = new SequencePropertyBean (sequenceId,Constants.SequenceProperties.APP_MSG_PROCESSOR_LIST,msgProcessorList);
-						seqPropMgr.update(msgProcessorListBean);
-					}
-					
-					if (! (msgProcessorListBean.getValue() instanceof ArrayList)){
-						throw new SandeshaException ("Invalid property value");
-					}
-					
-					ArrayList msgProcessorList = (ArrayList) msgProcessorListBean.getValue();
-					msgProcessorList.add(this);
-					
-					while (!isLetInvoke()){
-						Thread.sleep(Constants.INVOKER_SLEEP_TIME);
-					}
-					
-				} catch (Exception ex) {
-					throw new SandeshaException(ex.getMessage());
-				}
-			} else {
-				//if IN_ORDER is not required. Simply let this invoke (by doing
-				// nothing here :D )
-			}
+		} catch (Exception ex) {
+			throw new SandeshaException(ex.getMessage());
 		}
+		
+		//Starting the invoker if stopped.
+		SandeshaUtil.startInvokerIfStopped(msgCtx.getSystemContext());
+		
+		
+
 	}
-	
-	public synchronized void letInvoke () {
-		letInvoke = true;
-	}
-	
-	public synchronized boolean isLetInvoke () {
-		return letInvoke;
-	}
+
+//	public synchronized void letInvoke() {
+//		letInvoke = true;
+//	}
+//
+//	public synchronized boolean isLetInvoke() {
+//		return letInvoke;
+//	}
 
 	//TODO convert following from INT to LONG
 	private boolean msgNoPresentInList(String list, long no) {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java?rev=293551&r1=293550&r2=293551&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java Mon Oct  3 23:38:26 2005
@@ -35,6 +35,8 @@
 	
 	private long messageNumber = 0;
 	
+	private boolean reSend = true;
+	
 	public RetransmitterBean () {
 		
 	}
@@ -103,6 +105,15 @@
 	
 	public void setMessageNumber(long messageNumber) {
 		this.messageNumber = messageNumber;
+	}
+	
+	
+	public boolean isReSend() {
+		return reSend;
+	}
+	
+	public void setReSend(boolean reSend) {
+		this.reSend = reSend;
 	}
 	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=293551&r1=293550&r2=293551&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Mon Oct  3 23:38:26 2005
@@ -34,10 +34,13 @@
 import org.apache.axis2.description.TransportInDescription;
 import org.apache.axis2.description.TransportOutDescription;
 import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.om.OMElement;
 import org.apache.axis2.om.impl.MIMEOutputUtils;
+import org.apache.axis2.soap.SOAPEnvelope;
 import org.apache.axis2.transport.http.HTTPConstants;
 import org.apache.axis2.util.UUIDGenerator;
 import org.apache.sandesha2.Constants;
+import org.apache.sandesha2.InOrderInvoker;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.Sender;
@@ -54,6 +57,7 @@
 	private static Hashtable storedMsgContexts = new Hashtable();
 
 	private static Sender sender = new Sender();
+	private static InOrderInvoker invoker = new InOrderInvoker ();
 
 	public static String getUUID() {
 		String uuid = "uuid:" + UUIDGenerator.getUUID();
@@ -276,6 +280,13 @@
 			sender.start(context);
 		}
 	}
+	
+	public static void startInvokerIfStopped(ConfigurationContext context) {
+		if (!invoker.isInvokerStarted()) {
+			System.out.println ("Starting invoker. SandeshaUtil.....");
+			invoker.start(context);
+		}
+	}
 
 	public static boolean verifySequenceCompletion(Iterator ackRangesIterator,
 			long lastMessageNo) {
@@ -305,4 +316,8 @@
 
 		return false;
 	}
+	
+//	public SOAPEnvelope cloneSOAPEnvelope (SOAPEnvelope oldEnvelope) {
+//		
+//	}
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org