You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ml...@apache.org on 2006/11/21 16:13:49 UTC

svn commit: r477693 - in /webservices/sandesha/trunk/java: src/org/apache/sandesha2/ src/org/apache/sandesha2/client/ src/org/apache/sandesha2/i18n/ src/org/apache/sandesha2/util/ src/org/apache/sandesha2/workers/ test/src/org/apache/sandesha2/workers/

Author: mlovett
Date: Tue Nov 21 07:13:48 2006
New Revision: 477693

URL: http://svn.apache.org/viewvc?view=rev&rev=477693
Log:
Tom's patch for SANDESHA2-43

Added:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java   (with props)
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java   (with props)
    webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/
    webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java   (with props)
Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java Tue Nov 21 07:13:48 2006
@@ -258,6 +258,11 @@
 		String CLIENT_COMPLETED_MESSAGES = "ClientCompletedMessages";
 		String SERVER_COMPLETED_MESSAGES = "ServerCompletedMessages";
 		
+		//For IN_ORDER sequences, we can have finite ranges of messages that can be
+		//delivered out of order. These are maintained as a String that is consistent
+		//with the form described in  org.apache.sandesha2.util.RangeString
+		String OUT_OF_ORDER_RANGES = "OutOfOrderRanges";
+		
 		String TO_EPR = "ToEPR";
 
 		String ACKS_TO_EPR = "acksToEPR";

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java Tue Nov 21 07:13:48 2006
@@ -57,6 +57,7 @@
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.workers.Invoker;
 import org.apache.sandesha2.wsrm.AckRequested;
 import org.apache.sandesha2.wsrm.CloseSequence;
 import org.apache.sandesha2.wsrm.Identifier;
@@ -718,6 +719,30 @@
 		options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
 	}
 
+	/**
+	 * Forces any inbound messages currently on the specified inOrder inbound sequence to be dispatched out of order.
+	 * @param configContext
+	 * @param sequenceID
+	 * @param allowLaterDeliveryOfMissingMessages if true, messages skipped over during this
+	 * action will be invoked if they arrive on the system at a later time. 
+	 * Otherwise messages skipped over will be ignored
+	 * @throws SandeshaException
+	 */
+	public static void forceDispatchOfInboundMessages(ConfigurationContext configContext, 
+			String sequenceID,
+			boolean allowLaterDeliveryOfMissingMessages)throws SandeshaException{
+		//only do this if we are running inOrder
+		if(SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration()).isInOrder()){
+			Invoker invoker = (Invoker) configContext.getProperty(Sandesha2Constants.INVOKER);
+			if (invoker==null){
+				throw new SandeshaException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.invokerNotFound, sequenceID));
+			}
+			
+			invoker.forceInvokeOfAllMessagesCurrentlyOnSequence(configContext, sequenceID, allowLaterDeliveryOfMissingMessages);			
+		}
+	}
+	
 	private static String getInternalSequenceID(String to, String sequenceKey) {
 		return SandeshaUtil.getInternalSequenceID(to, sequenceKey);
 	}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Tue Nov 21 07:13:48 2006
@@ -68,6 +68,7 @@
 	public static final String cannotInnitMessage="cannotInnitMessage";
 	public static final String propertyInvalidValue="propertyInvalidValue";
 	public static final String couldNotCopyParameters="couldNotCopyParameters";
+	public static final String invalidRange="invalidRange";
 	public static final String senderBeanNotFound="senderBeanNotFound";
 	public static final String workAlreadyAssigned="workAlreadyAssigned";
 	public static final String workNotPresent="workNotPresent";
@@ -250,9 +251,10 @@
     public final static String addressingNamespaceNotSet = "addressingNamespaceNotSet";
     public final static String invalidOfferNoResponseEndpoint = "invalidOfferNoResponseEndpoint";
     public final static String invalidElementFoundWithinElement = "invalidElementFoundWithinElement";
+    public final static String invokerNotFound="invokerNotFound";
     
     public final static String couldNotSendAckRequestSeqNotFound="couldNotSendAckRequestSeqNotFound";
     public final static String couldNotSendCloseResponse="couldNotSendCloseResponse";
     public final static String couldNotSendCloseSeqNotFound="couldNotSendCloseSeqNotFound";
     
