You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ga...@apache.org on 2007/11/06 10:05:29 UTC

svn commit: r592342 - in /webservices/sandesha/trunk/java/modules/core/src/main: java/org/apache/sandesha2/client/ java/org/apache/sandesha2/handlers/ java/org/apache/sandesha2/i18n/ java/org/apache/sandesha2/msgprocessors/ java/org/apache/sandesha2/st...

Author: gatfora
Date: Tue Nov  6 01:05:28 2007
New Revision: 592342

URL: http://svn.apache.org/viewvc?rev=592342&view=rev
Log:
As described in SANDESHA2-108 this change avoids the Inorder thread switch if the message being received is the next message to be invoked.  This change requires Axis2 revision 592132

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
    webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java Tue Nov  6 01:05:28 2007
@@ -942,16 +942,8 @@
 			StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext, configContext.getAxisConfiguration());
 			reportTransaction = storageManager.getTransaction();
 
-			//only do this if we are running inOrder
-			if(SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration()).isInOrder()){
-				Invoker invoker = (Invoker)SandeshaUtil.getSandeshaStorageManager(configContext, configContext.getAxisConfiguration()).getInvoker();
-				if (invoker==null){
-					throw new SandeshaException(SandeshaMessageHelper.getMessage(
-						SandeshaMessageKeys.invokerNotFound, sequenceID));
-				}
-				
-				invoker.forceInvokeOfAllMessagesCurrentlyOnSequence(configContext, sequenceID, allowLaterDeliveryOfMissingMessages);			
-			}
+			// There will only be messages waiting if we are running in-order
+			Invoker.forceInvokeOfAllMessagesCurrentlyOnSequence(configContext, sequenceID, allowLaterDeliveryOfMissingMessages);			
 			
 			if(reportTransaction != null && reportTransaction.isActive()) reportTransaction.commit();
 			reportTransaction = null;

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Tue Nov  6 01:05:28 2007
@@ -220,9 +220,7 @@
             
         	boolean isDuplicate = true;
         	//still allow this msg if we have no corresponding invoker bean for it and we are inOrder
