You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2007/02/06 13:57:04 UTC

svn commit: r504115 - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2: msgprocessors/ polling/ storage/ storage/inmemory/ util/ workers/

Author: mlovett
Date: Tue Feb  6 04:57:01 2007
New Revision: 504115

URL: http://svn.apache.org/viewvc?view=rev&rev=504115
Log:
Make the PollingManager more similar to the other worker threads

Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Tue Feb  6 04:57:01 2007
@@ -34,7 +34,6 @@
 import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.engine.Handler.InvocationResponse;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Tue Feb  6 04:57:01 2007
@@ -37,6 +37,7 @@
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.polling.PollingManager;
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
@@ -214,9 +215,10 @@
 			RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
 			RMDBean rMDBean = rMDBeanMgr.retrieve(outSequenceId);
 			
-			if (rMDBean!=null && rMDBean.isPollingMode())
-				SandeshaUtil.shedulePollingRequest(rmsBean.getOfferedSequence(), configCtx);
-			
+			if (rMDBean!=null && rMDBean.isPollingMode()) {
+				PollingManager manager = storageManager.getPollingManager();
+				manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+			}
 		}
 
 		// We overwrite the previous client completed message ranges with the

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Tue Feb  6 04:57:01 2007
@@ -154,7 +154,7 @@
 					rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID()); 
 						
 					rMSBean.setToEPR(rmdBean.getToEPR());
-					rMSBean.setAcksToEPR(rmdBean.getAcksToEPR());
+					rMSBean.setAcksToEPR(rmdBean.getToEPR());  // The acks need to flow back into this endpoint
 					rMSBean.setReplyToEPR(rmdBean.getReplyToEPR());
 					rMSBean.setLastActivatedTime(System.currentTimeMillis());
 					rMSBean.setRMVersion(rmdBean.getRMVersion());
@@ -175,7 +175,22 @@
 					// Store the inbound token (if any) with the new sequence
 					rMSBean.setSecurityTokenData(rmdBean.getSecurityTokenData());
 					
+					// If this new sequence has anonymous acksTo, then we must poll for the acks
+					// If the inbound sequence is targetted at the WSRM anonymous URI, we need to start
+					// polling for this sequence.
+					String acksTo = rMSBean.getAcksToEPR();
+					EndpointReference reference = new EndpointReference(acksTo);
+					if ((acksTo == null || reference.hasAnonymousAddress()) &&
+						Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqRMMsg.getRMSpecVersion())) {
+						rMSBean.setPollingMode(true);
+					}
+
 					rmsBeanMgr.insert(rMSBean);
+					
+					if(rMSBean.isPollingMode()) {
+						SandeshaUtil.startPollingForTheSequence(context, rmdBean.getSequenceID(), false);
+					}
+					
 				} else {
 					// removing the accept part.
 					createSeqResPart.setAccept(null);
@@ -188,8 +203,22 @@
 			outMessage.setResponseWritten(true);
 	
 			rmdBean.setLastActivatedTime(System.currentTimeMillis());
+			
+			// If the inbound sequence is targetted at the anonymous URI, we need to start
+			// polling for this sequence.
+			EndpointReference toEPR = createSeqMsg.getTo();
+			if (toEPR.hasAnonymousAddress()) {
+				if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqRMMsg.getRMSpecVersion())) {
+					rmdBean.setPollingMode(true);
+				}
+			}
+			
 			storageManager.getRMDBeanMgr().update(rmdBean);
 	
+			if(rmdBean.isPollingMode()) {
+				SandeshaUtil.startPollingForTheSequence(context, rmdBean.getSequenceID(), false);
+			}
+
 			AxisEngine engine = new AxisEngine(context);
 			try{
 				engine.send(outMessage);				
@@ -204,27 +233,22 @@
 				return false;
 			}
 	
-			boolean anon = true;
-			if (rmdBean.getToEPR() != null) {
-				EndpointReference toEPR = new EndpointReference(rmdBean.getToEPR());
-				if (!toEPR.hasAnonymousAddress()) anon = false;
-			}
-			if(anon) {
+			EndpointReference replyTo = createSeqMsg.getReplyTo();
+			if(replyTo == null || replyTo.hasAnonymousAddress()) {
 				createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
 			} else {
-					createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+				createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
 			}
 			
 	//		SequencePropertyBean findBean = new SequencePropertyBean ();
 	//		findBean.setName (Sandesha2Constants.SequenceProperties.TERMINATE_ON_CREATE_SEQUENCE);
 	//		findBean.setValue(createSeqMsg.getTo().getAddress());
 			
-			String toAddress = createSeqMsg.getTo().getAddress();
 			//if toAddress is RMAnon we may need to terminate the request side sequence here.
-			if (toAddress!=null && SandeshaUtil.isWSRMAnonymous(toAddress)) {
+			if (toEPR.hasAnonymousAddress()) {
 	
 				RMSBean findBean = new RMSBean ();
-				findBean.setReplyToEPR(toAddress);
+				findBean.setReplyToEPR(toEPR.getAddress());
 				findBean.setTerminationPauserForCS(true);
 				
 				//TODO recheck

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Tue Feb  6 04:57:01 2007
@@ -133,10 +133,10 @@
 		// the create must include an offer (or this client cannot be identified). If the reply-to
 		// is the RM anon URI template then the offer is not required.
 		if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
-			String replyToAddress = rmsBean.getReplyToEPR();
-			if(SandeshaUtil.isWSRMAnonymous(replyToAddress)) {
+			String acksTo = rmsBean.getAcksToEPR();
+			EndpointReference reference = new EndpointReference(acksTo);
+			if(acksTo == null || reference.hasAnonymousAddress()) {
 				rmsBean.setPollingMode(true);
-				SandeshaUtil.startPollingManager(configCtx);
 			}
 		}
 
@@ -181,11 +181,8 @@
 			// rmdBean for polling too, so that it still gets serviced after the outbound
 			// sequence terminates.
 			if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
-				String replyToAddress = rmsBean.getReplyToEPR();
-				EndpointReference ref = new EndpointReference(replyToAddress);
-				if(rmsBean.isPollingMode() && (replyToAddress == null || ref.hasAnonymousAddress())) {
+				if(rmsBean.isPollingMode()) {
 					rMDBean.setPollingMode(true);
-					SandeshaUtil.startPollingManager(configCtx);
 				}
 			}
 			
@@ -213,10 +210,18 @@
 			rMDBean.setSecurityTokenData(rmsBean.getSecurityTokenData());
 			
 			rmdBeanMgr.insert(rMDBean);
+			
+			if(rMDBean.isPollingMode()) {
+				SandeshaUtil.startPollingForTheSequence(configCtx, rMDBean.getSequenceID(), false);
+			}
 		}
 		
 		rmsBean.setLastActivatedTime(System.currentTimeMillis());
 		rmsBeanMgr.update(rmsBean);
+		
+		if(rmsBean.isPollingMode()) {
+			SandeshaUtil.startPollingForTheSequence(configCtx, rmsBean.getSequenceID(), true);
+		}
 
 		// Locate and update all of the messages for this sequence, now that we know
 		// the sequence id.

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java Tue Feb  6 04:57:01 2007
@@ -1,12 +1,14 @@
 package org.apache.sandesha2.msgprocessors;
 
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.polling.PollingManager;
+import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.wsrm.MessagePending;
@@ -33,10 +35,10 @@
 			if (messagePending!=null) {
 				boolean pending = messagePending.isPending();
 				if (pending) {
-					PollingManager pollingManager = SandeshaUtil.getPollingManager(message.getConfigurationContext());
-					if (pollingManager!=null) {
-						pollingManager.shedulePollingRequest(sequenceId);
-					}
+					ConfigurationContext context = rmMsgContext.getConfigurationContext();
+					StorageManager storage = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+					PollingManager pollingManager = storage.getPollingManager();
+					pollingManager.schedulePollingRequest(sequenceId, false);
 				}
 			}
 		}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Tue Feb  6 04:57:01 2007
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.polling.PollingManager;
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
@@ -67,16 +68,15 @@
 
 		msgContext.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,rmsBean.getInternalSequenceID());
 