-}
\ No newline at end of file
+}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties Tue Nov 21 07:13:48 2006
@@ -88,6 +88,7 @@
 cannotInnitMessage=Sandesha2 Internal error: cannot initialize the message.
 propertyInvalidValue=Sandesha2 Internal error: property {0} contains an invalid value.
 couldNotCopyParameters=Could not copy parameters when creating the new RM Message. See the following exception for more details: {0}.
+invalidRange=The specified range was invalid: {0}, {1}
 
 #-------------------------------------
 #
@@ -266,6 +267,7 @@
 cannotFindTransportInDesc=Cannot find the transport in description {0} in the ConfigurationContext
 invalidOfferNoResponseEndpoint=Cannot derive a valid offer from the given infomation. No Endpoint for receiving messages.
 invalidElementFoundWithinElement=Found invalid ''{0}'' element within ''{1}'' element
+invokerNotFound=An invoker thread was not found to dispatch messages on the inbound sequence {0}.
 
 #------------------
 # Security messages
@@ -279,4 +281,4 @@
 proofOfPossessionNotVerified = Proof of possession not verified
 noSecurityResults = No Security results
 noSecConvTokenInPolicy = No SecureConversationToken in policy
-noServicePolicy=Service policy missing
\ No newline at end of file
+noServicePolicy=Service policy missing

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java?view=auto&rev=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java Tue Nov 21 07:13:48 2006
@@ -0,0 +1,85 @@
+package org.apache.sandesha2.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+
+/**
+ * Data structure to represent a range of values from lowerValue->upperValue inclusive.
+ */
+public class Range {
+
+	private static final Log log = LogFactory.getLog(Range.class);
+	
+	long lowerValue;
+	long upperValue;
+	
+	/**
+	 * 
+	 * @param low
+	 * @param high
+	 * 
+	 * NOTE: low and high can be equal
+	 */
+	public Range(long low, long high){
+		if(high<low || high<0 || low<0){
+			throw new IllegalArgumentException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.invalidRange, ""+(low), ""+(high)));
+		}
+		lowerValue = low;
+		upperValue = high;
+	}
+	
+	/**
+	 * Construct a range from a String object
+	 * @param s a String of the form [lower, upper]
+	 * NOTE: lower and upper can be equal
+	 */
+	public Range(String s){
+		s = s.trim();
+
+		int length = s.length();
+
+		if (s.charAt(0) != '[' || s.charAt(length - 1) != ']') {
+			String message = SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.invalidStringArray, s);
+			log.debug(message);
+			throw new IllegalArgumentException(message);
+		}
+		
+		//remove the braces on either end
+		String subStr = s.substring(1, length - 1);
+
+		String[] parts = subStr.split(",");
+
+		if(parts.length!=2){
+			String message = SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.invalidStringArray, s);
+			log.debug(message);
+			throw new IllegalArgumentException(message);
+		}
+		
+		lowerValue = Long.parseLong(parts[0]);
+		upperValue = Long.parseLong(parts[1]);
+	}
+	
+	
+	/**
+	 * Value is considered to be "in range" if it is with the limits set by the
+	 * upper and lower values.
+	 * e.g. [x, x+n] would return true for all values in the set [x...x+n]
+	 */
+	public boolean rangeContainsValue(long value){
+		if(value<=upperValue && value>=lowerValue){
+			return true;
+		}
+		else return false;
+	}
+	
+	public String toString(){
+		return "[" + lowerValue + "," + upperValue + "]";
+	}
+	
+}