-        	boolean isInOrder = 
-        		SandeshaUtil.getDefaultPropertyBean(rmMsgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
-        	if(isInOrder)
+        	if(SandeshaUtil.isInOrder(rmMsgCtx.getMessageContext()))
         	{
           	InvokerBean finderBean = new InvokerBean();
           	finderBean.setMsgNo(msgNo);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Tue Nov  6 01:05:28 2007
@@ -210,7 +210,6 @@
 	public final static String elementMustForSpec = "elementMustForSpec";
 	public final static String couldNotSendCreateSeqResponse = "couldNotSendCreateSeqResponse";
 	public final static String invalidElementFoundWithinElement = "invalidElementFoundWithinElement";
-	public final static String invokerNotFound="invokerNotFound";
 	    
 	public final static String couldNotSendCloseResponse="couldNotSendCloseResponse";
 	

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Tue Nov  6 01:05:28 2007
@@ -46,7 +46,6 @@
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
-import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -59,7 +58,7 @@
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.InvokerWorker;
 import org.apache.sandesha2.wsrm.Sequence;
 
 /**
@@ -135,7 +134,7 @@
 		}
 		
 		// setting acked msg no range
-		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+		ConfigurationContext configCtx = msgCtx.getConfigurationContext();
 		if (configCtx == null) {
 			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
 			log.debug(message);
@@ -202,9 +201,9 @@
 		}
 		
 		String specVersion = rmMsgCtx.getRMSpecVersion();
-		if ((SandeshaUtil.isDuplicateInOnlyMessage(rmMsgCtx.getMessageContext())
+		if ((SandeshaUtil.isDuplicateInOnlyMessage(msgCtx)
 						||
-					SandeshaUtil.isDuplicateInOutMessage(rmMsgCtx.getMessageContext()))
+					SandeshaUtil.isDuplicateInOutMessage(msgCtx))
 				&& (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
 			
 			// this is a duplicate message and the invocation type is EXACTLY_ONCE. We try to return
@@ -353,11 +352,11 @@
 			// If the MEP doesn't need the backchannel, and nor do we, we should signal it so that it
 			// can close off as soon as possible.
 			if (backchannelFree) {
+				TransportUtils.setResponseWritten(msgCtx, false);
+
 				RequestResponseTransport t = null;
 				t = (RequestResponseTransport) rmMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
-
 				if(t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
-					TransportUtils.setResponseWritten(msgCtx, false);
 					t.acknowledgeMessage(msgCtx);
 				}
 			}
@@ -377,33 +376,40 @@
 			}
 		}
 		
-		// If the storage manager has an invoker, then they may be implementing inOrder, or
-		// transactional delivery. Either way, if they have one we should use it.
-		SandeshaThread invoker = storageManager.getInvoker();
-		if (invoker != null) {
-			// Whatever the MEP, we stop processing here and the invoker will do the real work. We only
-			// SUSPEND if we need to keep the backchannel open for the response... we may as well ABORT
-			// to let other cases end more quickly.
-			if(backchannelFree && ackBackChannel) {
-				result = InvocationResponse.ABORT;
-			} else {
-				result = InvocationResponse.SUSPEND;
-			}
+		// If the storage manager is implementing inOrder, or using transactional delivery
+		// then we should hand the message over to the invoker thread. If not, we can invoke
+		// it directly ourselves.
+		InvokerWorker worker = null;
+		if (SandeshaUtil.isInOrder(msgCtx) || storageManager.hasUserTransaction(msgCtx)) {
 		    
-			InvokerBeanMgr storageMapMgr = storageManager.getInvokerBeanMgr();
-
 			InvokerBean invokerBean = new InvokerBean(key, msgNo, sequenceId);
-			
 			ContextManager contextMgr = SandeshaUtil.getContextManager(configCtx);
+
 			if(contextMgr != null) invokerBean.setContext(contextMgr.storeContext());
 
-			boolean wasAdded = storageMapMgr.insert(invokerBean);
+			boolean wasAdded = storageManager.getInvokerBeanMgr().insert(invokerBean);
 
 			// This will avoid performing application processing more than once.
 			rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
 			
+			// Whatever the MEP, we stop processing here and the invoker will do the real work. As we
+			// are taking responsibility for the message we need to return SUSPEND
+			result = InvocationResponse.SUSPEND;
+            
 			if (wasAdded) {
-				storageManager.storeMessageContext(key, rmMsgCtx.getMessageContext());        
+				storageManager.storeMessageContext(key, msgCtx);
+				// We can invoke the message immediately, if this is the next message to invoke,
+				// and we don't have a user transaction in play.
+				if(bean.getNextMsgNoToProcess() == msgNo && !storageManager.hasUserTransaction(msgCtx)) {
+					String workId = sequenceId;
+					ConfigurationContext context = msgCtx.getConfigurationContext();
+					
+					worker = new InvokerWorker(context, invokerBean);
+					worker.setWorkId(workId);
+					
+					// Actually take the lock
+					worker.getLock().addWork(workId, worker);
+				}
 			} else {
 				// Abort this message immediately as this message has already been added
 				sendAck = false;
@@ -422,6 +428,14 @@
 		if (transaction != null && transaction.isActive()) 
 			transaction.commit();
 		
+		if(worker != null) {
+			try {
+				worker.run();
+			} catch(Exception e)  {
+				log.error("Caught exception running InvokerWorker", e);
+			}
+		}
+
 		if (sendAck) {
 			try {
 				transaction = storageManager.getTransaction();
@@ -429,6 +443,15 @@
 				RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
 				AcknowledgementManager.sendAckNow(ackRMMsgContext);
 				TransportUtils.setResponseWritten(msgCtx, true);
+				RequestResponseTransport t = 
+					(RequestResponseTransport) rmMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+				
+				// Tell the transport that we have finished with the message as the response should have been
+				// written
+				if(t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
+					t.signalResponseReady();
+				}
+
 				if (transaction != null && transaction.isActive()) transaction.commit();
 				transaction = null;
 			

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java Tue Nov  6 01:05:28 2007
@@ -33,9 +33,7 @@
 	}
 
 	public boolean insert(InvokerBean bean) throws SandeshaStorageException {
-		boolean result = super.insert(bean.getMessageContextRefKey(), bean);
-		mgr.getInMemoryTransaction().setReceivedMessages(true);
-		return result;
+		return super.insert(bean.getMessageContextRefKey(), bean);
 	}
 
 	public boolean delete(String key) throws SandeshaStorageException {
@@ -51,9 +49,7 @@
 	}
 	
 	public boolean update(InvokerBean bean) throws SandeshaStorageException {
-		boolean result = super.update(bean.getMessageContextRefKey(), bean);
-		mgr.getInMemoryTransaction().setReceivedMessages(true);
-		return result;
+		return super.update(bean.getMessageContextRefKey(), bean);
 	}
 	
 	public InvokerBean findUnique(InvokerBean bean) throws SandeshaException {

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Tue Nov  6 01:05:28 2007
@@ -48,7 +48,6 @@
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMBean;
 import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.workers.Invoker;
 import org.apache.sandesha2.workers.SandeshaThread;
 import org.apache.sandesha2.workers.Sender;
 
@@ -62,7 +61,6 @@
     private SenderBeanMgr senderBeanMgr = null;
     private InvokerBeanMgr invokerBeanMgr = null;
     private Sender sender = null;
-    private Invoker invoker = null;
     private PollingManager pollingManager = null;
     private HashMap transactions = new HashMap();
     private boolean useSerialization = false;
@@ -76,10 +74,6 @@
 		SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(context.getAxisConfiguration());
 		useSerialization = policy.isUseMessageSerialization();
 		
-		// Note that while inOrder is a global property we can decide if we need the
-		// invoker thread at this point. If we change this to be a sequence-level
-		// property then we'll need to revisit this.
-		boolean inOrder = policy.isInOrder();
 		boolean polling = policy.isEnableMakeConnection();
 		
 		this.rMSBeanMgr = new InMemoryRMSBeanMgr (this, context);
@@ -87,7 +81,6 @@
 		this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
 		this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, context);
 		this.sender = new Sender();
-		if(inOrder) this.invoker = new Invoker();
 		if(polling) this.pollingManager = new PollingManager();
 	}
 
@@ -135,7 +128,7 @@
 	 * Gets the Invoker for this Storage manager
 	 */
 	public SandeshaThread getInvoker() {
-	  return invoker;
+	  return null;
 	}
 
 	/** 
@@ -364,6 +357,7 @@
 		SOAPEnvelope   envelope;
 	}
 }
+
 
 
 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Tue Nov  6 01:05:28 2007
@@ -28,7 +28,6 @@
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beans.RMBean;
-import org.apache.sandesha2.workers.SandeshaThread;
 
 /**
  * This class does not really implement transactions, but it is a good
@@ -45,7 +44,6 @@
 	private ArrayList enlistedBeans = new ArrayList();
 	private InMemoryTransaction waitingForTran = null;
 	private boolean sentMessages = false;
-	private boolean receivedMessages = false;
 	private boolean active = true;
 	
 	InMemoryTransaction(InMemoryStorageManager manager, String threadName, int id) {
@@ -59,10 +57,6 @@
 	public void commit() {
 		releaseLocks();
 		if(sentMessages) manager.getSender().wakeThread();
-		if(receivedMessages) {
-			SandeshaThread invoker = manager.getInvoker();
-			if(invoker != null) invoker.wakeThread();
-		}
 		active = false;
 	}
 
@@ -164,14 +158,11 @@
 		return result.toString();
 	}
 
-	public void setReceivedMessages(boolean receivedMessages) {
-		this.receivedMessages = receivedMessages;
-	}
-
 	public void setSentMessages(boolean sentMessages) {
 		this.sentMessages = sentMessages;
 	}
 }
+
 
 
 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java Tue Nov  6 01:05:28 2007
@@ -1169,5 +1169,14 @@
 		return epr;
 	}
 
+	public static boolean isInOrder(MessageContext context) throws SandeshaException {
+		if (log.isDebugEnabled()) log.debug("Enter: SandeshaUtil::isInOrder");
+		// The in-order setting is read from the policy bean looked up for this message's AxisOperation.
+		SandeshaPolicyBean policy = getPropertyBean(context.getAxisOperation());
+		boolean result = policy.isInOrder();
+		
+		if (log.isDebugEnabled()) log.debug("Exit: SandeshaUtil::isInOrder, " + result);
+		return result;
+	}
 
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java Tue Nov  6 01:05:28 2007
@@ -68,12 +68,13 @@
 	 * Otherwise messages skipped over will be ignored
 	 * @throws SandeshaException
 	 */
-	public synchronized void forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx, 
+	public static synchronized void forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx, 
 			String sequenceID,
 			boolean allowLaterDeliveryOfMissingMessages)throws SandeshaException{
-		//first we block while we wait for the invoking thread to pause
-		blockForPause();
+//		//first we block while we wait for the invoking thread to pause
+//		blockForPause();
 		try{
+			StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(ctx, ctx.getAxisConfiguration());
 			//get all invoker beans for the sequence
 			InvokerBeanMgr storageMapMgr = storageManager
 					.getInvokerBeanMgr();
@@ -102,33 +103,23 @@
 						InvokerBean invoker = (InvokerBean)stMapIt.next();
 						
 						// start a new worker thread and let it do the invocation.
-						String workId = sequenceID + "::" + invoker.getMsgNo(); //creating a workId to uniquely identify the
-					   //piece of work that will be assigned to the Worker.
+						String workId = sequenceID;
 						
-						String messageContextKey = invoker.getMessageContextRefKey();
-						InvokerWorker worker = new InvokerWorker(context,
-								messageContextKey, 
-								true); //want to ignore the enxt msg number
-						
-						worker.setLock(getWorkerLock());
+						InvokerWorker worker = new InvokerWorker(ctx, invoker);
+						worker.forceOutOfOrder();
+						worker.setPooled();
 						worker.setWorkId(workId);
 						
 						// Wrap the invoker worker with the correct context, if needed.
 						Runnable work = worker;
-						ContextManager contextMgr = SandeshaUtil.getContextManager(context);
+						ContextManager contextMgr = SandeshaUtil.getContextManager(ctx);
 						if(contextMgr != null) {
 							work = contextMgr.wrapWithContext(work, invoker.getContext());
 						}
 						
-						try {
-							// Try and set the lock up before we start the thread, but roll it back
-							// if we hit any problems
-							if(worker.getLock().addWork(workId, worker)){
-								threadPool.execute(work);
-							}
-						} catch(Exception e) {
-							worker.getLock().removeWork(workId);
-						}
+						// Setup the lock for the new worker
+						worker.getLock().addWork(workId, worker);
+						ctx.getThreadPool().execute(work);
 
 						long msgNumber = invoker.getMsgNo();
 						//if necessary, update the "next message number" bean under this transaction
@@ -176,8 +167,8 @@
 			}
 		}
 		finally{
-			//restart the invoker
-			finishPause();
+//			//restart the invoker
+//			finishPause();
 		}
 	}
 
