You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2007/11/13 13:04:19 UTC
svn commit: r594502 - in
/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2:
msgprocessors/SequenceProcessor.java workers/InvokerWorker.java
Author: mckierna
Date: Tue Nov 13 04:04:18 2007
New Revision: 594502
URL: http://svn.apache.org/viewvc?rev=594502&view=rev
Log:
some refactoring and potential NPE protection
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=594502&r1=594501&r2=594502&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Tue Nov 13 04:04:18 2007
@@ -382,7 +382,7 @@
// then we should hand the message over to the invoker thread. If not, we can invoke
// it directly ourselves.
InvokerWorker worker = null;
- if (SandeshaUtil.isInOrder(msgCtx) || storageManager.hasUserTransaction(msgCtx)) {
+ if (SandeshaUtil.isInOrder(msgCtx)) {
InvokerBean invokerBean = new InvokerBean(key, msgNo, sequenceId);
ContextManager contextMgr = SandeshaUtil.getContextManager(configCtx);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?rev=594502&r1=594501&r2=594502&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java Tue Nov 13 04:04:18 2007
@@ -59,6 +59,7 @@
private String messageContextKey;
private boolean ignoreNextMsg;
private boolean pooledThread;
+ boolean lastMessageInvoked;
public InvokerWorker (ConfigurationContext configurationContext, InvokerBean bean) {
// All invoker workers need to use the same lock, so we point to the static one here.
@@ -99,11 +100,6 @@
public void run() {
if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run, message " + messageNumber + ", sequence " + sequence);
- // If we are not the holder of the correct lock, then we have to stop
- if(lock != null && !lock.ownsLock(workId, this)) {
- if (log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run, another worker holds the lock");
- return;
- }
Transaction tran = null;
try {
@@ -111,10 +107,11 @@
Runnable nextRunnable = null;
// Invoke the first message
- invokeMessage(null);
+ lastMessageInvoked = invokeMessage(null);
// Look for the next message, so long as we are still processing normally
- while(!ignoreNextMsg) {
+ while(!ignoreNextMsg && lastMessageInvoked) {
+ if(log.isDebugEnabled()) log.debug("InvokerWorker:: looking for next msg to invoke");
InvokerBean finder = new InvokerBean();
finder.setSequenceID(sequence);
finder.setMsgNo(messageNumber + 1);
@@ -127,11 +124,12 @@
if(nextBean != null) {
if(pooledThread) {
+ if(log.isDebugEnabled()) log.debug("InvokerWorker:: pooledThread");
initializeFromBean(nextBean);
final Transaction theTran = tran;
Runnable work = new Runnable() {
public void run() {
- invokeMessage(theTran);
+ lastMessageInvoked = invokeMessage(theTran);
}
};
@@ -146,6 +144,7 @@
tran = null;
} else {
+ if(log.isDebugEnabled()) log.debug("InvokerWorker:: not pooled thread");
nextWorker = new InvokerWorker(configurationContext, nextBean);
nextWorker.setPooled();
nextWorker.setWorkId(workId);
@@ -169,7 +168,7 @@
// break out of the loop
break;
}
- }
+ }//end while
if (workId !=null && lock!=null) {
lock.removeWork(workId);
@@ -202,11 +201,18 @@
if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
}
- private void invokeMessage(Transaction tran) {
+ private boolean invokeMessage(Transaction tran) {
if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::invokeMessage");
Transaction transaction = null;
MessageContext msgToInvoke = null;
+ boolean messageInvoked = true;
+
+ // If we are not the holder of the correct lock, then we have to stop
+ if(lock != null && (!lock.ownsLock(workId, this))) {
+ if (log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run, another worker holds the lock");
+ return false;
+ }
try {
@@ -223,6 +229,11 @@
InvokerBean invokerBean = invokerBeanMgr.retrieve(messageContextKey);
msgToInvoke = storageManager.retrieveMessageContext(messageContextKey, configurationContext);
+ if(msgToInvoke==null){
+ //return since there is nothing to do
+ if(log.isDebugEnabled()) log.debug("null msg");
+ return false;
+ }
// ending the transaction before invocation.
if(transaction != null) {
@@ -288,6 +299,7 @@
if (transaction != null && transaction.isActive())
transaction.rollback();
+ messageInvoked = false;
handleFault(rmMsg, e);
}
@@ -317,9 +329,9 @@
TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(), storageManager);
// exit from current iteration. (since an entry
// was removed)
- if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage Last message return");
if(transaction != null && transaction.isActive()) transaction.commit();
- return;
+ if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage Last message return " + messageInvoked);
+ return messageInvoked;
}
}
@@ -344,6 +356,7 @@
} catch (Exception e) {
if (log.isErrorEnabled())
log.error(e.toString(), e);
+ messageInvoked = false;
} finally {
if (transaction!=null && transaction.isActive()) {
try {
@@ -355,7 +368,8 @@
}
}
- if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage");
+ if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage " + messageInvoked);
+ return messageInvoked;
}
private void makeMessageReadyForReinjection(MessageContext messageContext) {
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org