Propchange: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java?view=auto&rev=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java Tue Nov 21 07:13:48 2006
@@ -0,0 +1,181 @@
+package org.apache.sandesha2.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Helper class.
+ * Enables conversion to from a list of Range objects to a String representation
+ * of that list.
+ * Also performs task such as aggregation of ranges
+ *
+ */
+public class RangeString {
+
+	/**
+	 * Each entry in this map is a range
+	 * The key to each range entry is range.lowerValue
+	 */
+	private Map rangeMap;
+	
+	/**
+	 * Expects a String of the form
+	 * [x1,y1][x2,y2]...[xn,yn]
+	 * @param s
+	 */
+	public RangeString(String s){
+
+		rangeMap = Collections.synchronizedMap(new HashMap());
+
+		if(s!=null && !s.equals("")){
+			//Walk the string building range objects as we go, and
+			//put them in the map
+			Pattern p = Pattern.compile("\\[(.+?),(.+?)\\]");
+			Matcher m = p.matcher(s);
+			while(m.find()){
+				String token = m.group();
+				addRange(new Range(token));
+			}			
+		}
+		
+	}
+	
+	
+	private Range getNextRangeBelow(long msgNumber){
+		//start at the specified index and work down the list of ranges
+		//util we find one
+		
+		for(long i = msgNumber; i>=0; i--){
+			Range r = (Range)rangeMap.get(new Long(i));
+			if(r!=null){
+				//this is the next range below
+				return r;
+			}
+		}
+		//no range below this one
+		return null; 
+	}
+	
+	private Range getRangeImmediatelyAbove(long msgNumber){
+		//see if there is a range that starts imemdiately
+		//above the specified number
+		long targetRange = msgNumber + 1;
+		return (Range)rangeMap.get(new Long(targetRange));
+	}
+	
+	
+	public boolean isMessageNumberInRanges(long messageNumber){
+		Range below = getNextRangeBelow(messageNumber);
+		if(below!=null){
+			if(below.rangeContainsValue(messageNumber)){
+				//this range contains our value
+				return true;
+			}
+		}
+		
+		//if we made it here then we are not in any ranges
+		return false;
+	}
+	
+	/**
+	 * Returns a String representation of the ranges contained in this object
+	 * @return a String of the form [x1,y1][x2,y2]...[xn,yn]
+	 */
+	public String toString(){
+		//iterate the rangeList creating an on-going string
+		Set keySet = rangeMap.keySet();
+		//sort the set
+		List sortedList = new LinkedList(keySet);
+		Collections.sort(sortedList);
+		String returnString = "";
+		for(int i=0; i<sortedList.size(); i++){
+			returnString = returnString + (rangeMap.get((Long)sortedList.get(i))).toString();
+		}
+		
+		return returnString;
+	}
+	
+	/**
+	 * Returns a list string of the form
+	 * [x1,x2,x3....xn] listing each discrete number contained in all of the ranges
+	 * in order
+	 */
+	public String getContainedElementsAsListString(){
+		//iterate the rangeList creating an on-going string
+		Set keySet = rangeMap.keySet();
+		//sort the set
+		List sortedList = new LinkedList(keySet);
+		Collections.sort(sortedList);
+		String returnString = "[";
+		for(int i=0; i<sortedList.size(); i++){
+			Range r = (Range)rangeMap.get((Long)sortedList.get(i));
+			for(long l=r.lowerValue; l<=r.upperValue;l++){
+				if(i==0 && l==r.lowerValue){
+					//first time does not need leading ','
+					returnString += l;						
+				}
+				else{
+					returnString += "," + l;						
+				}
+			}
+		}
+		
+		return returnString + "]";		
+	}
+	
+	public void addRange(Range r){
+		//first we try to aggregate existing ranges
+		boolean rangeAdded = false;
+		long indexKey = r.lowerValue;
+		//see if there is a range below that we can extend up
+		Range below = getNextRangeBelow(indexKey);
+		if(below!=null){
+			if(below.upperValue == (r.lowerValue -1)){
+				//we can extend this range up
+				below.upperValue = r.upperValue;
+				//we do not quit yet, as maybe this has plugged a gap between
+				//an upper range. But we should mark the range as added.
+				rangeAdded = true;
+			}
+		}
+		
+		//see if we can extend another range down
+		Range above = getRangeImmediatelyAbove(r.upperValue);
+		if(above!=null){
+			//we can extend this down
+			//first remove it. Then we will either add it under its new key or 
+			//keep it removed
+			rangeMap.remove(new Long(above.lowerValue));
+			above.lowerValue = r.lowerValue; //extend down
+			if(rangeAdded){
+				//we extend down and up - join two ranges together
+				//Sicne we have removed the upper, we simply do not add it again and set the
+				//lower range to encompass both of them
+				below.upperValue = above.upperValue;
+			}
+			else{
+				//we did extend up but we did not extend down. Add the upper range back under its new key
+				rangeAdded = true;				
+				rangeMap.put(new Long(above.lowerValue), above);
+			}
+
+		}
+		
+		if(!rangeAdded){
+			//if we got here and did not add a range then we need to 
+			//genuinely add a new range object
+			rangeMap.put(new Long(r.lowerValue), r);			
+		}
+
+		
+	}
+	
+
+}

Propchange: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java Tue Nov 21 07:13:48 2006
@@ -19,6 +19,7 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.axis2.addressing.AddressingConstants;
@@ -43,6 +44,8 @@
 import org.apache.sandesha2.storage.beans.NextMsgBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.Range;