@@ -311,9 +302,7 @@
 				//see if this is an out of order msg
 				boolean beanIsOutOfOrderMsg = bean.getMsgNo()!=nextMsgno;
 				
-				String workId = sequenceId + "::" + bean.getMsgNo(); 
-																	//creating a workId to uniquely identify the
-															   //piece of work that will be assigned to the Worker.
+				String workId = sequenceId; 
 									
 				//check whether the bean is already assigned to a worker.
 				if (getWorkerLock().isWorkPresent(workId)) {
@@ -331,20 +320,15 @@
 					return sleep;
 				}
 
-				String messageContextKey = bean.getMessageContextRefKey();
-				
 				if(transaction != null) {
 					transaction.commit();
 					transaction = null;
 				}
 
 				// start a new worker thread and let it do the invocation.
-				InvokerWorker worker = new InvokerWorker(context,
-						messageContextKey, 
-						beanIsOutOfOrderMsg); //only ignore nextMsgNumber if the bean is an
-																	//out of order message
-				
-				worker.setLock(getWorkerLock());
+				InvokerWorker worker = new InvokerWorker(context, bean);
+				if(beanIsOutOfOrderMsg) worker.forceOutOfOrder();
+				worker.setPooled();
 				worker.setWorkId(workId);
 				
 				// Wrap the invoker worker with the correct context, if needed.

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java Tue Nov  6 01:05:28 2007
@@ -17,6 +17,7 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.context.ContextManager;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
@@ -30,21 +31,161 @@
 
 public class InvokerWorker extends SandeshaWorker implements Runnable {
 
-	ConfigurationContext configurationContext = null;
-	String messageContextKey;
-	boolean ignoreNextMsg = false;
+	static final Log log = LogFactory.getLog(InvokerWorker.class);
+	static final WorkerLock lock = new WorkerLock();
 	
-	Log log = LogFactory.getLog(InvokerWorker.class);
+	private ConfigurationContext configurationContext;
+	private String  sequence;
+	private long    messageNumber;
+	private String  messageContextKey;
+	private boolean ignoreNextMsg;
+	private boolean pooledThread;
 	
-	public InvokerWorker (ConfigurationContext configurationContext, String messageContextKey, boolean ignoreNextMsg) {
+	public InvokerWorker (ConfigurationContext configurationContext, InvokerBean bean) {
+		// All invoker workers need to use the same lock, so we point to the static one here.
+		this.setLock(lock);
+		
 		this.configurationContext = configurationContext;
-		this.messageContextKey = messageContextKey;
-		this.ignoreNextMsg = ignoreNextMsg;
+		initializeFromBean(bean);
 	}
 	
+	public void forceOutOfOrder() {
+		if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::forceOutOfOrder");
+		ignoreNextMsg = true;
+		if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::forceOutOfOrder");
+	}
+
+	public void setPooled() {
+		if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::setPooled");
+		pooledThread = true;
+		if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::setPooled");
+	}
+
+	private void initializeFromBean(InvokerBean bean) {
+		if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::initializeFromBean " + bean);
+		
+		this.sequence = bean.getSequenceID();
+		this.messageNumber = bean.getMsgNo();
+		this.messageContextKey = bean.getMessageContextRefKey();
+		
+		if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::initializeFromBean");
+	}
+		
+	/**
+	 * The run method invokes the message that this invoker has been primed with, but will
+	 * also attempt to invoke subsequent messages. If the invoker worker is running on the
+	 * application thread then we move on to a thread pool for the second message, but if
+	 * we are already on a pooled thread then we just continue.
+	 */
 	public void run() {
-		if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run");
+		if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run, message " + messageNumber + ", sequence " + sequence);
+		
+		// If we are not the holder of the correct lock, then we have to stop
+		if(lock != null && !lock.ownsLock(workId, this)) {
+			if (log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run, another worker holds the lock");
+			return;
+		}
+		
+		Transaction tran = null;
+		try {
+			InvokerWorker nextWorker = null;
+			Runnable nextRunnable = null;
+
+			// Invoke the first message
+			invokeMessage(null);
+
+			// Look for the next message, so long as we are still processing normally
+			while(!ignoreNextMsg) {
+				InvokerBean finder = new InvokerBean();
+				finder.setSequenceID(sequence);
+				finder.setMsgNo(messageNumber + 1);
+
+				StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+				tran = storageManager.getTransaction();
+
+				InvokerBeanMgr mgr = storageManager.getInvokerBeanMgr();
+				InvokerBean nextBean = mgr.findUnique(finder);
+
+				if(nextBean != null) {
+					if(pooledThread) {
+						initializeFromBean(nextBean);
+						final Transaction theTran = tran;
+						Runnable work = new Runnable() {
+							public void run() {
+								invokeMessage(theTran);
+							}
+						};
+
+						// Wrap the work with the correct context, if needed.
+						ContextManager contextMgr = SandeshaUtil.getContextManager(configurationContext);
+						if(contextMgr != null) {
+							work = contextMgr.wrapWithContext(work, nextBean.getContext());
+						}
+
+						// Finally do the work
+						work.run();
+
+						tran = null;
+					} else {
+						nextWorker = new InvokerWorker(configurationContext, nextBean);
+						nextWorker.setPooled();
+						nextWorker.setWorkId(workId);
+
+						// Wrap the invoker worker with the correct context, if needed.
+						ContextManager contextMgr = SandeshaUtil.getContextManager(configurationContext);
+						if(contextMgr != null) {
+							nextRunnable = contextMgr.wrapWithContext(nextWorker, nextBean.getContext());
+						} else {
+							nextRunnable = nextWorker;
+						}
+					}
+				}
 		
+				// Clean up the tran, in case we didn't pass it into the invoke method
+				if(tran != null) tran.commit();
+				tran = null;
+						
+				if(nextBean == null || nextWorker != null) {
+					// We have run out of work, or the new worker has taken it on, so we can
+					// break out of the loop
+					break;
+				}
+			}
+					
+			if (workId !=null && lock!=null) {
+				lock.removeWork(workId);
+			}
+
+			// If we created another worker, set it running now that we have released the lock
+			if(nextWorker != null) {
+				lock.addWork(workId, nextWorker);
+				configurationContext.getThreadPool().execute(nextRunnable);
+			}
+
+		} catch(SandeshaException e) {
+			log.debug("Exception within InvokerWorker", e);
+
+			// Clean up the tran, if there is one left
+			if(tran != null) {
+				try {
+					tran.rollback();
+				} catch(SandeshaException e2) {
+					log.debug("Exception rolling back tran", e2);
+				}
+			}
+		} finally {
+			// Release the lock
+			if (workId !=null && lock!=null && lock.ownsLock(workId, this)) {
+				lock.removeWork(workId);
+			}
+		}
+				
+		if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
+	}
+
+	private void invokeMessage(Transaction tran) {
+		if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::invokeMessage");
+
 		Transaction transaction = null;
 		MessageContext msgToInvoke = null;
 		
@@ -54,7 +195,11 @@
 			InvokerBeanMgr invokerBeanMgr = storageManager.getInvokerBeanMgr();
 			
 			//starting a transaction
-			transaction = storageManager.getTransaction();
+			if(tran == null) {
+				transaction = storageManager.getTransaction();
+			} else {
+				transaction = tran;
+			}
 			
 			InvokerBean invokerBean = invokerBeanMgr.retrieve(messageContextKey);
 
@@ -153,7 +298,7 @@
 					TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(), storageManager);
 					// exit from current iteration. (since an entry
 					// was removed)
-					if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run Last message return");	
+					if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage Last message return");	
 					if(transaction != null && transaction.isActive()) transaction.commit();
 					return;
 				}
@@ -181,10 +326,6 @@
 			if (log.isErrorEnabled())
 				log.error(e.toString(), e);
 		} finally {
-			if (workId !=null && lock!=null) {
-				lock.removeWork(workId);
-			}
-
 			if (transaction!=null && transaction.isActive()) {
 				try {
 					transaction.rollback();
@@ -195,7 +336,7 @@
 			}
 		}
 		
-		if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
+		if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage");
 	}
 
 	private void makeMessageReadyForReinjection(MessageContext messageContext) {

Modified: webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties Tue Nov  6 01:05:28 2007
@@ -222,7 +222,6 @@
 toBeanNotSet=The 'To' Sequence Property Bean has not been set for the sequence.
 cannotFindTransportInDesc=Cannot find the transport in description {0} in the ConfigurationContext.
 invalidElementFoundWithinElement=Found invalid ''{0}'' element within ''{1}'' element.
-invokerNotFound=An invoker thread was not found to dispatch messages on the inbound sequence {0}.
 cannotSetPolicyBeanServiceNull=Cannot set the given SandeshaPolicyBean since the AxisService is not present
 
 #------------------



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org