You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ga...@apache.org on 2007/11/06 10:05:29 UTC
svn commit: r592342 - in
/webservices/sandesha/trunk/java/modules/core/src/main:
java/org/apache/sandesha2/client/ java/org/apache/sandesha2/handlers/
java/org/apache/sandesha2/i18n/ java/org/apache/sandesha2/msgprocessors/
java/org/apache/sandesha2/st...
Author: gatfora
Date: Tue Nov 6 01:05:28 2007
New Revision: 592342
URL: http://svn.apache.org/viewvc?rev=592342&view=rev
Log:
As described in SANDESHA2-108, this change avoids the in-order thread switch if the message being received is the next message to be invoked. This change requires Axis2 revision 592132.
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java Tue Nov 6 01:05:28 2007
@@ -942,16 +942,8 @@
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext, configContext.getAxisConfiguration());
reportTransaction = storageManager.getTransaction();
- //only do this if we are running inOrder
- if(SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration()).isInOrder()){
- Invoker invoker = (Invoker)SandeshaUtil.getSandeshaStorageManager(configContext, configContext.getAxisConfiguration()).getInvoker();
- if (invoker==null){
- throw new SandeshaException(SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.invokerNotFound, sequenceID));
- }
-
- invoker.forceInvokeOfAllMessagesCurrentlyOnSequence(configContext, sequenceID, allowLaterDeliveryOfMissingMessages);
- }
+ // There will only be messages waiting if we are running in-order
+ Invoker.forceInvokeOfAllMessagesCurrentlyOnSequence(configContext, sequenceID, allowLaterDeliveryOfMissingMessages);
if(reportTransaction != null && reportTransaction.isActive()) reportTransaction.commit();
reportTransaction = null;
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Tue Nov 6 01:05:28 2007
@@ -220,9 +220,7 @@
boolean isDuplicate = true;
//still allow this msg if we have no corresponding invoker bean for it and we are inOrder
- boolean isInOrder =
- SandeshaUtil.getDefaultPropertyBean(rmMsgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
- if(isInOrder)
+ if(SandeshaUtil.isInOrder(rmMsgCtx.getMessageContext()))
{
InvokerBean finderBean = new InvokerBean();
finderBean.setMsgNo(msgNo);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Tue Nov 6 01:05:28 2007
@@ -210,7 +210,6 @@
public final static String elementMustForSpec = "elementMustForSpec";
public final static String couldNotSendCreateSeqResponse = "couldNotSendCreateSeqResponse";
public final static String invalidElementFoundWithinElement = "invalidElementFoundWithinElement";
- public final static String invokerNotFound="invokerNotFound";
public final static String couldNotSendCloseResponse="couldNotSendCloseResponse";
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Tue Nov 6 01:05:28 2007
@@ -46,7 +46,6 @@
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
-import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -59,7 +58,7 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.InvokerWorker;
import org.apache.sandesha2.wsrm.Sequence;
/**
@@ -135,7 +134,7 @@
}
// setting acked msg no range
- ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+ ConfigurationContext configCtx = msgCtx.getConfigurationContext();
if (configCtx == null) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
log.debug(message);
@@ -202,9 +201,9 @@
}
String specVersion = rmMsgCtx.getRMSpecVersion();
- if ((SandeshaUtil.isDuplicateInOnlyMessage(rmMsgCtx.getMessageContext())
+ if ((SandeshaUtil.isDuplicateInOnlyMessage(msgCtx)
||
- SandeshaUtil.isDuplicateInOutMessage(rmMsgCtx.getMessageContext()))
+ SandeshaUtil.isDuplicateInOutMessage(msgCtx))
&& (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
// this is a duplicate message and the invocation type is EXACTLY_ONCE. We try to return
@@ -353,11 +352,11 @@
// If the MEP doesn't need the backchannel, and nor do we, we should signal it so that it
// can close off as soon as possible.
if (backchannelFree) {
+ TransportUtils.setResponseWritten(msgCtx, false);
+
RequestResponseTransport t = null;
t = (RequestResponseTransport) rmMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
-
if(t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
- TransportUtils.setResponseWritten(msgCtx, false);
t.acknowledgeMessage(msgCtx);
}
}
@@ -377,33 +376,40 @@
}
}
- // If the storage manager has an invoker, then they may be implementing inOrder, or
- // transactional delivery. Either way, if they have one we should use it.
- SandeshaThread invoker = storageManager.getInvoker();
- if (invoker != null) {
- // Whatever the MEP, we stop processing here and the invoker will do the real work. We only
- // SUSPEND if we need to keep the backchannel open for the response... we may as well ABORT
- // to let other cases end more quickly.
- if(backchannelFree && ackBackChannel) {
- result = InvocationResponse.ABORT;
- } else {
- result = InvocationResponse.SUSPEND;
- }
+ // If the storage manager is implementing inOrder, or using transactional delivery
+ // then we should hand the message over to the invoker thread. If not, we can invoke
+ // it directly ourselves.
+ InvokerWorker worker = null;
+ if (SandeshaUtil.isInOrder(msgCtx) || storageManager.hasUserTransaction(msgCtx)) {
- InvokerBeanMgr storageMapMgr = storageManager.getInvokerBeanMgr();
-
InvokerBean invokerBean = new InvokerBean(key, msgNo, sequenceId);
-
ContextManager contextMgr = SandeshaUtil.getContextManager(configCtx);
+
if(contextMgr != null) invokerBean.setContext(contextMgr.storeContext());
- boolean wasAdded = storageMapMgr.insert(invokerBean);
+ boolean wasAdded = storageManager.getInvokerBeanMgr().insert(invokerBean);
// This will avoid performing application processing more than once.
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+ // Whatever the MEP, we stop processing here and the invoker will do the real work. As we
+ // are taking responsibility for the message we need to return SUSPEND
+ result = InvocationResponse.SUSPEND;
+
if (wasAdded) {
- storageManager.storeMessageContext(key, rmMsgCtx.getMessageContext());
+ storageManager.storeMessageContext(key, msgCtx);
+ // We can invoke the message immediately, if this is the next message to invoke,
+ // and we don't have a user transaction in play.
+ if(bean.getNextMsgNoToProcess() == msgNo && !storageManager.hasUserTransaction(msgCtx)) {
+ String workId = sequenceId;
+ ConfigurationContext context = msgCtx.getConfigurationContext();
+
+ worker = new InvokerWorker(context, invokerBean);
+ worker.setWorkId(workId);
+
+ // Actually take the lock
+ worker.getLock().addWork(workId, worker);
+ }
} else {
// Abort this message immediately as this message has already been added
sendAck = false;
@@ -422,6 +428,14 @@
if (transaction != null && transaction.isActive())
transaction.commit();
+ if(worker != null) {
+ try {
+ worker.run();
+ } catch(Exception e) {
+ log.error("Caught exception running InvokerWorker", e);
+ }
+ }
+
if (sendAck) {
try {
transaction = storageManager.getTransaction();
@@ -429,6 +443,15 @@
RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
AcknowledgementManager.sendAckNow(ackRMMsgContext);
TransportUtils.setResponseWritten(msgCtx, true);
+ RequestResponseTransport t =
+ (RequestResponseTransport) rmMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+ // Tell the transport that we have finished with the message as the response should have been
+ // written
+ if(t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
+ t.signalResponseReady();
+ }
+
if (transaction != null && transaction.isActive()) transaction.commit();
transaction = null;
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java Tue Nov 6 01:05:28 2007
@@ -33,9 +33,7 @@
}
public boolean insert(InvokerBean bean) throws SandeshaStorageException {
- boolean result = super.insert(bean.getMessageContextRefKey(), bean);
- mgr.getInMemoryTransaction().setReceivedMessages(true);
- return result;
+ return super.insert(bean.getMessageContextRefKey(), bean);
}
public boolean delete(String key) throws SandeshaStorageException {
@@ -51,9 +49,7 @@
}
public boolean update(InvokerBean bean) throws SandeshaStorageException {
- boolean result = super.update(bean.getMessageContextRefKey(), bean);
- mgr.getInMemoryTransaction().setReceivedMessages(true);
- return result;
+ return super.update(bean.getMessageContextRefKey(), bean);
}
public InvokerBean findUnique(InvokerBean bean) throws SandeshaException {
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Tue Nov 6 01:05:28 2007
@@ -48,7 +48,6 @@
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMBean;
import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.workers.Invoker;
import org.apache.sandesha2.workers.SandeshaThread;
import org.apache.sandesha2.workers.Sender;
@@ -62,7 +61,6 @@
private SenderBeanMgr senderBeanMgr = null;
private InvokerBeanMgr invokerBeanMgr = null;
private Sender sender = null;
- private Invoker invoker = null;
private PollingManager pollingManager = null;
private HashMap transactions = new HashMap();
private boolean useSerialization = false;
@@ -76,10 +74,6 @@
SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(context.getAxisConfiguration());
useSerialization = policy.isUseMessageSerialization();
- // Note that while inOrder is a global property we can decide if we need the
- // invoker thread at this point. If we change this to be a sequence-level
- // property then we'll need to revisit this.
- boolean inOrder = policy.isInOrder();
boolean polling = policy.isEnableMakeConnection();
this.rMSBeanMgr = new InMemoryRMSBeanMgr (this, context);
@@ -87,7 +81,6 @@
this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, context);
this.sender = new Sender();
- if(inOrder) this.invoker = new Invoker();
if(polling) this.pollingManager = new PollingManager();
}
@@ -135,7 +128,7 @@
* Gets the Invoker for this Storage manager
*/
public SandeshaThread getInvoker() {
- return invoker;
+ return null;
}
/**
@@ -364,6 +357,7 @@
SOAPEnvelope envelope;
}
}
+
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Tue Nov 6 01:05:28 2007
@@ -28,7 +28,6 @@
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beans.RMBean;
-import org.apache.sandesha2.workers.SandeshaThread;
/**
* This class does not really implement transactions, but it is a good
@@ -45,7 +44,6 @@
private ArrayList enlistedBeans = new ArrayList();
private InMemoryTransaction waitingForTran = null;
private boolean sentMessages = false;
- private boolean receivedMessages = false;
private boolean active = true;
InMemoryTransaction(InMemoryStorageManager manager, String threadName, int id) {
@@ -59,10 +57,6 @@
public void commit() {
releaseLocks();
if(sentMessages) manager.getSender().wakeThread();
- if(receivedMessages) {
- SandeshaThread invoker = manager.getInvoker();
- if(invoker != null) invoker.wakeThread();
- }
active = false;
}
@@ -164,14 +158,11 @@
return result.toString();
}
- public void setReceivedMessages(boolean receivedMessages) {
- this.receivedMessages = receivedMessages;
- }
-
public void setSentMessages(boolean sentMessages) {
this.sentMessages = sentMessages;
}
}
+
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java Tue Nov 6 01:05:28 2007
@@ -1169,5 +1169,14 @@
return epr;
}
+ public static boolean isInOrder(MessageContext context) throws SandeshaException {
+ if (log.isDebugEnabled()) log.debug("Enter: SandeshaUtil::isInOrder");
+
+ SandeshaPolicyBean policy = getPropertyBean(context.getAxisOperation());
+ boolean result = policy.isInOrder();
+
+ if (log.isDebugEnabled()) log.debug("Enter: SandeshaUtil::isInOrder, " + result);
+ return result;
+ }
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java Tue Nov 6 01:05:28 2007
@@ -68,12 +68,13 @@
* Otherwise messages skipped over will be ignored
* @throws SandeshaException
*/
- public synchronized void forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx,
+ public static synchronized void forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx,
String sequenceID,
boolean allowLaterDeliveryOfMissingMessages)throws SandeshaException{
- //first we block while we wait for the invoking thread to pause
- blockForPause();
+// //first we block while we wait for the invoking thread to pause
+// blockForPause();
try{
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(ctx, ctx.getAxisConfiguration());
//get all invoker beans for the sequence
InvokerBeanMgr storageMapMgr = storageManager
.getInvokerBeanMgr();
@@ -102,33 +103,23 @@
InvokerBean invoker = (InvokerBean)stMapIt.next();
// start a new worker thread and let it do the invocation.
- String workId = sequenceID + "::" + invoker.getMsgNo(); //creating a workId to uniquely identify the
- //piece of work that will be assigned to the Worker.
+ String workId = sequenceID;
- String messageContextKey = invoker.getMessageContextRefKey();
- InvokerWorker worker = new InvokerWorker(context,
- messageContextKey,
- true); //want to ignore the enxt msg number
-
- worker.setLock(getWorkerLock());
+ InvokerWorker worker = new InvokerWorker(ctx, invoker);
+ worker.forceOutOfOrder();
+ worker.setPooled();
worker.setWorkId(workId);
// Wrap the invoker worker with the correct context, if needed.
Runnable work = worker;
- ContextManager contextMgr = SandeshaUtil.getContextManager(context);
+ ContextManager contextMgr = SandeshaUtil.getContextManager(ctx);
if(contextMgr != null) {
work = contextMgr.wrapWithContext(work, invoker.getContext());
}
- try {
- // Try and set the lock up before we start the thread, but roll it back
- // if we hit any problems
- if(worker.getLock().addWork(workId, worker)){
- threadPool.execute(work);
- }
- } catch(Exception e) {
- worker.getLock().removeWork(workId);
- }
+ // Setup the lock for the new worker
+ worker.getLock().addWork(workId, worker);
+ ctx.getThreadPool().execute(work);
long msgNumber = invoker.getMsgNo();
//if necessary, update the "next message number" bean under this transaction
@@ -176,8 +167,8 @@
}
}
finally{
- //restart the invoker
- finishPause();
+// //restart the invoker
+// finishPause();
}
}
@@ -311,9 +302,7 @@
//see if this is an out of order msg
boolean beanIsOutOfOrderMsg = bean.getMsgNo()!=nextMsgno;
- String workId = sequenceId + "::" + bean.getMsgNo();
- //creating a workId to uniquely identify the
- //piece of work that will be assigned to the Worker.
+ String workId = sequenceId;
//check whether the bean is already assigned to a worker.
if (getWorkerLock().isWorkPresent(workId)) {
@@ -331,20 +320,15 @@
return sleep;
}
- String messageContextKey = bean.getMessageContextRefKey();
-
if(transaction != null) {
transaction.commit();
transaction = null;
}
// start a new worker thread and let it do the invocation.
- InvokerWorker worker = new InvokerWorker(context,
- messageContextKey,
- beanIsOutOfOrderMsg); //only ignore nextMsgNumber if the bean is an
- //out of order message
-
- worker.setLock(getWorkerLock());
+ InvokerWorker worker = new InvokerWorker(context, bean);
+ if(beanIsOutOfOrderMsg) worker.forceOutOfOrder();
+ worker.setPooled();
worker.setWorkId(workId);
// Wrap the invoker worker with the correct context, if needed.
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java Tue Nov 6 01:05:28 2007
@@ -17,6 +17,7 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.context.ContextManager;
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
@@ -30,21 +31,161 @@
public class InvokerWorker extends SandeshaWorker implements Runnable {
- ConfigurationContext configurationContext = null;
- String messageContextKey;
- boolean ignoreNextMsg = false;
+ static final Log log = LogFactory.getLog(InvokerWorker.class);
+ static final WorkerLock lock = new WorkerLock();
- Log log = LogFactory.getLog(InvokerWorker.class);
+ private ConfigurationContext configurationContext;
+ private String sequence;
+ private long messageNumber;
+ private String messageContextKey;
+ private boolean ignoreNextMsg;
+ private boolean pooledThread;
- public InvokerWorker (ConfigurationContext configurationContext, String messageContextKey, boolean ignoreNextMsg) {
+ public InvokerWorker (ConfigurationContext configurationContext, InvokerBean bean) {
+ // All invoker workers need to use the same lock, so we point to the static one here.
+ this.setLock(lock);
+
this.configurationContext = configurationContext;
- this.messageContextKey = messageContextKey;
- this.ignoreNextMsg = ignoreNextMsg;
+ initializeFromBean(bean);
}
+ public void forceOutOfOrder() {
+ if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::forceOutOfOrder");
+ ignoreNextMsg = true;
+ if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::forceOutOfOrder");
+ }
+
+ public void setPooled() {
+ if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::setPooled");
+ pooledThread = true;
+ if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::setPooled");
+ }
+
+ private void initializeFromBean(InvokerBean bean) {
+ if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::initializeFromBean " + bean);
+
+ this.sequence = bean.getSequenceID();
+ this.messageNumber = bean.getMsgNo();
+ this.messageContextKey = bean.getMessageContextRefKey();
+
+ if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::initializeFromBean");
+ }
+
+ /**
+ * The run method invokes the message that this invoker has been primed with, but will
+ * also attempt to invoke subsequent messages. If the invoker worker is running on the
+ * application thread then we move on to a thread pool for the second message, but if
+ * we are already on a pooled thread then we just continue.
+ */
public void run() {
- if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run");
+ if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run, message " + messageNumber + ", sequence " + sequence);
+
+ // If we are not the holder of the correct lock, then we have to stop
+ if(lock != null && !lock.ownsLock(workId, this)) {
+ if (log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run, another worker holds the lock");
+ return;
+ }
+
+ Transaction tran = null;
+ try {
+ InvokerWorker nextWorker = null;
+ Runnable nextRunnable = null;
+
+ // Invoke the first message
+ invokeMessage(null);
+
+ // Look for the next message, so long as we are still processing normally
+ while(!ignoreNextMsg) {
+ InvokerBean finder = new InvokerBean();
+ finder.setSequenceID(sequence);
+ finder.setMsgNo(messageNumber + 1);
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+ tran = storageManager.getTransaction();
+
+ InvokerBeanMgr mgr = storageManager.getInvokerBeanMgr();
+ InvokerBean nextBean = mgr.findUnique(finder);
+
+ if(nextBean != null) {
+ if(pooledThread) {
+ initializeFromBean(nextBean);
+ final Transaction theTran = tran;
+ Runnable work = new Runnable() {
+ public void run() {
+ invokeMessage(theTran);
+ }
+ };
+
+ // Wrap the work with the correct context, if needed.
+ ContextManager contextMgr = SandeshaUtil.getContextManager(configurationContext);
+ if(contextMgr != null) {
+ work = contextMgr.wrapWithContext(work, nextBean.getContext());
+ }
+
+ // Finally do the work
+ work.run();
+
+ tran = null;
+ } else {
+ nextWorker = new InvokerWorker(configurationContext, nextBean);
+ nextWorker.setPooled();
+ nextWorker.setWorkId(workId);
+
+ // Wrap the invoker worker with the correct context, if needed.
+ ContextManager contextMgr = SandeshaUtil.getContextManager(configurationContext);
+ if(contextMgr != null) {
+ nextRunnable = contextMgr.wrapWithContext(nextWorker, nextBean.getContext());
+ } else {
+ nextRunnable = nextWorker;
+ }
+ }
+ }
+ // Clean up the tran, in case we didn't pass it into the invoke method
+ if(tran != null) tran.commit();
+ tran = null;
+
+ if(nextBean == null || nextWorker != null) {
+ // We have run out of work, or the new worker has taken it on, so we can
+ // break out of the loop
+ break;
+ }
+ }
+
+ if (workId !=null && lock!=null) {
+ lock.removeWork(workId);
+ }
+
+ // If we created another worker, set it running now that we have released the lock
+ if(nextWorker != null) {
+ lock.addWork(workId, nextWorker);
+ configurationContext.getThreadPool().execute(nextRunnable);
+ }
+
+ } catch(SandeshaException e) {
+ log.debug("Exception within InvokerWorker", e);
+
+ // Clean up the tran, if there is one left
+ if(tran != null) {
+ try {
+ tran.rollback();
+ } catch(SandeshaException e2) {
+ log.debug("Exception rolling back tran", e2);
+ }
+ }
+ } finally {
+ // Release the lock
+ if (workId !=null && lock!=null && lock.ownsLock(workId, this)) {
+ lock.removeWork(workId);
+ }
+ }
+
+ if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
+ }
+
+ private void invokeMessage(Transaction tran) {
+ if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::invokeMessage");
+
Transaction transaction = null;
MessageContext msgToInvoke = null;
@@ -54,7 +195,11 @@
InvokerBeanMgr invokerBeanMgr = storageManager.getInvokerBeanMgr();
//starting a transaction
- transaction = storageManager.getTransaction();
+ if(tran == null) {
+ transaction = storageManager.getTransaction();
+ } else {
+ transaction = tran;
+ }
InvokerBean invokerBean = invokerBeanMgr.retrieve(messageContextKey);
@@ -153,7 +298,7 @@
TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(), storageManager);
// exit from current iteration. (since an entry
// was removed)
- if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run Last message return");
+ if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage Last message return");
if(transaction != null && transaction.isActive()) transaction.commit();
return;
}
@@ -181,10 +326,6 @@
if (log.isErrorEnabled())
log.error(e.toString(), e);
} finally {
- if (workId !=null && lock!=null) {
- lock.removeWork(workId);
- }
-
if (transaction!=null && transaction.isActive()) {
try {
transaction.rollback();
@@ -195,7 +336,7 @@
}
}
- if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
+ if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage");
}
private void makeMessageReadyForReinjection(MessageContext messageContext) {
Modified: webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties Tue Nov 6 01:05:28 2007
@@ -222,7 +222,6 @@
toBeanNotSet=The 'To' Sequence Property Bean has not been set for the sequence.
cannotFindTransportInDesc=Cannot find the transport in description {0} in the ConfigurationContext.
invalidElementFoundWithinElement=Found invalid ''{0}'' element within ''{1}'' element.
-invokerNotFound=An invoker thread was not found to dispatch messages on the inbound sequence {0}.
cannotSetPolicyBeanServiceNull=Cannot set the given SandeshaPolicyBean since the AxisService is not present
#------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org