-		ConfigurationContext configContext = msgContext.getConfigurationContext();
-
 		//shedulling a polling request for the response side.
-		
 		if (rmsBean.getOfferedSequence()!=null) {
 			RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
 			RMDBean rMDBean = rMDBeanMgr.retrieve(sequenceId);
 			
-			if (rMDBean!=null && rMDBean.isPollingMode())
-				SandeshaUtil.shedulePollingRequest(rmsBean.getOfferedSequence(), configContext);
+			if (rMDBean!=null && rMDBean.isPollingMode()) {
+				PollingManager manager = storageManager.getPollingManager();
+				manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+			}
 		}
 
 		TerminateManager.terminateSendingSide (rmsBean, msgContext.isServerSide(), storageManager);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Tue Feb  6 04:57:01 2007
@@ -17,12 +17,11 @@
 
 package org.apache.sandesha2.polling;
 
-import java.util.HashMap;
-import java.util.List;
+import java.util.ArrayList;
+import java.util.LinkedList;
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,7 +29,6 @@
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.SandeshaStorageException;
-import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
@@ -42,118 +40,116 @@
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.SandeshaThread;
 
 /**
  * This class is responsible for sending MakeConnection requests. This is a seperate thread that
  * keeps running. Will do MakeConnection based on the request queue or randomly.
  */
