You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2007/11/13 13:04:19 UTC

svn commit: r594502 - in /webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/SequenceProcessor.java workers/InvokerWorker.java

Author: mckierna
Date: Tue Nov 13 04:04:18 2007
New Revision: 594502

URL: http://svn.apache.org/viewvc?rev=594502&view=rev
Log:
Some refactoring, plus protection against a potential NullPointerException (NPE)

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=594502&r1=594501&r2=594502&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Tue Nov 13 04:04:18 2007
@@ -382,7 +382,7 @@
 		// then we should hand the message over to the invoker thread. If not, we can invoke
 		// it directly ourselves.
 		InvokerWorker worker = null;
-		if (SandeshaUtil.isInOrder(msgCtx) || storageManager.hasUserTransaction(msgCtx)) {
+		if (SandeshaUtil.isInOrder(msgCtx)) {
 		    
 			InvokerBean invokerBean = new InvokerBean(key, msgNo, sequenceId);
 			ContextManager contextMgr = SandeshaUtil.getContextManager(configCtx);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?rev=594502&r1=594501&r2=594502&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java Tue Nov 13 04:04:18 2007
@@ -59,6 +59,7 @@
 	private String  messageContextKey;
 	private boolean ignoreNextMsg;
 	private boolean pooledThread;
+	boolean lastMessageInvoked;
 	
 	public InvokerWorker (ConfigurationContext configurationContext, InvokerBean bean) {
 		// All invoker workers need to use the same lock, so we point to the static one here.
@@ -99,11 +100,6 @@
 	public void run() {
 		if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run, message " + messageNumber + ", sequence " + sequence);
 		
-		// If we are not the holder of the correct lock, then we have to stop
-		if(lock != null && !lock.ownsLock(workId, this)) {
-			if (log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run, another worker holds the lock");
-			return;
-		}
 		
 		Transaction tran = null;
 		try {
@@ -111,10 +107,11 @@
 			Runnable nextRunnable = null;
 
 			// Invoke the first message
-			invokeMessage(null);
+	    	lastMessageInvoked = invokeMessage(null);
 
 			// Look for the next message, so long as we are still processing normally
-			while(!ignoreNextMsg) {
+			while(!ignoreNextMsg && lastMessageInvoked) {
+				if(log.isDebugEnabled()) log.debug("InvokerWorker:: looking for next msg to invoke");
 				InvokerBean finder = new InvokerBean();
 				finder.setSequenceID(sequence);
 				finder.setMsgNo(messageNumber + 1);
@@ -127,11 +124,12 @@
 
 				if(nextBean != null) {
 					if(pooledThread) {
+						if(log.isDebugEnabled()) log.debug("InvokerWorker:: pooledThread");
 						initializeFromBean(nextBean);
 						final Transaction theTran = tran;
 						Runnable work = new Runnable() {
 							public void run() {
-								invokeMessage(theTran);
+								lastMessageInvoked = invokeMessage(theTran);
 							}
 						};
 
@@ -146,6 +144,7 @@
 
 						tran = null;
 					} else {
+						if(log.isDebugEnabled()) log.debug("InvokerWorker:: not pooled thread");
 						nextWorker = new InvokerWorker(configurationContext, nextBean);
 						nextWorker.setPooled();
 						nextWorker.setWorkId(workId);
@@ -169,7 +168,7 @@
 					// break out of the loop
 					break;
 				}
-			}
+			}//end while
 					
 			if (workId !=null && lock!=null) {
 				lock.removeWork(workId);
@@ -202,11 +201,18 @@
 		if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
 	}
 
-	private void invokeMessage(Transaction tran) {
+	private boolean invokeMessage(Transaction tran) {
 		if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::invokeMessage");
 
 		Transaction transaction = null;
 		MessageContext msgToInvoke = null;
+		boolean messageInvoked = true;
+		
+	    // If we are not the holder of the correct lock, then we have to stop
+	    if(lock != null && (!lock.ownsLock(workId, this))) {
+	    	if (log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run, another worker holds the lock");
+	    	return false;
+	    }
 		
 		try {
 			
@@ -223,6 +229,11 @@
 			InvokerBean invokerBean = invokerBeanMgr.retrieve(messageContextKey);
 
 			msgToInvoke = storageManager.retrieveMessageContext(messageContextKey, configurationContext);
+			if(msgToInvoke==null){
+				//return since there is nothing to do
+				if(log.isDebugEnabled()) log.debug("null msg");
+				return false;
+			}
 
 			// ending the transaction before invocation.
 			if(transaction != null) {
@@ -288,6 +299,7 @@
 
 				if (transaction != null && transaction.isActive())
 					transaction.rollback();
+				messageInvoked = false;
 				
 				handleFault(rmMsg, e);
 			}
@@ -317,9 +329,9 @@
 					TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(), storageManager);
 					// exit from current iteration. (since an entry
 					// was removed)
-					if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage Last message return");	
 					if(transaction != null && transaction.isActive()) transaction.commit();
-					return;
+					if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage Last message return " + messageInvoked);
+					return messageInvoked;	
 				}
 			}
 			
@@ -344,6 +356,7 @@
 		} catch (Exception e) {
 			if (log.isErrorEnabled())
 				log.error(e.toString(), e);
+			messageInvoked = false;
 		} finally {
 			if (transaction!=null && transaction.isActive()) {
 				try {
@@ -355,7 +368,8 @@
 			}
 		}
 		
-		if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage");
+		if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage " + messageInvoked);
+		return messageInvoked;
 	}
 
 	private void makeMessageReadyForReinjection(MessageContext messageContext) {



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org