+import org.apache.sandesha2.util.RangeString;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.TerminateManager;
 import org.apache.sandesha2.wsrm.Sequence;
@@ -59,7 +62,10 @@
 	private ArrayList workingSequences = new ArrayList();
 	private ConfigurationContext context = null;
 	private static final Log log = LogFactory.getLog(Invoker.class);
-	private boolean hasStopped = false;
+	
+	private boolean hasStoppedInvoking = false;
+	private boolean hasPausedInvoking = false;
+	private boolean pauseRequired = false;
 	
 	private transient ThreadFactory threadPool;
 	public int INVOKER_THREADPOOL_SIZE = 5;
@@ -85,11 +91,167 @@
 		if (log.isDebugEnabled())
 			log.debug("Exit: InOrderInvoker::stopInvokerForTheSequence");
 	}
+	
+	
+	/**
+	 * Waits for the invoking thread to pause
+	 */
+	public synchronized void blockForPause(){
+		while(pauseRequired){
+			//someone else is requesting a pause - wait for them to finish
+			try{
+				wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
+			}catch(InterruptedException e){
+				//ignore
+			}
+		}
+		
+	  //we can now request a pause - the next pause will be ours
+	  pauseRequired = true;
+				
+		if(hasStoppedInvoking() || !isInvokerStarted()){
+			throw new IllegalStateException("Cannot pause a non-running invoker thread"); //TODO NLS
+		}
+		while(!hasPausedInvoking){
+			//wait for our pause to come around
+			try{
+				wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
+			}catch(InterruptedException e){
+				//ignore
+			}
+			
+		}
+		//the invoker thread is now paused
+	}
+	
+	private synchronized void finishPause(){
+		//indicate that the current pause is no longer required.
+		pauseRequired = false;
+		notifyAll();
+	}
+	
+	/**
+	 * Forces dispatch of queued messages to the application.
+	 * NOTE: may break ordering
+	 * @param ctx
+	 * @param sequenceID
+	 * @param allowLaterDeliveryOfMissingMessages if true, messages skipped over during this
+	 * action will be invoked if they arrive on the system at a later time. 
+	 * Otherwise messages skipped over will be ignored
+	 * @throws SandeshaException
+	 */
+	public synchronized void forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx, 
+			String sequenceID,
+			boolean allowLaterDeliveryOfMissingMessages)throws SandeshaException{
+		//first we block while we wait for the invoking thread to pause
+		blockForPause();
+		try{
+			//get all invoker beans for the sequence
+			StorageManager storageManager = 
+				SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+	
+			InvokerBeanMgr storageMapMgr = storageManager
+					.getStorageMapBeanMgr();
+			NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
+			NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceID);
+			
+			if (nextMsgBean != null) {
+				
+				//The outOfOrder window is the set of known sequence messages (including those
+				//that are missing) at the time the button is pressed.
+				long firstMessageInOutOfOrderWindow = nextMsgBean.getNextMsgNoToProcess();
+			
+				Iterator stMapIt = 
+					storageMapMgr.find(new InvokerBean(null, 0, sequenceID)).iterator();
+				
+				long highestMsgNumberInvoked = 0;
+				Transaction transaction = null;
+				
+				//invoke each bean in turn. 
+				//NOTE: here we are breaking ordering
+				while(stMapIt.hasNext()){
+					transaction = storageManager.getTransaction();
+					InvokerBean invoker = (InvokerBean)stMapIt.next();
+					
+					//invoke the app
+					try{
+						// start a new worker thread and let it do the invocation.
+						String workId = sequenceID + "::" + invoker.getMsgNo(); //creating a workId to uniquely identify the
+					   //piece of work that will be assigned to the Worker.
+						
+						String messageContextKey = invoker.getMessageContextRefKey();
+						InvokerWorker worker = new InvokerWorker(context,
+								messageContextKey, 
+								true); //want to ignore the enxt msg number
+						
+						worker.setLock(lock);
+						worker.setWorkId(workId);
+						
+						//before we execute we need to set the 
+						
+						threadPool.execute(worker);
+					
+						//adding the workId to the lock after assigning it to a thread makes sure 
+						//that all the workIds in the Lock are handled by threads.
+						lock.addWork(workId);
+
+						long msgNumber = invoker.getMsgNo();
+						//if necessary, update the "next message number" bean under this transaction
+						if(msgNumber>highestMsgNumberInvoked){
+							highestMsgNumberInvoked = invoker.getMsgNo();
+							nextMsgBean.setNextMsgNoToProcess(highestMsgNumberInvoked+1);
+							nextMsgMgr.update(nextMsgBean);
+							
+							if(allowLaterDeliveryOfMissingMessages){
+								//we also need to update the sequence OUT_OF_ORDER_RANGES property
+								//so as to include our latest view of this outOfOrder range.
+								//We do that here (rather than once at the end) so that we reamin
+								//transactionally consistent
+								Range r = new Range(firstMessageInOutOfOrderWindow,highestMsgNumberInvoked);
+										
+								RangeString rangeString = null;
+								SequencePropertyBeanMgr seqPropertyManager = storageManager.getSequencePropertyBeanMgr();
+								SequencePropertyBean outOfOrderRanges = 
+									seqPropertyManager.retrieve(sequenceID, Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES);
+								if(outOfOrderRanges==null){
+									//insert a new blank one one
+									outOfOrderRanges = new SequencePropertyBean(sequenceID,
+											Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES,
+											"");
+
+									seqPropertyManager.insert(outOfOrderRanges);
+									rangeString = new RangeString("");
+								}
+								else{
+									rangeString = new RangeString(outOfOrderRanges.getValue());
+								}
+								//update the range String with the new value
+								rangeString.addRange(r);
+								outOfOrderRanges.setValue(rangeString.toString());
+								seqPropertyManager.update(outOfOrderRanges);
+							}
+						}
+						
+						transaction.commit();
+					}
+					catch(Exception e){
+						transaction.rollback();
+					}
+		
+				}//end while
+			}
+		}
+		finally{
+			//restart the invoker
+			finishPause();
+		}
+	}
 
 	public synchronized void stopInvoking() {
 		if (log.isDebugEnabled())
 			log.debug("Enter: InOrderInvoker::stopInvoking");
-
+		//NOTE: we do not take acount of pausing when stopping.
+		//The call to stop will wait until the invoker has exited the loop
 		if (isInvokerStarted()) {
 			// the invoker is started so stop it
 			runInvoker = false;
@@ -137,9 +299,9 @@
 			log.debug("Enter: InOrderInvoker::hasStoppedInvoking");
 			log
 					.debug("Exit: InOrderInvoker::hasStoppedInvoking, "
-							+ hasStopped);
+							+ hasStoppedInvoking);
 		}