-public class PollingManager extends Thread {
+public class PollingManager extends SandeshaThread {
 	private static final Log log = LogFactory.getLog(PollingManager.class);
 
-	private ConfigurationContext configurationContext = null;
-	private StorageManager storageManager = null;
-	private boolean poll = false;
-
 	// Variables used to help round-robin across the sequences that we can poll for 
-	private int rmsIndex = 0;
-	private int rmdIndex = 0;
+	private int nextIndex = 0;
 
 	/**
 	 * By adding an entry to this, the PollingManager will be asked to do a polling request on this sequence.
 	 */
-	private HashMap sheduledPollingRequests = null;
+	private LinkedList scheduledPollingRequests = new LinkedList();
 	
-	private final int POLLING_MANAGER_WAIT_TIME = 3000;
+	private static final int POLLING_MANAGER_WAIT_TIME = 3000;
 	
-	private void internalRun() {
-		while (isPoll()) {
-			Transaction t = null;
-			try {
-				t = storageManager.getTransaction();
-				pollRMDSide();
-				if(t != null) t.commit();
-				t = null;
-
-				t = storageManager.getTransaction();
-				pollRMSSide();
-				if(t != null) t.commit();
-				t = null;
-			} catch (Exception e) {
-				if(log.isDebugEnabled()) log.debug("Exception", e);
-				if(t != null) {
-					try {
-						t.rollback();
-					} catch(Exception e2) {
-						if(log.isDebugEnabled()) log.debug("Exception during rollback", e);
-					}
-				}
-			}
-			try {
-				Thread.sleep(POLLING_MANAGER_WAIT_TIME);
-			} catch (InterruptedException e) {
-				if(log.isDebugEnabled()) log.debug("Sleep was interrupted", e);
-			}
-		}
+	public PollingManager() {
+		super(POLLING_MANAGER_WAIT_TIME);
 	}
 	
-	public void run() {
+	protected boolean internalRun() {
+		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::internalRun");
+		Transaction t = null;
 		try {
-			internalRun();
-		} catch(Exception e) {
-			if(log.isDebugEnabled()) log.debug("PollingManager thread ending", e);
+			// If we have request scheduled, handle them first, and then pick
+			// pick a sequence using a round-robin approach.
+			SequenceEntry entry = null;
+			synchronized (this) {
+				if(!scheduledPollingRequests.isEmpty()) {
+					entry = (SequenceEntry) scheduledPollingRequests.removeFirst();
+				}
+			}
+			if(entry == null) {
+				ArrayList allSequencesList = getSequences();
+				int size = allSequencesList.size();
+				if(log.isDebugEnabled()) log.debug("Choosing one from " + size + " sequences");
+				if(nextIndex >= size) {
+					nextIndex = 0;
+					// We just looped over the set of sequences, so sleep before we try
+					// polling them again.
+					if (log.isDebugEnabled()) log.debug("Exit: PollingManager::internalRun, looped over all sequences, sleeping");
+					return true;
+				}
+	
+				entry = (SequenceEntry) allSequencesList.get(nextIndex++);
+			}
+			if(log.isDebugEnabled()) log.debug("Chose sequence " + entry.getSequenceId());
+
+			t = storageManager.getTransaction();
+			if(entry.isRmSource()) {
+				pollRMSSide(entry);
+			} else {
+				pollRMDSide(entry);
+			}
+			if(t != null) t.commit();
+			t = null;
+
+		} catch (Exception e) {
+			if(log.isDebugEnabled()) log.debug("Exception", e);
+			if(t != null) {
+				try {
+					t.rollback();
+				} catch(Exception e2) {
+					if(log.isDebugEnabled()) log.debug("Exception during rollback", e);
+				}
+			}
 		}
+		
+		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::internalRun, not sleeping");
+		return false;
 	}
 	
-	private void pollRMSSide() throws AxisFault {
+	private void pollRMSSide(SequenceEntry entry) throws AxisFault {
 		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMSSide");
 		
 		RMSBeanMgr rmsBeanManager = storageManager.getRMSBeanMgr();
 		RMSBean findRMS = new RMSBean();
+		findRMS.setSequenceID(entry.getSequenceId());
 		findRMS.setPollingMode(true);
 		findRMS.setTerminated(false);
-		List results = rmsBeanManager.find(findRMS);
+		RMSBean beanToPoll = rmsBeanManager.findUnique(findRMS);
 		
-		int size = results.size();
-		log.debug("Choosing one from " + size + " RMS sequences");
-		if(rmsIndex >= size) {
-			rmsIndex = 0;
-			if (size == 0) {
-				if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMSSide, nothing to poll");
-				return;
-			}
+		if(beanToPoll == null) {
+			// This sequence must have been terminated, or deleted
+			stopThreadForSequence(entry.getSequenceId(), true);
+		} else {
+			pollForSequence(beanToPoll.getSequenceID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll);
 		}
-		RMSBean beanToPoll = (RMSBean) results.get(rmsIndex++);
-		pollForSequence(beanToPoll.getSequenceID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll);
 
 		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMSSide");
 	}
 
-	private void pollRMDSide() throws AxisFault {
+	private void pollRMDSide(SequenceEntry entry) throws AxisFault {
 		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMDSide");
-		//geting the sequences to be polled.
-		//if shedule contains any requests, do the earliest one.
-		//else pick one randomly.
 		RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
-		String sequenceId = getNextSheduleEntry ();
-
 		RMDBean findBean = new RMDBean();
 		findBean.setPollingMode(true);
 		findBean.setTerminated(false);
-		findBean.setSequenceID(sequenceId); // Note that this may be null
-		List results = nextMsgMgr.find(findBean);
-		
-		int size = results.size();
+		findBean.setSequenceID(entry.getSequenceId());
+		RMDBean nextMsgBean = nextMsgMgr.findUnique(findBean);
 		
-		log.debug("Choosing one from " + size + " RMD sequences");
-		if(rmdIndex >= size) {
-			rmdIndex = 0;
-			if (size == 0) {
-				if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMDSide, nothing to poll");
-				return;
-			}
+		if(nextMsgBean == null) {
+			// This sequence must have been terminated, or deleted
+			stopThreadForSequence(entry.getSequenceId(), false);
+		} else {
+			pollForSequence(nextMsgBean.getSequenceID(), nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean);
 		}
-		RMDBean nextMsgBean = (RMDBean) results.get(rmdIndex++);
-		pollForSequence(nextMsgBean.getSequenceID(), nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean);
 
 		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMDSide");
 	}
@@ -176,7 +172,7 @@
 			WSRMAnonReplyToURI = replyTo;
 		}
 		
-		MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,configurationContext);
+		MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,context);
 		RMMsgContext referenceRMMessage = MsgInitializer.initializeMessage(referenceMessage);
 		RMMsgContext makeConnectionRMMessage = RMMsgCreator.createMakeConnectionMessage(referenceRMMessage,
 				rmBean, sequenceId, WSRMAnonReplyToURI, storageManager);
@@ -213,84 +209,18 @@
 		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollForSequence");
 	}
 	
-	private synchronized String getNextSheduleEntry () {
-		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::getNextSheduleEntry");
-		String sequenceId = null;
-		
-		if (sheduledPollingRequests.size()>0) {
-			sequenceId = (String) sheduledPollingRequests.keySet().iterator().next();
-			Integer sequencEntryCount = (Integer) sheduledPollingRequests.remove(sequenceId);
-			
-			Integer leftCount = new Integer (sequencEntryCount.intValue() -1 );
-			if (leftCount.intValue() > 0) 
-				sheduledPollingRequests.put(sequenceId, leftCount);
-		}
-		
-		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::getNextSheduleEntry, " + sequenceId);
-		return sequenceId;
-	}
-	
-	/**
-	 * Starts the PollingManager.
-	 * 
-	 * @param configurationContext
-	 * @throws SandeshaException
-	 */
-	public synchronized void start (ConfigurationContext configurationContext) throws SandeshaException {
-		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::start");
-
-		this.configurationContext = configurationContext;
-		this.sheduledPollingRequests = new HashMap ();
-		this.storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
-		setPoll(true);
-		super.start();
-		
-		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::start");
-	}
-	
-	/**
-	 * Asks the PollingManager to stop its work.
-	 *
-	 */
-	public synchronized void stopPolling () {
-		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::stopPolling");
-		setPoll(false);
-		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::stopPolling");
-	}
-	
-	public synchronized void setPoll (boolean poll) {
-		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::setPoll");
-		this.poll = poll;
-		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::setPoll");
-	}
-	
-	public synchronized boolean isPoll () {
-		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::isPoll");
-		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::isPoll");
-		return poll;
-	}
-	
-	public void start () {
-		throw new UnsupportedOperationException ("You must use the oveerloaded start method");
-	}
-	
 	/**
 	 * Asking the polling manager to do a polling request on the sequence identified by the
 	 * given InternalSequenceId.
 	 * 
 	 * @param sequenceId
 	 */
-	public synchronized void shedulePollingRequest (String sequenceId) {
+	public synchronized void schedulePollingRequest(String sequenceId, boolean rmSource) {
 		if(log.isDebugEnabled()) log.debug("Enter: PollingManager::shedulePollingRequest, " + sequenceId);
 		
-		if (sheduledPollingRequests.containsKey (sequenceId)) {
-			Integer sequenceEntryCount = (Integer) sheduledPollingRequests.get(sequenceId);
-			Integer newCount = new Integer (sequenceEntryCount.intValue()+1);
-			sheduledPollingRequests.put(sequenceId,newCount);
-		} else {
-			Integer sequenceEntryCount = new Integer (1);
-			sheduledPollingRequests.put(sequenceId, sequenceEntryCount);
-		}
+		SequenceEntry entry = new SequenceEntry(sequenceId, rmSource);
+		scheduledPollingRequests.add(entry);
+		this.wakeThread();
 		
 		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::shedulePollingRequest");
 	}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java Tue Feb  6 04:57:01 2007
@@ -20,6 +20,7 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.AxisModule;
+import org.apache.sandesha2.polling.PollingManager;
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
@@ -52,6 +53,7 @@
 		//shutdown the running threads
 		getSender().stopRunning();
 		getInvoker().stopRunning();
+		getPollingManager().stopRunning();
 	}
 	
 	public abstract void initStorage (AxisModule moduleDesc) throws SandeshaStorageException;
@@ -61,6 +63,8 @@
 	public abstract SandeshaThread getSender();
 	
 	public abstract SandeshaThread getInvoker();
+	
+	public abstract PollingManager getPollingManager();
 
 	public abstract RMSBeanMgr getRMSBeanMgr();
 

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Tue Feb  6 04:57:01 2007
@@ -38,6 +38,7 @@
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.polling.PollingManager;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
@@ -63,6 +64,7 @@
     private InvokerBeanMgr invokerBeanMgr = null;
     private Sender sender = null;
     private Invoker invoker = null;
+    private PollingManager pollingManager = null;
     private HashMap transactions = new HashMap();
     private boolean useSerialization = false;
     
@@ -80,7 +82,7 @@
 		this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, context);
 		this.sender = new Sender();
 		this.invoker = new Invoker();
-		
+		this.pollingManager = new PollingManager();
 	}
 
 	public Transaction getTransaction() {
@@ -128,14 +130,21 @@
 	 */
 	public SandeshaThread getInvoker() {
 	  return invoker;
-  }
+	}
 
 	/** 
 	 * Gets the Sender for this Storage manager
 	 */
 	public SandeshaThread getSender() {
 	  return sender;
-  }
+	}
+	
+	/** 
+	 * Gets the PollingManager for this Storage manager
+	 */
+	public PollingManager getPollingManager() {
+		return pollingManager;
+	}
 
 	void enlistBean(RMBean bean) throws SandeshaStorageException {
 		InMemoryTransaction t = null;

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Tue Feb  6 04:57:01 2007
@@ -83,12 +83,13 @@
 						set.add(this);
 						while(other != null) {
 							if(set.contains(other)) {
+								String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.deadlock, this.toString(), bean.toString());
+								SandeshaStorageException e = new SandeshaStorageException(message);
+								
 								// Do our best to get out of the way of the other work in the system
 								waitingForTran = null;
 								releaseLocks();
-
-								String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.deadlock, this.toString(), bean.toString());
-								SandeshaStorageException e = new SandeshaStorageException(message);
+								
 								if(log.isDebugEnabled()) log.debug(message, e);
 								throw e;
 							}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Tue Feb  6 04:57:01 2007
@@ -71,7 +71,6 @@
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.policy.SandeshaPolicyBean;
-import org.apache.sandesha2.polling.PollingManager;
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
@@ -148,7 +147,7 @@
 			log.debug("Enter: SandeshaUtil::startSenderForTheSequence , context " + context + ", sequenceID " + sequenceID);
 		
 		SandeshaThread sender = getSandeshaStorageManager(context, context.getAxisConfiguration()).getSender();		
-		sender.runThreadForSequence(context, sequenceID);
+		sender.runThreadForSequence(context, sequenceID, true);
 		
 		if (log.isDebugEnabled())
 			log.debug("Exit: SandeshaUtil::startSenderForTheSequence");
@@ -159,13 +158,16 @@
 			log.debug("Enter: SandeshaUtil::startInvokerForTheSequence , context " + context + ", sequenceID " + sequenceID);
 		
 		SandeshaThread invoker = getSandeshaStorageManager(context, context.getAxisConfiguration()).getInvoker();
-		invoker.runThreadForSequence(context,sequenceID);
+		invoker.runThreadForSequence(context, sequenceID, false);
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: SandeshaUtil::startInvokerForTheSequence");			
 	}
 
