You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ml...@apache.org on 2006/11/21 16:13:49 UTC
svn commit: r477693 - in /webservices/sandesha/trunk/java:
src/org/apache/sandesha2/ src/org/apache/sandesha2/client/
src/org/apache/sandesha2/i18n/ src/org/apache/sandesha2/util/
src/org/apache/sandesha2/workers/ test/src/org/apache/sandesha2/workers/
Author: mlovett
Date: Tue Nov 21 07:13:48 2006
New Revision: 477693
URL: http://svn.apache.org/viewvc?view=rev&rev=477693
Log:
Tom's patch for SANDESHA2-43
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java (with props)
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java (with props)
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java (with props)
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java Tue Nov 21 07:13:48 2006
@@ -258,6 +258,11 @@
String CLIENT_COMPLETED_MESSAGES = "ClientCompletedMessages";
String SERVER_COMPLETED_MESSAGES = "ServerCompletedMessages";
+ //For IN_ORDER sequences, we can have finite ranges of messages that can be
+ //delivered out of order. These are maintained as a String that is consistent
+ //with the form described in org.apache.sandesha2.util.RangeString
+ String OUT_OF_ORDER_RANGES = "OutOfOrderRanges";
+
String TO_EPR = "ToEPR";
String ACKS_TO_EPR = "acksToEPR";
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java Tue Nov 21 07:13:48 2006
@@ -57,6 +57,7 @@
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.workers.Invoker;
import org.apache.sandesha2.wsrm.AckRequested;
import org.apache.sandesha2.wsrm.CloseSequence;
import org.apache.sandesha2.wsrm.Identifier;
@@ -718,6 +719,30 @@
options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
}
+ /**
+  * Asks the registered Invoker to dispatch every message currently queued on
+  * an inOrder inbound sequence, even though doing so delivers them out of order.
+  * Does nothing when the configuration is not running inOrder.
+  *
+  * @param configContext the configuration context holding the Sandesha property
+  *        bean and the INVOKER property
+  * @param sequenceID the inbound sequence whose queued messages are dispatched
+  * @param allowLaterDeliveryOfMissingMessages if true, messages skipped over during this
+  *        action will be invoked if they arrive on the system at a later time.
+  *        Otherwise messages skipped over will be ignored
+  * @throws SandeshaException if no invoker is registered on the configuration
+  *         context, or if the invoker reports a failure
+  */
+ public static void forceDispatchOfInboundMessages(ConfigurationContext configContext,
+         String sequenceID,
+         boolean allowLaterDeliveryOfMissingMessages) throws SandeshaException {
+     //nothing to force unless we are running inOrder
+     boolean inOrder =
+         SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration()).isInOrder();
+     if (!inOrder) {
+         return;
+     }
+
+     Invoker registeredInvoker =
+         (Invoker) configContext.getProperty(Sandesha2Constants.INVOKER);
+     if (registeredInvoker == null) {
+         String message = SandeshaMessageHelper.getMessage(
+                 SandeshaMessageKeys.invokerNotFound, sequenceID);
+         throw new SandeshaException(message);
+     }
+
+     registeredInvoker.forceInvokeOfAllMessagesCurrentlyOnSequence(
+             configContext, sequenceID, allowLaterDeliveryOfMissingMessages);
+ }
+
private static String getInternalSequenceID(String to, String sequenceKey) {
return SandeshaUtil.getInternalSequenceID(to, sequenceKey);
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Tue Nov 21 07:13:48 2006
@@ -68,6 +68,7 @@
public static final String cannotInnitMessage="cannotInnitMessage";
public static final String propertyInvalidValue="propertyInvalidValue";
public static final String couldNotCopyParameters="couldNotCopyParameters";
+ public static final String invalidRange="invalidRange";
public static final String senderBeanNotFound="senderBeanNotFound";
public static final String workAlreadyAssigned="workAlreadyAssigned";
public static final String workNotPresent="workNotPresent";
@@ -250,9 +251,10 @@
public final static String addressingNamespaceNotSet = "addressingNamespaceNotSet";
public final static String invalidOfferNoResponseEndpoint = "invalidOfferNoResponseEndpoint";
public final static String invalidElementFoundWithinElement = "invalidElementFoundWithinElement";
+ public final static String invokerNotFound="invokerNotFound";
public final static String couldNotSendAckRequestSeqNotFound="couldNotSendAckRequestSeqNotFound";
public final static String couldNotSendCloseResponse="couldNotSendCloseResponse";
public final static String couldNotSendCloseSeqNotFound="couldNotSendCloseSeqNotFound";
-}
\ No newline at end of file
+}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties Tue Nov 21 07:13:48 2006
@@ -88,6 +88,7 @@
cannotInnitMessage=Sandesha2 Internal error: cannot initialize the message.
propertyInvalidValue=Sandesha2 Internal error: property {0} contains an invalid value.
couldNotCopyParameters=Could not copy parameters when creating the new RM Message. See the following exception for more details: {0}.
+invalidRange=The specified range was invalid: {0}, {1}
#-------------------------------------
#
@@ -266,6 +267,7 @@
cannotFindTransportInDesc=Cannot find the transport in description {0} in the ConfigurationContext
invalidOfferNoResponseEndpoint=Cannot derive a valid offer from the given infomation. No Endpoint for receiving messages.
invalidElementFoundWithinElement=Found invalid ''{0}'' element within ''{1}'' element
+invokerNotFound=An invoker thread was not found to dispatch messages on the inbound sequence {0}.
#------------------
# Security messages
@@ -279,4 +281,4 @@
proofOfPossessionNotVerified = Proof of possession not verified
noSecurityResults = No Security results
noSecConvTokenInPolicy = No SecureConversationToken in policy
-noServicePolicy=Service policy missing
\ No newline at end of file
+noServicePolicy=Service policy missing
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java?view=auto&rev=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java Tue Nov 21 07:13:48 2006
@@ -0,0 +1,85 @@
+package org.apache.sandesha2.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+
+/**
+ * Data structure to represent a range of values from lowerValue->upperValue inclusive.
+ * Both constructors enforce the same invariant: 0 <= lowerValue <= upperValue.
+ */
+public class Range {
+
+    private static final Log log = LogFactory.getLog(Range.class);
+
+    long lowerValue;
+    long upperValue;
+
+    /**
+     * Creates a range from explicit bounds.
+     *
+     * @param low the lowest value in the range; must not be negative
+     * @param high the highest value in the range; must not be below low
+     *
+     * NOTE: low and high can be equal
+     * @throws IllegalArgumentException if either bound is negative or high is below low
+     */
+    public Range(long low, long high){
+        checkBounds(low, high);
+        lowerValue = low;
+        upperValue = high;
+    }
+
+    /**
+     * Construct a range from a String object
+     * @param s a String of the form [lower, upper]; whitespace around the
+     * brackets and around either number is ignored
+     * NOTE: lower and upper can be equal
+     * @throws IllegalArgumentException if s is null, is not of the bracketed
+     * two-value form, or describes bounds the numeric constructor would reject.
+     * A non-numeric bound surfaces as NumberFormatException, which is itself an
+     * IllegalArgumentException.
+     */
+    public Range(String s){
+        String text = (s == null) ? "" : s.trim();
+        int length = text.length();
+
+        //the length check keeps charAt from failing on an empty string
+        if (length < 2 || text.charAt(0) != '[' || text.charAt(length - 1) != ']') {
+            throw malformed(text);
+        }
+
+        //remove the brackets on either end
+        String[] parts = text.substring(1, length - 1).split(",");
+        if (parts.length != 2) {
+            throw malformed(text);
+        }
+
+        //trim each part so that the documented "[lower, upper]" form parses
+        long low = Long.parseLong(parts[0].trim());
+        long high = Long.parseLong(parts[1].trim());
+
+        //apply the same rules as the numeric constructor
+        checkBounds(low, high);
+        lowerValue = low;
+        upperValue = high;
+    }
+
+    /**
+     * Rejects bounds that do not describe a valid, non-negative range.
+     */
+    private static void checkBounds(long low, long high){
+        if(high<low || high<0 || low<0){
+            throw new IllegalArgumentException(SandeshaMessageHelper.getMessage(
+                    SandeshaMessageKeys.invalidRange, ""+(low), ""+(high)));
+        }
+    }
+
+    /**
+     * Logs and builds the exception used for text that is not of the form [x,y].
+     */
+    private static IllegalArgumentException malformed(String text){
+        String message = SandeshaMessageHelper.getMessage(
+                SandeshaMessageKeys.invalidStringArray, text);
+        log.debug(message);
+        return new IllegalArgumentException(message);
+    }
+
+    /**
+     * Value is considered to be "in range" if it is within the limits set by the
+     * upper and lower values.
+     * e.g. [x, x+n] would return true for all values in the set [x...x+n]
+     */
+    public boolean rangeContainsValue(long value){
+        return value>=lowerValue && value<=upperValue;
+    }
+
+    /**
+     * @return this range in the form [lower,upper], which the String
+     * constructor accepts
+     */
+    public String toString(){
+        return "[" + lowerValue + "," + upperValue + "]";
+    }
+
+}
Propchange: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/Range.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java?view=auto&rev=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java Tue Nov 21 07:13:48 2006
@@ -0,0 +1,181 @@
+package org.apache.sandesha2.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Helper class.
+ * Converts between a list of Range objects and a String representation
+ * of that list.
+ * Ranges are aggregated as they are added: any ranges that overlap or sit
+ * directly next to each other are merged, so the stored ranges are always
+ * disjoint and non-adjacent.
+ *
+ */
+public class RangeString {
+
+    /** Matches one [x,y] token. Compiled once and shared by all instances. */
+    private static final Pattern RANGE_TOKEN = Pattern.compile("\\[(.+?),(.+?)\\]");
+
+    /**
+     * Each entry in this map is a range.
+     * The key to each range entry is range.lowerValue (as a Long). The map is
+     * sorted so that the nearest range at or below a number can be looked up
+     * directly. Iteration and compound updates synchronize on the map itself.
+     */
+    private SortedMap rangeMap;
+
+    /**
+     * Expects a String of the form
+     * [x1,y1][x2,y2]...[xn,yn]
+     * @param s the ranges to load; null or "" gives an empty set of ranges
+     * @throws IllegalArgumentException if a token cannot be turned into a Range
+     */
+    public RangeString(String s){
+
+        rangeMap = Collections.synchronizedSortedMap(new TreeMap());
+
+        if(s!=null && !s.equals("")){
+            //Walk the string building range objects as we go, and
+            //put them in the map
+            Matcher m = RANGE_TOKEN.matcher(s);
+            while(m.find()){
+                addRange(new Range(m.group()));
+            }
+        }
+
+    }
+
+
+    /**
+     * Finds the range with the highest lowerValue that is not above msgNumber.
+     * @return that range, or null if every range starts above msgNumber
+     */
+    private Range getNextRangeBelow(long msgNumber){
+        synchronized(rangeMap){
+            SortedMap atOrBelow = rangeMap;
+            if(msgNumber != Long.MAX_VALUE){
+                //headMap is exclusive of its bound, hence the +1
+                atOrBelow = rangeMap.headMap(new Long(msgNumber + 1));
+            }
+            if(atOrBelow.isEmpty()){
+                //no range below this one
+                return null;
+            }
+            return (Range)atOrBelow.get(atOrBelow.lastKey());
+        }
+    }
+
+
+    /**
+     * @return true if messageNumber lies inside one of the stored ranges
+     */
+    public boolean isMessageNumberInRanges(long messageNumber){
+        //stored ranges never overlap, so only the nearest range starting at
+        //or below the number can contain it
+        Range below = getNextRangeBelow(messageNumber);
+        return below!=null && below.rangeContainsValue(messageNumber);
+    }
+
+    /**
+     * Returns a String representation of the ranges contained in this object
+     * @return a String of the form [x1,y1][x2,y2]...[xn,yn], in ascending order
+     */
+    public String toString(){
+        StringBuffer result = new StringBuffer();
+        synchronized(rangeMap){
+            //the map is sorted by lowerValue, so values come out in order
+            for(Iterator it = rangeMap.values().iterator(); it.hasNext();){
+                result.append(it.next().toString());
+            }
+        }
+        return result.toString();
+    }
+
+    /**
+     * Returns a list string of the form
+     * [x1,x2,x3....xn] listing each discrete number contained in all of the ranges
+     * in order
+     */
+    public String getContainedElementsAsListString(){
+        StringBuffer result = new StringBuffer("[");
+        boolean first = true;
+        synchronized(rangeMap){
+            for(Iterator it = rangeMap.values().iterator(); it.hasNext();){
+                Range r = (Range)it.next();
+                for(long l=r.lowerValue; l<=r.upperValue; l++){
+                    if(!first){
+                        //every element after the first needs a leading ','
+                        result.append(',');
+                    }
+                    result.append(l);
+                    first = false;
+                }
+            }
+        }
+        return result.append(']').toString();
+    }
+
+    /**
+     * Adds a range, merging it with every stored range that it overlaps or
+     * directly touches.
+     * NOTE: r itself is kept in the map and may be widened to cover the
+     * ranges it absorbed.
+     * @param r the range to add
+     */
+    public void addRange(Range r){
+        synchronized(rangeMap){
+            long low = r.lowerValue;
+            long high = r.upperValue;
+
+            //see if the nearest range starting at or below us reaches up to us.
+            //If it does, absorb it. The second test is only evaluated when
+            //below.upperValue < low, so low - 1 cannot underflow.
+            Range below = getNextRangeBelow(low);
+            if(below!=null
+                    && (below.upperValue >= low || below.upperValue == low - 1)){
+                rangeMap.remove(new Long(below.lowerValue));
+                low = below.lowerValue;
+                if(below.upperValue > high){
+                    high = below.upperValue;
+                }
+            }
+
+            //absorb every range that starts inside, or immediately above, the
+            //range built so far. Absorbing one can raise 'high', which may in
+            //turn bring the next range within reach.
+            Iterator it = rangeMap.tailMap(new Long(low)).values().iterator();
+            while(it.hasNext()){
+                Range next = (Range)it.next();
+                if(next.lowerValue > high && next.lowerValue - 1 != high){
+                    //there is a gap, and later ranges start even higher
+                    break;
+                }
+                it.remove();
+                if(next.upperValue > high){
+                    high = next.upperValue;
+                }
+            }
+
+            //store the aggregated range under its (possibly lowered) key
+            r.lowerValue = low;
+            r.upperValue = high;
+            rangeMap.put(new Long(low), r);
+        }
+    }
+
+
+}
Propchange: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RangeString.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java Tue Nov 21 07:13:48 2006
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Random;
import org.apache.axis2.addressing.AddressingConstants;
@@ -43,6 +44,8 @@
import org.apache.sandesha2.storage.beans.NextMsgBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.Range;
+import org.apache.sandesha2.util.RangeString;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.wsrm.Sequence;
@@ -59,7 +62,10 @@
private ArrayList workingSequences = new ArrayList();
private ConfigurationContext context = null;
private static final Log log = LogFactory.getLog(Invoker.class);
- private boolean hasStopped = false;
+
+ private boolean hasStoppedInvoking = false;
+ private boolean hasPausedInvoking = false;
+ private boolean pauseRequired = false;
private transient ThreadFactory threadPool;
public int INVOKER_THREADPOOL_SIZE = 5;
@@ -85,11 +91,167 @@
if (log.isDebugEnabled())
log.debug("Exit: InOrderInvoker::stopInvokerForTheSequence");
}
+
+
+ /**
+  * Waits for the invoking thread to pause.
+  * Only one pause can be outstanding at a time: a caller first waits for any
+  * earlier pause to be released, then claims the pause and waits for the
+  * invoker thread to report that it has paused. A caller that returns
+  * normally owns the pause and must release it through finishPause().
+  *
+  * @throws IllegalStateException if the invoker is not running, or stops
+  *         while we wait for it to pause. No pause is left claimed in
+  *         either case.
+  */
+ public synchronized void blockForPause(){
+     while(pauseRequired){
+         //someone else is requesting a pause - wait for them to finish
+         try{
+             wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
+         }catch(InterruptedException e){
+             //ignore
+         }
+     }
+
+     //check before claiming the pause: a claim that is never released
+     //would leave every later caller waiting in the loop above
+     if(hasStoppedInvoking() || !isInvokerStarted()){
+         throw new IllegalStateException("Cannot pause a non-running invoker thread"); //TODO NLS
+     }
+
+     //we can now request a pause - the next pause will be ours
+     pauseRequired = true;
+
+     while(!hasPausedInvoking){
+         //a stopped invoker will never acknowledge the pause, so give the
+         //claim back rather than waiting forever
+         if(hasStoppedInvoking() || !isInvokerStarted()){
+             pauseRequired = false;
+             notifyAll();
+             throw new IllegalStateException("Cannot pause a non-running invoker thread"); //TODO NLS
+         }
+         //wait for our pause to come around
+         try{
+             wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
+         }catch(InterruptedException e){
+             //ignore
+         }
+
+     }
+     //the invoker thread is now paused
+ }
+
+ /**
+  * Releases the pause claimed by blockForPause() and wakes every thread
+  * waiting on this invoker, both the paused invoker thread and any caller
+  * queued for the next pause.
+  */
+ private synchronized void finishPause(){
+     //indicate that the current pause is no longer required.
+     pauseRequired = false;
+     notifyAll();
+ }
+
+ /**
+  * Forces dispatch of queued messages to the application.
+  * NOTE: may break ordering
+  * The invoker thread is paused for the duration of the call. Each queued
+  * message is handed to a worker under its own transaction, and a failure on
+  * one message is logged and rolled back without stopping the others.
+  *
+  * @param ctx NOTE(review): not used at present; storage and workers are
+  *        reached through this invoker's own context field. Confirm that
+  *        the two are always the same context.
+  * @param sequenceID the inbound sequence whose queued messages are dispatched
+  * @param allowLaterDeliveryOfMissingMessages if true, messages skipped over during this
+  *        action will be invoked if they arrive on the system at a later time.
+  *        Otherwise messages skipped over will be ignored
+  * @throws SandeshaException if the storage lookups made before the first
+  *         dispatch fail
+  * @throws IllegalStateException if the invoker thread is not running
+  */
+ public synchronized void forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx,
+         String sequenceID,
+         boolean allowLaterDeliveryOfMissingMessages)throws SandeshaException{
+     try{
+         //first we block while we wait for the invoking thread to pause.
+         //This is inside the try so that a failed pause request is released
+         //by the finally block too.
+         blockForPause();
+
+         //get all invoker beans for the sequence
+         StorageManager storageManager =
+             SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+
+         InvokerBeanMgr storageMapMgr = storageManager
+                 .getStorageMapBeanMgr();
+         NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
+         NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceID);
+
+         if (nextMsgBean != null) {
+
+             //The outOfOrder window is the set of known sequence messages (including those
+             //that are missing) at the time the button is pressed.
+             long firstMessageInOutOfOrderWindow = nextMsgBean.getNextMsgNoToProcess();
+
+             Iterator stMapIt =
+                 storageMapMgr.find(new InvokerBean(null, 0, sequenceID)).iterator();
+
+             long highestMsgNumberInvoked = 0;
+             Transaction transaction = null;
+
+             //invoke each bean in turn.
+             //NOTE: here we are breaking ordering
+             while(stMapIt.hasNext()){
+                 transaction = storageManager.getTransaction();
+                 InvokerBean invoker = (InvokerBean)stMapIt.next();
+
+                 //invoke the app
+                 try{
+                     // start a new worker thread and let it do the invocation.
+                     String workId = sequenceID + "::" + invoker.getMsgNo(); //creating a workId to uniquely identify the
+                     //piece of work that will be assigned to the Worker.
+
+                     String messageContextKey = invoker.getMessageContextRefKey();
+                     InvokerWorker worker = new InvokerWorker(context,
+                             messageContextKey,
+                             true); //want to ignore the next msg number
+
+                     worker.setLock(lock);
+                     worker.setWorkId(workId);
+
+                     threadPool.execute(worker);
+
+                     //adding the workId to the lock after assigning it to a thread makes sure
+                     //that all the workIds in the Lock are handled by threads.
+                     lock.addWork(workId);
+
+                     long msgNumber = invoker.getMsgNo();
+                     //if necessary, update the "next message number" bean under this transaction
+                     if(msgNumber>highestMsgNumberInvoked){
+                         highestMsgNumberInvoked = msgNumber;
+                         nextMsgBean.setNextMsgNoToProcess(highestMsgNumberInvoked+1);
+                         nextMsgMgr.update(nextMsgBean);
+
+                         if(allowLaterDeliveryOfMissingMessages){
+                             //we also need to update the sequence OUT_OF_ORDER_RANGES property
+                             //so as to include our latest view of this outOfOrder range.
+                             //We do that here (rather than once at the end) so that we remain
+                             //transactionally consistent
+                             Range r = new Range(firstMessageInOutOfOrderWindow,highestMsgNumberInvoked);
+
+                             RangeString rangeString = null;
+                             SequencePropertyBeanMgr seqPropertyManager = storageManager.getSequencePropertyBeanMgr();
+                             SequencePropertyBean outOfOrderRanges =
+                                 seqPropertyManager.retrieve(sequenceID, Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES);
+                             if(outOfOrderRanges==null){
+                                 //insert a new blank one
+                                 outOfOrderRanges = new SequencePropertyBean(sequenceID,
+                                         Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES,
+                                         "");
+
+                                 seqPropertyManager.insert(outOfOrderRanges);
+                                 rangeString = new RangeString("");
+                             }
+                             else{
+                                 rangeString = new RangeString(outOfOrderRanges.getValue());
+                             }
+                             //update the range String with the new value
+                             rangeString.addRange(r);
+                             outOfOrderRanges.setValue(rangeString.toString());
+                             seqPropertyManager.update(outOfOrderRanges);
+                         }
+                     }
+
+                     transaction.commit();
+                 }
+                 catch(Exception e){
+                     //record why this message's updates are being abandoned
+                     //before carrying on with the remaining messages
+                     log.error(e.toString(), e);
+                     transaction.rollback();
+                 }
+
+             }//end while
+         }
+     }
+     finally{
+         //restart the invoker
+         finishPause();
+     }
+ }
public synchronized void stopInvoking() {
if (log.isDebugEnabled())
log.debug("Enter: InOrderInvoker::stopInvoking");
-
+ //NOTE: we do not take account of pausing when stopping.
+ //The call to stop will wait until the invoker has exited the loop
if (isInvokerStarted()) {
// the invoker is started so stop it
runInvoker = false;
@@ -137,9 +299,9 @@
log.debug("Enter: InOrderInvoker::hasStoppedInvoking");
log
.debug("Exit: InOrderInvoker::hasStoppedInvoking, "
- + hasStopped);
+ + hasStoppedInvoking);
}
- return hasStopped;
+ return hasStoppedInvoking;
}
public void run() {
@@ -152,7 +314,7 @@
// flag that we have exited the run loop and notify any waiting
// threads
synchronized (this) {
- hasStopped = true;
+ hasStoppedInvoking = true;
notify();
}
}
@@ -161,6 +323,42 @@
log.debug("Exit: InOrderInvoker::run");
}
+ /**
+  * Appends to the given list every stored invoker bean of the sequence whose
+  * message number falls inside one of the sequence's OUT_OF_ORDER_RANGES.
+  * The list is left untouched when the sequence has no such property.
+  *
+  * @param sequenceID the sequence to inspect
+  * @param strMgr the storage manager supplying the property and invoker beans
+  * @param list the list that matching InvokerBean objects are added to
+  * @throws SandeshaException if the storage lookups fail
+  */
+ private void addOutOfOrderInvokerBeansToList(String sequenceID,
+         StorageManager strMgr, List list)throws SandeshaException{
+     if (log.isDebugEnabled())
+         log.debug("Enter: InOrderInvoker::addOutOfOrderInvokerBeansToList");
+
+     SequencePropertyBean rangesProperty = strMgr.getSequencePropertyBeanMgr()
+             .retrieve(sequenceID, Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES);
+
+     if(rangesProperty!=null){
+         //the set of ranges that may be delivered out of order
+         RangeString deliverableRanges = new RangeString(rangesProperty.getValue());
+
+         //walk the stored invoker beans of this sequence (message number 0
+         //is the value the original lookup uses to find all of them) and
+         //keep those that lie in one of the ranges
+         for(Iterator candidates = strMgr.getStorageMapBeanMgr().find(
+                 new InvokerBean(null, 0, sequenceID)).iterator(); candidates.hasNext();){
+             InvokerBean candidate = (InvokerBean)candidates.next();
+             if(deliverableRanges.isMessageNumberInRanges(candidate.getMsgNo())){
+                 list.add(candidate);
+             }
+         }
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: InOrderInvoker::addOutOfOrderInvokerBeansToList");
+ }
+
private void internalRun() {
if (log.isDebugEnabled())
log.debug("Enter: InOrderInvoker::internalRun");
@@ -177,6 +375,26 @@
log.debug("Invoker was Inturrepted....");
log.debug(ex.getMessage());
}
+
+ //see if we need to pause
+ synchronized(this){
+
+ while(pauseRequired){
+ if(!hasPausedInvoking){
+ //let the requester of this pause know we are now pausing
+ hasPausedInvoking = true;
+ notifyAll();
+ }
+ //now we pause
+ try{
+ wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
+ }catch(InterruptedException e){
+ //ignore
+ }
+ }//end while
+ //the request to pause has finished so we are no longer pausing
+ hasPausedInvoking = false;
+ }
Transaction transaction = null;
boolean rolebacked = false;
@@ -242,32 +460,43 @@
throw new SandeshaException(message);
}
- Iterator stMapIt = storageMapMgr.find(
- new InvokerBean(null, nextMsgno, sequenceId))
- .iterator();
-
+ List invokerBeans = storageMapMgr.find(
+ new InvokerBean(null, nextMsgno, sequenceId));
+
+ //add any msgs that belong to out of order windows
+ addOutOfOrderInvokerBeansToList(sequenceId,
+ storageManager, invokerBeans);
+
+ Iterator stMapIt = invokerBeans.iterator();
//TODO correct the locking mechanism to have one lock per sequence.
- if (stMapIt.hasNext()) { //the next Msg entry is present.
+ if (stMapIt.hasNext()) { //some invokation work is present
- String workId = sequenceId + "::" + nextMsgno; //creating a workId to uniquely identify the
+ InvokerBean bean = (InvokerBean) stMapIt.next();
+ //see if this is an out of order msg
+ boolean beanIsOutOfOrderMsg = bean.getMsgNo()!=nextMsgno;
+
+ String workId = sequenceId + "::" + bean.getMsgNo();
+ //creating a workId to uniquely identify the
//piece of work that will be assigned to the Worker.
- //check weather the bean is already assigned to a worker.
+ //check whether the bean is already assigned to a worker.
if (lock.isWorkPresent(workId)) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
log.debug(message);
continue;
}
-
- InvokerBean bean = (InvokerBean) stMapIt.next();
+
String messageContextKey = bean.getMessageContextRefKey();
transaction.commit();
// start a new worker thread and let it do the invocation.
- InvokerWorker worker = new InvokerWorker(context,messageContextKey);
+ InvokerWorker worker = new InvokerWorker(context,
+ messageContextKey,
+ beanIsOutOfOrderMsg); //only ignore nextMsgNumber if the bean is an
+ //out of order message
worker.setLock(lock);
worker.setWorkId(workId);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java?view=diff&rev=477693&r1=477692&r2=477693
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java Tue Nov 21 07:13:48 2006
@@ -25,12 +25,14 @@
ConfigurationContext configurationContext = null;
String messageContextKey;
+ boolean ignoreNextMsg = false;
Log log = LogFactory.getLog(InvokerWorker.class);
- public InvokerWorker (ConfigurationContext configurationContext, String messageContextKey) {
+ public InvokerWorker (ConfigurationContext configurationContext, String messageContextKey, boolean ignoreNextMsg) {
this.configurationContext = configurationContext;
this.messageContextKey = messageContextKey;
+ this.ignoreNextMsg = ignoreNextMsg;
}
public void run() {
@@ -122,12 +124,6 @@
if (msgCtx != null) {
storageManager.removeMessageContext(messageContextKey);
}
-
- // updating the next msg to invoke
-
- String s = invokerBean.getSequenceID();
- NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
-
if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
Sequence sequence = (Sequence) rmMsg
@@ -143,26 +139,30 @@
}
}
- long nextMsgNo = nextMsgBean.getNextMsgNoToProcess();
-
- if (!(messageNo==nextMsgNo)) {
- String message = "Operated message number is different from the Next Message Number to invoke";
- throw new SandeshaException (message);
- }
-
- if (invoked) {
- nextMsgNo++;
- nextMsgBean.setNextMsgNoToProcess(nextMsgNo);
- nextMsgMgr.update(nextMsgBean);
+ if(!ignoreNextMsg){
+ // updating the next msg to invoke
+
+ String s = invokerBean.getSequenceID();
+ NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
+ long nextMsgNo = nextMsgBean.getNextMsgNoToProcess();
+
+ if (!(messageNo==nextMsgNo)) {
+ String message = "Operated message number is different from the Next Message Number to invoke";
+ throw new SandeshaException (message);
+ }
+
+ if (invoked) {
+ nextMsgNo++;
+ nextMsgBean.setNextMsgNoToProcess(nextMsgNo);
+ nextMsgMgr.update(nextMsgBean);
+ }
}
} catch (SandeshaStorageException e) {
transaction.rollback();
} catch (SandeshaException e) {
- e.printStackTrace(); //TODO remove
- log.error(e);
+ log.error(e.toString(), e);
} catch (Exception e) {
- e.printStackTrace();
- log.error(e);
+ log.error(e.toString(), e);
} finally {
if (transaction!=null && transaction.isActive())
transaction.commit();
Added: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java?view=auto&rev=477693
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java (added)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java Tue Nov 21 07:13:48 2006
@@ -0,0 +1,217 @@
+package org.apache.sandesha2.workers;
+
+import java.io.File;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ConfigurationContextFactory;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.transport.http.SimpleHTTPServer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.SandeshaTestCase;
+import org.apache.sandesha2.client.SandeshaClient;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.Invoker;
+
+public class ForceInboundDispatchTest extends SandeshaTestCase {
+
+ SimpleHTTPServer httpServer = null;
+ private final String applicationNamespaceName = "http://tempuri.org/";
+ private final String ping = "ping";
+ private final String Text = "Text";
+
+ private Log log = LogFactory.getLog(getClass());
+ int serverPort = DEFAULT_SERVER_TEST_PORT;
+
+ ConfigurationContext serverConfigCtx = null;
+
+ public ForceInboundDispatchTest () {
+ super ("ForceDispatchTest"); // name handed to the SandeshaTestCase constructor; NOTE(review): differs from the class name - confirm intended
+ }
+
+ public void setUp () throws AxisFault {
+
+ String repoPath = "target" + File.separator + "repos" + File.separator + "server";
+ String axis2_xml = "target" + File.separator + "repos" + File.separator + "server" + File.separator + "server_axis2.xml";
+
+ serverConfigCtx = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
+
+ String serverPortStr = getTestProperty("test.server.port");
+ if (serverPortStr!=null) {
+ try {
+ serverPort = Integer.parseInt(serverPortStr);
+ } catch (NumberFormatException e) {
+ log.error(e);
+ }
+ }
+
+ httpServer = new SimpleHTTPServer (serverConfigCtx,serverPort);
+ httpServer.start();
+ try {
+ Thread.sleep(300);
+ } catch (InterruptedException e) {
+ throw new SandeshaException ("sleep interupted");
+ }
+ }
+
+ public void tearDown () throws SandeshaException {
+ if (httpServer!=null) // still null if setUp never created the server
+ httpServer.stop();
+ try {
+ Thread.sleep(300); // brief pause after stopping the server
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before reporting the failure
+ throw new SandeshaException ("sleep interupted");
+ }
+ }
+
+ public void testForceInvoke () throws AxisFault,InterruptedException {
+ // Sends msgs 1 and 3, forces inbound dispatch with later delivery allowed, then checks the server expects msg 4 and lists msg 2 as an out-of-order gap.
+ String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
+ String transportTo = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
+
+ String repoPath = "target" + File.separator + "repos" + File.separator + "client";
+ String axis2_xml = "target" + File.separator + "repos" + File.separator + "client" + File.separator + "client_axis2.xml";
+
+ ConfigurationContext configContext = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
+
+ Options clientOptions = new Options ();
+ clientOptions.setSoapVersionURI(SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+
+ clientOptions.setTo(new EndpointReference (to));
+ clientOptions.setProperty(MessageContextConstants.TRANSPORT_URL,transportTo);
+
+ String sequenceKey = "sequence1";
+ clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+ ServiceClient serviceClient = new ServiceClient (configContext,null);
+ serviceClient.setOptions(clientOptions);
+
+ try{
+ serviceClient.fireAndForget(getPingOMBlock("ping1"));
+
+ //now deliver the next out of order
+ clientOptions.setProperty(SandeshaClientConstants.MESSAGE_NUMBER,new Long(3));
+ serviceClient.fireAndForget(getPingOMBlock("ping3"));
+
+ Thread.sleep(5000);
+
+ String inboundSequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(SandeshaUtil.getInternalSequenceID(to, sequenceKey),
+ SandeshaUtil.getInMemoryStorageManager(configContext));
+
+ SandeshaClient.forceDispatchOfInboundMessages(serverConfigCtx,
+ inboundSequenceID,
+ true); //allow later msgs to be delivered
+
+ //check that the server is now expecting msg 4
+ StorageManager serverStore = SandeshaUtil.getInMemoryStorageManager(serverConfigCtx);
+ NextMsgBean nextMsgBean =
+ serverStore.getNextMsgBeanMgr().retrieve(inboundSequenceID);
+ assertNotNull(nextMsgBean);
+ assertEquals(4, nextMsgBean.getNextMsgNoToProcess()); // expected value first, per the JUnit convention
+
+ //also check that the sequence has an out of order gap that contains msg 2
+ SequencePropertyBean outOfOrderRanges =
+ serverStore.getSequencePropertyBeanMgr().retrieve(
+ inboundSequenceID,
+ Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES);
+
+ assertNotNull(outOfOrderRanges);
+ RangeString rangeString = new RangeString(outOfOrderRanges.getValue());
+ assertTrue(rangeString.isMessageNumberInRanges(2));
+
+ //we deliver msg 2
+ //set highest out msg number to 1
+ SequencePropertyBean nextMsgNoBean =
+ SandeshaUtil.getInMemoryStorageManager(configContext).getSequencePropertyBeanMgr().
+ retrieve(SandeshaUtil.getInternalSequenceID(to, sequenceKey),
+ Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
+ assertNotNull(nextMsgNoBean); // fail with an assertion rather than an NPE if the property is missing
+ nextMsgNoBean.setValue("1");
+ clientOptions.setProperty(SandeshaClientConstants.MESSAGE_NUMBER,new Long(2));
+ serviceClient.fireAndForget(getPingOMBlock("ping2"));
+ }
+ finally{
+ configContext.getListenerManager().stop();
+ serviceClient.cleanup();
+ }
+
+ }
+
+ public void testForceInvokeWithDiscardGaps () throws AxisFault,InterruptedException {
+ // Sends msgs 1 and 3, forces inbound dispatch with the third argument false, then checks the server expects msg 4.
+ String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
+ String transportTo = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
+
+ String repoPath = "target" + File.separator + "repos" + File.separator + "client";
+ String axis2_xml = "target" + File.separator + "repos" + File.separator + "client" + File.separator + "client_axis2.xml";
+
+ ConfigurationContext configContext = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
+
+ Options clientOptions = new Options ();
+ clientOptions.setSoapVersionURI(SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+
+ clientOptions.setTo(new EndpointReference (to));
+ clientOptions.setProperty(MessageContextConstants.TRANSPORT_URL,transportTo);
+
+ String sequenceKey = "sequence1";
+ clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+ ServiceClient serviceClient = new ServiceClient (configContext,null);
+ serviceClient.setOptions(clientOptions);
+ try
+ {
+ serviceClient.fireAndForget(getPingOMBlock("ping1"));
+
+ //now deliver the next out of order
+ clientOptions.setProperty(SandeshaClientConstants.MESSAGE_NUMBER,new Long(3));
+ serviceClient.fireAndForget(getPingOMBlock("ping3"));
+
+ Thread.sleep(5000);
+
+ String inboundSequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(SandeshaUtil.getInternalSequenceID(to, sequenceKey),
+ SandeshaUtil.getInMemoryStorageManager(configContext));
+
+ SandeshaClient.forceDispatchOfInboundMessages(serverConfigCtx, inboundSequenceID, false);
+
+ //check that the server is now expecting msg 4
+ NextMsgBean nextMsgBean =
+ SandeshaUtil.getInMemoryStorageManager(serverConfigCtx).getNextMsgBeanMgr().
+ retrieve(inboundSequenceID);
+ assertNotNull(nextMsgBean);
+ assertEquals(4, nextMsgBean.getNextMsgNoToProcess()); // expected value first, per the JUnit convention
+ }
+ finally{
+ configContext.getListenerManager().stop();
+ serviceClient.cleanup();
+ }
+
+ }
+
+ private OMElement getPingOMBlock(String text) {
+ // Builds a "ping" element wrapping a "Text" child that carries the given text, both in the "ns1" test namespace.
+ OMFactory factory = OMAbstractFactory.getOMFactory();
+ OMNamespace ns = factory.createOMNamespace(applicationNamespaceName,"ns1");
+ OMElement payload = factory.createOMElement(ping, ns);
+ OMElement body = factory.createOMElement(Text, ns);
+ body.setText(text);
+ payload.addChild(body);
+
+ return payload;
+ }
+
+}
Propchange: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org