-		return hasStopped;
+		return hasStoppedInvoking;
 	}
 
 	public void run() {
@@ -152,7 +314,7 @@
 			// flag that we have exited the run loop and notify any waiting
 			// threads
 			synchronized (this) {
-				hasStopped = true;
+				hasStoppedInvoking = true;
 				notify();
 			}
 		}
@@ -161,6 +323,42 @@
 			log.debug("Exit: InOrderInvoker::run");
 	}
 
+	private void addOutOfOrderInvokerBeansToList(String sequenceID, 
+			StorageManager strMgr, List list)throws SandeshaException{
+		if (log.isDebugEnabled())
+			log.debug("Enter: InOrderInvoker::addOutOfOrderInvokerBeansToList");
+		
+		SequencePropertyBeanMgr seqPropertyManager = strMgr.getSequencePropertyBeanMgr();
+		
+		SequencePropertyBean outOfOrderRanges = 
+			seqPropertyManager.retrieve(sequenceID, Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES);		
+		if(outOfOrderRanges!=null){
+			String sequenceRanges = outOfOrderRanges.getValue();
+			RangeString rangeString = new RangeString(sequenceRanges);
+			//we now have the set of ranges that can be delivered out of order.
+			//Look for any invokable message that lies in one of those ranges
+			Iterator invokerBeansIterator = 
+				strMgr.getStorageMapBeanMgr().find(
+						new InvokerBean(null, 
+														0,  //finds all invoker beans
+														sequenceID)).iterator();
+			
+			while(invokerBeansIterator.hasNext()){
+				InvokerBean invokerBean = (InvokerBean)invokerBeansIterator.next();
+				
+				if(rangeString.isMessageNumberInRanges(invokerBean.getMsgNo())){
+					//an invoker bean that has not been deleted and lies in an out
+					//or order range - we can add this to the list
+					list.add(invokerBean);
+				}
+			}
+			
+		}
+			
+		if (log.isDebugEnabled())
+			log.debug("Exit: InOrderInvoker::addOutOfOrderInvokerBeansToList");
+	}
+	
 	private void internalRun() {
 		if (log.isDebugEnabled())
 			log.debug("Enter: InOrderInvoker::internalRun");
@@ -177,6 +375,26 @@
 				log.debug("Invoker was Inturrepted....");
 				log.debug(ex.getMessage());
 			}