-	public static void startPollingManager (ConfigurationContext configurationContext) throws SandeshaException {
+	public static void startPollingForTheSequence(ConfigurationContext configurationContext, String sequenceID, boolean rmSource) throws SandeshaException {
+		if (log.isDebugEnabled())
+			log.debug("Enter: SandeshaUtil::startPollingForTheSequence , context " + configurationContext + ", sequenceID " + sequenceID + ", rmSource");
+
 		// Only start the polling manager if we are configured to use MakeConnection
 		SandeshaPolicyBean policy = getPropertyBean(configurationContext.getAxisConfiguration());
 		if(!policy.isEnableMakeConnection()) {
@@ -173,22 +175,11 @@
 			throw new SandeshaException(message);
 		}
 		
-		PollingManager pollingManager = (PollingManager) configurationContext.getProperty(
-				Sandesha2Constants.POLLING_MANAGER);
-		
-		//assums that if somebody hs set the PollingManager, he must hv already started it.
-		if (pollingManager==null) {
-			pollingManager = new PollingManager ();
-			configurationContext.setProperty(Sandesha2Constants.POLLING_MANAGER,pollingManager);
-			pollingManager.start(configurationContext);
-		}
-	}
-	
-	public static void stopPollingManager (ConfigurationContext configurationContext) {
-		PollingManager pollingManager = (PollingManager) configurationContext.getProperty(
-				Sandesha2Constants.POLLING_MANAGER);
-		if (pollingManager!=null) 
-			pollingManager.stopPolling ();
+		SandeshaThread polling = getSandeshaStorageManager(configurationContext, configurationContext.getAxisConfiguration()).getPollingManager();
+		polling.runThreadForSequence(configurationContext, sequenceID, rmSource);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: SandeshaUtil::startPollingForTheSequence");			
 	}
 	
 	public static String getMessageTypeString(int messageType) {
@@ -1079,18 +1070,6 @@
 		
 		return newMsg;
 		
-	}
-  
-	public static PollingManager getPollingManager (ConfigurationContext configurationContext) {
-		PollingManager pollingManager = (PollingManager) configurationContext.getProperty(
-				Sandesha2Constants.POLLING_MANAGER);
-		
-		return pollingManager;
-	}
-	
-	public static void shedulePollingRequest (String sequenceId, ConfigurationContext configurationContext) { 
-		PollingManager pollingManager = getPollingManager(configurationContext);
-		pollingManager.shedulePollingRequest(sequenceId);
 	}
 
 	public static EndpointReference cloneEPR (EndpointReference epr) {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java Tue Feb  6 04:57:01 2007
@@ -47,7 +47,12 @@
 public class Invoker extends SandeshaThread {
 
 	private static final Log log = LogFactory.getLog(Invoker.class);
-	
+
+	// If this invoker is working for several sequences, we use round-robin to
+	// try and give them all a chance to invoke messages.
+	int nextIndex = 0;
+	boolean processedMessage = false;
+
 	public Invoker() {
 		super(Sandesha2Constants.INVOKER_SLEEP_TIME);
 	}
@@ -69,9 +74,6 @@
 		blockForPause();
 		try{
 			//get all invoker beans for the sequence
-			StorageManager storageManager = 
-				SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
-	
 			InvokerBeanMgr storageMapMgr = storageManager
 					.getInvokerBeanMgr();
 			RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
@@ -201,185 +203,161 @@
 			log.debug("Exit: InOrderInvoker::addOutOfOrderInvokerBeansToList");
 	}
 	
-	protected void internalRun() {
-		if (log.isDebugEnabled())
-			log.debug("Enter: InOrderInvoker::internalRun");
+	protected boolean internalRun() {
+		if (log.isDebugEnabled()) log.debug("Enter: Invoker::internalRun");
 		
-		// If this invoker is working for several sequences, we use round-robin to
-		// try and give them all a chance to invoke messages.
-		int nextIndex = 0;
 		boolean sleep = false;
-		boolean processedMessage = false;
+		Transaction transaction = null;
+
+		try {
+			RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
 
-		while (isThreadStarted()) {
+			InvokerBeanMgr storageMapMgr = storageManager
+					.getInvokerBeanMgr();
 
-			try {
-				if(sleep && !runMainLoop()) Thread.sleep(Sandesha2Constants.INVOKER_SLEEP_TIME);
-				if (!isThreadStarted())
-					continue;
-				// Indicate that we are running the main loop
-				setRanMainLoop();
-			} catch (InterruptedException ex) {
-				log.debug("Invoker was Interrupted.", ex);
-			} finally {
-				sleep = false;
+			transaction = storageManager.getTransaction();
+			
+			// Pick a sequence using a round-robin approach
+			ArrayList allSequencesList = getSequences();
+			int size = allSequencesList.size();
+			log.debug("Choosing one from " + size + " sequences");
+			if(nextIndex >= size) {
+				nextIndex = 0;
+
+				// We just looped over the set of sequences. If we didn't process any
+				// messages on this loop then we sleep before the next one
+				if(size == 0 || !processedMessage) {
+					sleep = true;
+				}
+				processedMessage = false;
+				
+				if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, looped over all sequences, sleep " + sleep);
+				return sleep;
 			}
 
-			//pause if we have to
-			doPauseIfNeeded();
+			SequenceEntry entry = (SequenceEntry) allSequencesList.get(nextIndex++);
+			String sequenceId = entry.getSequenceId();
+			log.debug("Chose sequence " + sequenceId);
+
+			RMDBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
+			if (nextMsgBean == null) {
+				log.debug("Next message not set correctly. Removing invalid entry.");
+
+				stopThreadForSequence(sequenceId, entry.isRmSource());
+				allSequencesList = getSequences();
+				if (allSequencesList.size() == 0)
+					sleep = true;
 
-			Transaction transaction = null;
+				if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, sleep " + sleep);
+				return sleep;
+			}
 
-			try {
-				StorageManager storageManager = SandeshaUtil
-						.getSandeshaStorageManager(context, context
-								.getAxisConfiguration());
-				RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
-
-				InvokerBeanMgr storageMapMgr = storageManager
-						.getInvokerBeanMgr();
-
-				transaction = storageManager.getTransaction();
-				
-				// Pick a sequence using a round-robin approach
-				ArrayList allSequencesList = getSequences();
-				int size = allSequencesList.size();
-				log.debug("Choosing one from " + size + " sequences");
-				if(nextIndex >= size) {
-					nextIndex = 0;
-
-					// We just looped over the set of sequences. If we didn't process any
-					// messages on this loop then we sleep before the next one
-					if(size == 0 || !processedMessage) {
-						sleep = true;
-					}
-					processedMessage = false;
-					continue;
-				}
-				String sequenceId = (String) allSequencesList.get(nextIndex++);
-				log.debug("Chose sequence " + sequenceId);
+			long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
+			if (nextMsgno <= 0) {
+				// Make sure we sleep on the next loop, so that we don't spin in a tight loop
+				sleep = true;
+				if (log.isDebugEnabled())
+					log.debug("Invalid Next Message Number " + nextMsgno);
+				String message = SandeshaMessageHelper.getMessage(
+						SandeshaMessageKeys.invalidMsgNumber, Long
+								.toString(nextMsgno));
+				throw new SandeshaException(message);
+			}
 
-				RMDBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
-				if (nextMsgBean == null) {
-					String message = "Next message not set correctly. Removing invalid entry.";
-					log.debug(message);
-	
-					stopThreadForSequence(sequenceId);
-					allSequencesList = getSequences();
-					if (allSequencesList.size() == 0)
-						sleep = true;
-					continue;
-				}
+			InvokerBean selector = new InvokerBean();
+			selector.setSequenceID(sequenceId);
+			selector.setMsgNo(nextMsgno);
+			List invokerBeans = storageMapMgr.find(selector);
+			
+			//add any msgs that belong to out of order windows
+			addOutOfOrderInvokerBeansToList(sequenceId, 
+					storageManager, invokerBeans);
+			
+			// If there aren't any beans to process then move on to the next sequence
+			if (invokerBeans.size() == 0) {
+				if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, no beans to invoke on sequence " + sequenceId + ", sleep " + sleep);
+				return sleep;
+			}
+			
+			Iterator stMapIt = invokerBeans.iterator();
 
-				long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
-				if (nextMsgno <= 0) {
-					// Make sure we sleep on the next loop, so that we don't spin in a tight loop
+			//TODO correct the locking mechanism to have one lock per sequence.
+			//TODO should this be a while, not an if?
+			if (stMapIt.hasNext()) { //some invokation work is present
+				
+				InvokerBean bean = (InvokerBean) stMapIt.next();
+				//see if this is an out of order msg
+				boolean beanIsOutOfOrderMsg = bean.getMsgNo()!=nextMsgno;
+				
+				String workId = sequenceId + "::" + bean.getMsgNo(); 
+																	//creating a workId to uniquely identify the
+															   //piece of work that will be assigned to the Worker.
+									
+				//check whether the bean is already assigned to a worker.
+				if (getWorkerLock().isWorkPresent(workId)) {
+					// As there is already a worker assigned we are probably dispatching
+					// messages too quickly, so we sleep before trying the next sequence.
 					sleep = true;
-					if (log.isDebugEnabled())
-						log.debug("Invalid Next Message Number " + nextMsgno);
-					String message = SandeshaMessageHelper.getMessage(
-							SandeshaMessageKeys.invalidMsgNumber, Long
-									.toString(nextMsgno));
-					throw new SandeshaException(message);
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
+					if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, " + message + ", sleep " + sleep);
+					return sleep;
 				}
 
-				InvokerBean selector = new InvokerBean();
-				selector.setSequenceID(sequenceId);
-				selector.setMsgNo(nextMsgno);
-				List invokerBeans = storageMapMgr.find(selector);
-				
-				//add any msgs that belong to out of order windows
-				addOutOfOrderInvokerBeansToList(sequenceId, 
-						storageManager, invokerBeans);
-				
-				// If there aren't any beans to process then move on to the next sequence
-				if (invokerBeans.size() == 0) {
-					if (log.isDebugEnabled())
-						log.debug("No beans to invoke on sequence " + sequenceId);
-					continue;
-				}
+				String messageContextKey = bean.getMessageContextRefKey();
 				
-				Iterator stMapIt = invokerBeans.iterator();
+				if(transaction != null) {
+					transaction.commit();
+					transaction = null;
+				}
 
-				//TODO correct the locking mechanism to have one lock per sequence.
+				// start a new worker thread and let it do the invocation.
+				InvokerWorker worker = new InvokerWorker(context,
+						messageContextKey, 
+						beanIsOutOfOrderMsg); //only ignore nextMsgNumber if the bean is an
+																	//out of order message
 				
-				if (stMapIt.hasNext()) { //some invokation work is present
-					if (!isThreadStarted())
-						continue;
-					
-					InvokerBean bean = (InvokerBean) stMapIt.next();
-					//see if this is an out of order msg
-					boolean beanIsOutOfOrderMsg = bean.getMsgNo()!=nextMsgno;
-					
-					String workId = sequenceId + "::" + bean.getMsgNo(); 
-																		//creating a workId to uniquely identify the
-																   //piece of work that will be assigned to the Worker.
-										
-					//check whether the bean is already assigned to a worker.
-					if (getWorkerLock().isWorkPresent(workId)) {
-						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
-						log.debug(message);
-						
-						// As there is already a worker assigned we are probably dispatching
-						// messages too quickly, so we sleep before trying the next sequence.
-						sleep = true;
-						continue;
-					}
-
-					String messageContextKey = bean.getMessageContextRefKey();
-					
-					if(transaction != null) {
-						transaction.commit();
-						transaction = null;
-					}
-
-					// start a new worker thread and let it do the invocation.
-					InvokerWorker worker = new InvokerWorker(context,
-							messageContextKey, 
-							beanIsOutOfOrderMsg); //only ignore nextMsgNumber if the bean is an
-																		//out of order message
-					
-					worker.setLock(getWorkerLock());
-					worker.setWorkId(workId);
-					
-					threadPool.execute(worker);
-					
-					//adding the workId to the lock after assigning it to a thread makes sure 
-					//that all the workIds in the Lock are handled by threads.
-					getWorkerLock().addWork(workId);
-					
-					processedMessage = true;
-				}
-			} catch (Exception e) {
-				if (transaction != null) {
-					try {
-						transaction.rollback();
-						transaction = null;
-					} catch (Exception e1) {
-						String message = SandeshaMessageHelper.getMessage(
-								SandeshaMessageKeys.rollbackError, e1
-										.toString());
-						log.debug(message, e1);
-					}
+				worker.setLock(getWorkerLock());
+				worker.setWorkId(workId);
+				
+				threadPool.execute(worker);
+				
+				//adding the workId to the lock after assigning it to a thread makes sure 
+				//that all the workIds in the Lock are handled by threads.
+				getWorkerLock().addWork(workId);
+				
+				processedMessage = true;
+			}
+		} catch (Exception e) {
+			if (transaction != null) {
+				try {
+					transaction.rollback();
+					transaction = null;
+				} catch (Exception e1) {
+					String message = SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.rollbackError, e1
+									.toString());
+					log.debug(message, e1);
 				}
-				String message = SandeshaMessageHelper
-						.getMessage(SandeshaMessageKeys.invokeMsgError);
-				log.debug(message, e);
-			} finally {
-				if (transaction != null) {
-					try {
-						transaction.commit();
-						transaction = null;
-					} catch (Exception e) {
-						String message = SandeshaMessageHelper.getMessage(
-								SandeshaMessageKeys.commitError, e.toString());
-						log.debug(message, e);
-					}
+			}
+			String message = SandeshaMessageHelper
+					.getMessage(SandeshaMessageKeys.invokeMsgError);
+			log.debug(message, e);
+		} finally {
+			if (transaction != null) {
+				try {
+					transaction.commit();
+					transaction = null;
+				} catch (Exception e) {
+					String message = SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.commitError, e.toString());
+					log.debug(message, e);
 				}
 			}
 		}
+
 		if (log.isDebugEnabled())
 			log.debug("Exit: InOrderInvoker::internalRun");
+		return sleep;
 	}
 
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java Tue Feb  6 04:57:01 2007
@@ -22,8 +22,11 @@
 import org.apache.axis2.util.threadpool.ThreadFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
 
 /**
  * Aggregates pause and stop logic between sender and invoker threads.
@@ -44,6 +47,7 @@
 	
 	protected transient ThreadFactory threadPool;
 	protected ConfigurationContext context = null;
+	protected StorageManager storageManager = null;
 	private boolean reRunThread;
 
 	public SandeshaThread(int sleepTime) {
@@ -55,7 +59,7 @@
 		return lock;
 	}
 	
-	public synchronized void stopThreadForSequence(String sequenceID){
+	public synchronized void stopThreadForSequence(String sequenceID, boolean rmSource){
 		if (log.isDebugEnabled())
 			log.debug("Enter: SandeshaThread::stopThreadForSequence, " + sequenceID);
 		
@@ -63,7 +67,7 @@
 		// to sleep when there is no work to do. If we were to exit the thread then
 		// we wouldn't be able to start back up when the thread gets some more work
 		// to do.
-		workingSequences.remove(sequenceID);
+		workingSequences.remove(new SequenceEntry(sequenceID, rmSource));
 		
 		if (log.isDebugEnabled())
 			log.debug("Exit: SandeshaThread::stopThreadForSequence");		
@@ -138,15 +142,17 @@
 	}
 	
 
-	public synchronized void runThreadForSequence(ConfigurationContext context, String sequenceID){
+	public synchronized void runThreadForSequence(ConfigurationContext context, String sequenceID, boolean rmSource){
 		if(log.isDebugEnabled()) log.debug("Entry: SandeshaThread::runThreadForSequence, " + this);
 
-		if (!workingSequences.contains(sequenceID))	workingSequences.add(sequenceID);
+		SequenceEntry entry = new SequenceEntry(sequenceID, rmSource);
+		if (!workingSequences.contains(entry)) workingSequences.add(entry);
 		
 		if (!isThreadStarted()) {
 			if(log.isDebugEnabled()) log.debug("Starting thread");
 
 			this.context = context;
+			
 			// Get the axis2 thread pool
 			threadPool = context.getThreadPool();
 			
@@ -164,6 +170,10 @@
 		if(log.isDebugEnabled()) log.debug("Exit: SandeshaThread::runThreadForSequence");
 	}
 	
+	/**
+	 * 
+	 * @return a List of SequenceEntry instances
+	 */
 	public synchronized ArrayList getSequences() {
 		return workingSequences;
 	}
@@ -223,13 +233,54 @@
 	}
 	
 	/**
-	 * The main work loop, to be implemented by any child class.
+	 * The main work loop, to be implemented by any child class. If the child wants
+	 * to sleep before the next loop then they should return true.
 	 */
-	protected abstract void internalRun();
+	protected abstract boolean internalRun();
 	
 	public void run() {
 		try {
-			internalRun();
+			boolean sleep = false;
+
+			while (isThreadStarted()) {
+				try {
+					synchronized (this) {		
+						if(sleep && !runMainLoop()) wait(sleepTime);
+						// Indicate that we are running the main loop
+						setRanMainLoop();
+					}
+				} catch (InterruptedException e1) {
+					log.debug("Sender was interupted...");
+					log.debug(e1.getMessage());
+					log.debug("End printing Interrupt...");
+				}
+
+				//pause if we have to
+				doPauseIfNeeded();
+
+				// Ensure we have context and a storage manager
+				if (context == null) {
+					String message = SandeshaMessageHelper
+							.getMessage(SandeshaMessageKeys.configContextNotSet);
+					message = SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.cannotCointinueSender, message);
+					log.debug(message);
+					throw new RuntimeException(message);
+				}
+
+				if(storageManager == null) {
+					try {
+						storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+					} catch (SandeshaException e2) {
+						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotCointinueSender, e2.toString());
+						log.debug(message);
+						throw new RuntimeException(message);
+					}
+				}
+
+				// Call into the real function
+				sleep = internalRun();
+			}
 		} finally {
 			// flag that we have exited the run loop and notify any waiting
 			// threads
@@ -238,6 +289,44 @@
 				hasStoppedRunning = true;
 				notify();
 			}
+		}
+	}
+	
+	protected class SequenceEntry {
+		private String  sequenceId;
+		private boolean rmSource;
+		
+		public SequenceEntry(String sequenceId, boolean rmSource) {
+			this.sequenceId = sequenceId;
+			this.rmSource = rmSource;
+		}
+		public boolean isRmSource() {
+			return rmSource;
+		}
+		public String getSequenceId() {
+			return sequenceId;
+		}
+
+
+		public boolean equals(Object o) {
+			if(o == null) return false;
+			if(o == this) return true;
+			if(o.getClass() != getClass()) return false;
+			
+			SequenceEntry other = (SequenceEntry) o;
+			if(sequenceId != null) {
+				if(!sequenceId.equals(other.sequenceId)) return false;
+			} else {
+				if(other.sequenceId != null) return false;
+			}
+			
+			return rmSource == other.rmSource;
+		}
+		public int hashCode() {
+			int result = 1;
+			if(sequenceId != null) result = sequenceId.hashCode();
+			if(rmSource) result = -result;
+			return result;
 		}
 	}
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java Tue Feb  6 04:57:01 2007
@@ -20,14 +20,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.util.SandeshaUtil;
 
 /**
  * This is responsible for sending and re-sending messages of Sandesha2. This
@@ -39,152 +36,104 @@
 
 	private static final Log log = LogFactory.getLog(Sender.class);
 	    
-  public Sender () {
-  	super(Sandesha2Constants.SENDER_SLEEP_TIME);
-  }	
-
-	protected void internalRun() {
-		if (log.isDebugEnabled())
-			log.debug("Enter: Sender::internalRun");
-
-		if (context == null) {
-			String message = SandeshaMessageHelper
-					.getMessage(SandeshaMessageKeys.configContextNotSet);
-			message = SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.cannotCointinueSender, message);
-			log.debug(message);
-			throw new RuntimeException(message);
-		}
+	public Sender () {
+		super(Sandesha2Constants.SENDER_SLEEP_TIME);
+	}
 
-		StorageManager storageManager = null;
-		boolean sleep = false;
+	protected boolean internalRun() {
+		if (log.isDebugEnabled()) log.debug("Enter: Sender::internalRun");
 
-		try {
-			storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
-		} catch (SandeshaException e2) {
-			// TODO Auto-generated catch block
-			log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotCointinueSender, e2.toString()), e2);
-			e2.printStackTrace();
-			return;
-		}
+		Transaction transaction = null;
 
-		while (isThreadStarted()) {
+		try {
+			transaction = storageManager.getTransaction();
 
-			try {
-				synchronized (this) {		
-					if(sleep && !runMainLoop()) wait(Sandesha2Constants.SENDER_SLEEP_TIME);
-					// Indicate that we are running the main loop
-					setRanMainLoop();
+			SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
+			SenderBean senderBean = mgr.getNextMsgToSend();
+			
+			if (senderBean == null) {
+				// As there was no work to do, we sleep for a while on the next loop.
+				if (log.isDebugEnabled()) {
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.senderBeanNotFound);
+					log.debug("Exit: Sender::internalRun, " + message + ", sleeping");
 				}
-			} catch (InterruptedException e1) {
-				log.debug("Sender was interupted...");
-				log.debug(e1.getMessage());
-				log.debug("End printing Interrupt...");
-			} finally {
-				sleep = false;
+				return true;
 			}
 
-			//pause if we have to
-			doPauseIfNeeded();
-
-			Transaction transaction = null;
-
-			try {
-				
-				transaction = storageManager.getTransaction();
-
-				SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
-				SenderBean senderBean = mgr.getNextMsgToSend();
-				
-				if (senderBean == null) {
-					if (log.isDebugEnabled()) {
-						String message = SandeshaMessageHelper
-								.getMessage(SandeshaMessageKeys.senderBeanNotFound);
-						log.debug(message);
-					}
-					
-					// As there was no work to do, we sleep for a while on the next loop.
-					sleep = true;
-					continue;
+			// work Id is used to define the piece of work that will be
+			// assigned to the Worker thread,
+			// to handle this Sender bean.
+			
+			//workId contains a timeTiSend part to cater for retransmissions.
+			//This will cause retransmissions to be treated as new work.
+			String workId = senderBean.getMessageID() + senderBean.getTimeToSend();
+
+			// check weather the bean is already assigned to a worker.
+			if (getWorkerLock().isWorkPresent(workId)) {
+				// As there is already a worker running we are probably looping
+				// too fast, so sleep on the next loop.
+				if (log.isDebugEnabled()) {
+					String message = SandeshaMessageHelper.getMessage(
+									SandeshaMessageKeys.workAlreadyAssigned,
+									workId);
+					log.debug("Exit: Sender::internalRun, " + message + ", sleeping");
 				}
+				return true;
+			}
 
-				// work Id is used to define the piece of work that will be
-				// assigned to the Worker thread,
-				// to handle this Sender bean.
-				
-				//workId contains a timeTiSend part to cater for retransmissions.
-				//This will cause retransmissions to be treated as new work.
-				String workId = senderBean.getMessageID() + senderBean.getTimeToSend();
-
-				// check weather the bean is already assigned to a worker.
-				if (getWorkerLock().isWorkPresent(workId)) {
-					if (log.isDebugEnabled()) {
-						String message = SandeshaMessageHelper
-								.getMessage(
-										SandeshaMessageKeys.workAlreadyAssigned,
-										workId);
-						log.debug(message);
-					}
-					// As there is already a worker running we are probably looping
-					// too fast, so sleep on the next loop.
-					sleep = true;
-					continue;
-				}
+			if(transaction != null) {
+				transaction.commit();
+				transaction = null;
+			}
 
-				if(transaction != null) {
-					transaction.commit();
+			// start a worker which will work on this messages.
+			SenderWorker worker = new SenderWorker(context, senderBean);
+			worker.setLock(getWorkerLock());
+			worker.setWorkId(workId);
+			threadPool.execute(worker);
+
+			// adding the workId to the lock after assigning it to a thread
+			// makes sure
+			// that all the workIds in the Lock are handled by threads.
+			getWorkerLock().addWork(workId);
+
+		} catch (Exception e) {
+
+			// TODO : when this is the client side throw the exception to
+			// the client when necessary.
+
+			
+			//TODO rollback only if a SandeshaStorageException.
+			//This allows the other Exceptions to be used within the Normal flow.
+			
+			if (transaction != null) {
+				try {
+					transaction.rollback();
 					transaction = null;
+				} catch (Exception e1) {
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
+							.toString());
+					log.debug(message, e1);
 				}
+			}
 
-				// start a worker which will work on this messages.
-				SenderWorker worker = new SenderWorker(context, senderBean);
-				worker.setLock(getWorkerLock());
-				worker.setWorkId(workId);
-				threadPool.execute(worker);
-
-				// adding the workId to the lock after assigning it to a thread
-				// makes sure
-				// that all the workIds in the Lock are handled by threads.
-				getWorkerLock().addWork(workId);
-
-			} catch (Exception e) {
-
-				// TODO : when this is the client side throw the exception to
-				// the client when necessary.
-
-				
-				//TODO rollback only if a SandeshaStorageException.
-				//This allows the other Exceptions to be used within the Normal flow.
-				
-				if (transaction != null) {
-					try {
-						transaction.rollback();
-						transaction = null;
-					} catch (Exception e1) {
-						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
-								.toString());
-						log.debug(message, e1);
-					}
-				}
-
-				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
 
-				log.debug(message, e);
-			} finally {
-				if (transaction != null) {
-					try {
-						transaction.commit();
-						transaction = null;
-					} catch (Exception e) {
-						String message = SandeshaMessageHelper
-								.getMessage(SandeshaMessageKeys.commitError, e.toString());
-						log.debug(message, e);
-					}
+			log.debug(message, e);
+		} finally {
+			if (transaction != null) {
+				try {
+					transaction.commit();
+					transaction = null;
+				} catch (Exception e) {
+					String message = SandeshaMessageHelper
+							.getMessage(SandeshaMessageKeys.commitError, e.toString());
+					log.debug(message, e);
 				}
 			}
 		}
-		if (log.isDebugEnabled())
-			log.debug("Exit: Sender::internalRun");
+		if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, not sleeping");
+		return false;
 	}
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org