+					
+			//see if we need to pause
+			synchronized(this){
+				
+				while(pauseRequired){
+					if(!hasPausedInvoking){
+						//let the requester of this pause know we are now pausing
+					  hasPausedInvoking = true;
+					  notifyAll();						
+					}
+					//now we pause
+				  try{
+				  	wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
+				  }catch(InterruptedException e){
+				  	//ignore
+				  }
+				}//end while
+				//the request to pause has finished so we are no longer pausing
+				hasPausedInvoking = false;
+			}
 
 			Transaction transaction = null;
 			boolean rolebacked = false;
@@ -242,32 +460,43 @@
 					throw new SandeshaException(message);
 				}
 
-				Iterator stMapIt = storageMapMgr.find(
-						new InvokerBean(null, nextMsgno, sequenceId))
-						.iterator();
-
+				List invokerBeans = storageMapMgr.find(
+						new InvokerBean(null, nextMsgno, sequenceId));
+				
+				//add any msgs that belong to out of order windows
+				addOutOfOrderInvokerBeansToList(sequenceId, 
+						storageManager, invokerBeans);
+				
+				Iterator stMapIt = invokerBeans.iterator();
 				
 				//TODO correct the locking mechanism to have one lock per sequence.
 				
-				if (stMapIt.hasNext()) { //the next Msg entry is present.
+				if (stMapIt.hasNext()) { //some invokation work is present
 
-					String workId = sequenceId + "::" + nextMsgno; //creating a workId to uniquely identify the
+					InvokerBean bean = (InvokerBean) stMapIt.next();
+					//see if this is an out of order msg
+					boolean beanIsOutOfOrderMsg = bean.getMsgNo()!=nextMsgno;
+					
+					String workId = sequenceId + "::" + bean.getMsgNo(); 
+																		//creating a workId to uniquely identify the
 																   //piece of work that will be assigned to the Worker.
 										
-					//check weather the bean is already assigned to a worker.
+					//check whether the bean is already assigned to a worker.
 					if (lock.isWorkPresent(workId)) {
 						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
 						log.debug(message);
 						continue;
 					}
-					
-					InvokerBean bean = (InvokerBean) stMapIt.next();
+
 					String messageContextKey = bean.getMessageContextRefKey();
 					
 					transaction.commit();
 
 					// start a new worker thread and let it do the invocation.
-					InvokerWorker worker = new InvokerWorker(context,messageContextKey);
+					InvokerWorker worker = new InvokerWorker(context,
+							messageContextKey, 
+							beanIsOutOfOrderMsg); //only ignore nextMsgNumber if the bean is an
+																		//out of order message
 					
 					worker.setLock(lock);
 					worker.setWorkId(workId);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java Tue Nov 21 07:13:48 2006
@@ -25,12 +25,14 @@
 
 	ConfigurationContext configurationContext = null;
 	String messageContextKey;
+	boolean ignoreNextMsg = false;
 	
 	Log log = LogFactory.getLog(InvokerWorker.class);
 	
-	public InvokerWorker (ConfigurationContext configurationContext, String messageContextKey) {
+	public InvokerWorker (ConfigurationContext configurationContext, String messageContextKey, boolean ignoreNextMsg) {
 		this.configurationContext = configurationContext;
 		this.messageContextKey = messageContextKey;
+		this.ignoreNextMsg = ignoreNextMsg;
 	}
 	
 	public void run() {
@@ -122,12 +124,6 @@
 			if (msgCtx != null) {
 				storageManager.removeMessageContext(messageContextKey);
 			}
-
-			// updating the next msg to invoke
-
-			String s = invokerBean.getSequenceID();
-			NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
-
 			
 			if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
 				Sequence sequence = (Sequence) rmMsg
@@ -143,26 +139,30 @@
 				}
 			}
 			
-			long nextMsgNo = nextMsgBean.getNextMsgNoToProcess();
-			
-			if (!(messageNo==nextMsgNo)) {
-				String message = "Operated message number is different from the Next Message Number to invoke";
-				throw new SandeshaException (message);
-			}
-			
-			if (invoked) {
-				nextMsgNo++;
-				nextMsgBean.setNextMsgNoToProcess(nextMsgNo);
-				nextMsgMgr.update(nextMsgBean);
+			if(!ignoreNextMsg){
+				// updating the next msg to invoke
+
+				String s = invokerBean.getSequenceID();
+				NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
+				long nextMsgNo = nextMsgBean.getNextMsgNoToProcess();
+				
+				if (!(messageNo==nextMsgNo)) {
+					String message = "Operated message number is different from the Next Message Number to invoke";
+					throw new SandeshaException (message);
+				}
+				
+				if (invoked) {
+					nextMsgNo++;
+					nextMsgBean.setNextMsgNoToProcess(nextMsgNo);
+					nextMsgMgr.update(nextMsgBean);
+				}				
 			}
 		} catch (SandeshaStorageException e) {
 			transaction.rollback();
 		} catch (SandeshaException e) {
-			e.printStackTrace(); //TODO remove
-			log.error(e);
+			log.error(e.toString(), e);
 		} catch (Exception e) {
-			e.printStackTrace();
-			log.error(e);
+			log.error(e.toString(), e);
 		} finally {
 			if (transaction!=null && transaction.isActive())
 				transaction.commit();

Added: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java?view=auto&rev=477693
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java (added)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java Tue Nov 21 07:13:48 2006
@@ -0,0 +1,217 @@
+package org.apache.sandesha2.workers;
+
+import java.io.File;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ConfigurationContextFactory;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.transport.http.SimpleHTTPServer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.SandeshaTestCase;
+import org.apache.sandesha2.client.SandeshaClient;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.Invoker;
+
+public class ForceInboundDispatchTest extends SandeshaTestCase  {
+
+	SimpleHTTPServer httpServer = null;
+	private final String applicationNamespaceName = "http://tempuri.org/"; 
+	private final String ping = "ping";
+	private final String Text = "Text";
+
+	private Log log = LogFactory.getLog(getClass());
+	int serverPort = DEFAULT_SERVER_TEST_PORT;
+	
+	ConfigurationContext serverConfigCtx = null;
+	
+	public ForceInboundDispatchTest () {
+        super ("ForceDispatchTest");
+	}
+	
+	public void setUp () throws AxisFault {
+		
+		String repoPath = "target" + File.separator + "repos" + File.separator + "server";
+		String axis2_xml = "target" + File.separator + "repos" + File.separator + "server" + File.separator + "server_axis2.xml";
+
+		serverConfigCtx = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
+
+		String serverPortStr = getTestProperty("test.server.port");
+		if (serverPortStr!=null) {
+			try {
+				serverPort = Integer.parseInt(serverPortStr);
+			} catch (NumberFormatException e) {
+				log.error(e);
+			}
+		}
+		
+		httpServer = new SimpleHTTPServer (serverConfigCtx,serverPort);
+		httpServer.start();
+		try {
+			Thread.sleep(300);
+		} catch (InterruptedException e) {
+			throw new SandeshaException ("sleep interupted");
+		}
+	}
+	
+	public void tearDown () throws SandeshaException {
+		if (httpServer!=null)
+			httpServer.stop();
+		
+		try {
+			Thread.sleep(300);
+		} catch (InterruptedException e) {
+			throw new SandeshaException ("sleep interupted");
+		}
+	}
+	
+	public void testForceInvoke () throws AxisFault,InterruptedException  {
+		
+		String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
+		String transportTo = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
+		
+		String repoPath = "target" + File.separator + "repos" + File.separator + "client";
+		String axis2_xml = "target" + File.separator + "repos" + File.separator + "client" + File.separator + "client_axis2.xml";
+
+		ConfigurationContext configContext = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
+
+		Options clientOptions = new Options ();
+		clientOptions.setSoapVersionURI(SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+		
+		clientOptions.setTo(new EndpointReference (to));
+		clientOptions.setProperty(MessageContextConstants.TRANSPORT_URL,transportTo);
+		
+		String sequenceKey = "sequence1";
+		clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+		ServiceClient serviceClient = new ServiceClient (configContext,null);
+		serviceClient.setOptions(clientOptions);
+		
+		try{
+			serviceClient.fireAndForget(getPingOMBlock("ping1"));		
+			
+			//now deliver the next out of order 
+			clientOptions.setProperty(SandeshaClientConstants.MESSAGE_NUMBER,new Long(3));
+			serviceClient.fireAndForget(getPingOMBlock("ping3"));
+	
+			Thread.sleep(5000);
+			
+			String inboundSequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(SandeshaUtil.getInternalSequenceID(to, sequenceKey),
+					SandeshaUtil.getInMemoryStorageManager(configContext));
+			
+			SandeshaClient.forceDispatchOfInboundMessages(serverConfigCtx, 
+					inboundSequenceID, 
+					true); //allow later msgs to be delivered 
+			
+			//check that the server is now expecting msg 4
+			StorageManager serverStore = SandeshaUtil.getInMemoryStorageManager(serverConfigCtx);
+			NextMsgBean nextMsgBean = 
+				serverStore.getNextMsgBeanMgr().retrieve(inboundSequenceID);
+			assertNotNull(nextMsgBean);
+			assertEquals(nextMsgBean.getNextMsgNoToProcess(), 4);
+			
+			//also check that the sequence has an out of order gap that contains msg 2
+			SequencePropertyBean outOfOrderRanges = 
+				serverStore.getSequencePropertyBeanMgr().retrieve(
+							inboundSequenceID, 
+							Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES);
+			
+			assertNotNull(outOfOrderRanges);
+			RangeString rangeString = new RangeString(outOfOrderRanges.getValue());
+			assertTrue(rangeString.isMessageNumberInRanges(2));
+			
+			//we deliver msg 2
+			//set highest out msg number to 1
+			SequencePropertyBean nextMsgNoBean = 
+					SandeshaUtil.getInMemoryStorageManager(configContext).getSequencePropertyBeanMgr().
+					retrieve(SandeshaUtil.getInternalSequenceID(to, sequenceKey),
+					Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
+			nextMsgNoBean.setValue("1");
+			
+			clientOptions.setProperty(SandeshaClientConstants.MESSAGE_NUMBER,new Long(2));
+			serviceClient.fireAndForget(getPingOMBlock("ping2"));
+		}
+		finally{
+			configContext.getListenerManager().stop();
+			serviceClient.cleanup();			
+		}
+
+	}
+	
+	public void testForceInvokeWithDiscardGaps () throws AxisFault,InterruptedException  {
+		
+		String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
+		String transportTo = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
+		
+		String repoPath = "target" + File.separator + "repos" + File.separator + "client";
+		String axis2_xml = "target" + File.separator + "repos" + File.separator + "client" + File.separator + "client_axis2.xml";
+
+		ConfigurationContext configContext = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
+
+		Options clientOptions = new Options ();
+		clientOptions.setSoapVersionURI(SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+		
+		clientOptions.setTo(new EndpointReference (to));
+		clientOptions.setProperty(MessageContextConstants.TRANSPORT_URL,transportTo);
+		
+		String sequenceKey = "sequence1";
+		clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+		ServiceClient serviceClient = new ServiceClient (configContext,null);
+		serviceClient.setOptions(clientOptions);
+		try
+		{
+			serviceClient.fireAndForget(getPingOMBlock("ping1"));		
+			
+			//now deliver the next out of order 
+			clientOptions.setProperty(SandeshaClientConstants.MESSAGE_NUMBER,new Long(3));
+			serviceClient.fireAndForget(getPingOMBlock("ping3"));
+	
+			Thread.sleep(5000);
+			
+			String inboundSequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(SandeshaUtil.getInternalSequenceID(to, sequenceKey),
+					SandeshaUtil.getInMemoryStorageManager(configContext));
+			
+			SandeshaClient.forceDispatchOfInboundMessages(serverConfigCtx, inboundSequenceID, false);
+			
+			//check that the server is now expecting msg 4
+			NextMsgBean nextMsgBean = 
+				SandeshaUtil.getInMemoryStorageManager(serverConfigCtx).getNextMsgBeanMgr().
+					retrieve(inboundSequenceID);
+			assertNotNull(nextMsgBean);
+			assertEquals(nextMsgBean.getNextMsgNoToProcess(), 4);
+	  }
+		finally{
+			configContext.getListenerManager().stop();
+			serviceClient.cleanup();			
+		}
+
+	}
+	
+	private OMElement getPingOMBlock(String text) {
+		OMFactory fac = OMAbstractFactory.getOMFactory();
+		OMNamespace namespace = fac.createOMNamespace(applicationNamespaceName,"ns1");
+		OMElement pingElem = fac.createOMElement(ping, namespace);
+		OMElement textElem = fac.createOMElement(Text, namespace);
+		
+		textElem.setText(text);
+		pingElem.addChild(textElem);
+
+		return pingElem;
+	}
+
+}

Propchange: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org