You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2007/09/11 08:55:52 UTC
svn commit: r574490 - in
/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2:
handlers/ msgprocessors/ polling/ storage/ storage/inmemory/ util/ workers/
Author: mckierna
Date: Mon Sep 10 23:55:50 2007
New Revision: 574490
URL: http://svn.apache.org/viewvc?rev=574490&view=rev
Log:
performance improvements
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon Sep 10 23:55:50 2007
@@ -84,10 +84,11 @@
return returnValue ;
}
}
-
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
//this will change the execution chain of this message to work correctly in retransmissions.
//For e.g. Phases like security will be removed to be called in each retransmission.
- SandeshaUtil.modifyExecutionChainForStoring(msgCtx);
+ SandeshaUtil.modifyExecutionChainForStoring(msgCtx, storageManager);
String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
if (null != DONE && "true".equals(DONE)) {
@@ -97,7 +98,6 @@
}
msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
Transaction transaction = null;
@@ -126,7 +126,7 @@
}
if (msgProcessor != null){
- if(msgProcessor.processOutMessage(rmMsgCtx)){
+ if(msgProcessor.processOutMessage(rmMsgCtx, transaction)){
//the msg was paused
returnValue = InvocationResponse.SUSPEND;
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Mon Sep 10 23:55:50 2007
@@ -266,7 +266,7 @@
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
// passing the message through sandesha2sender
- SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+ SandeshaUtil.executeAndStore(ackRMMsgCtx, key, storageManager);
// inserting the new Ack.
senderBeanMgr.insert(ackBean);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Sep 10 23:55:50 2007
@@ -116,6 +116,12 @@
String outSequenceId = sequenceAck.getIdentifier().getIdentifier();
RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
+ if(rmsBean==null){
+ if (log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementProcessor::processAckHeader, Sequence bean not found");
+ return;
+ }
+
if (outSequenceId == null || "".equals(outSequenceId)) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outSeqIDIsNull);
log.debug(message);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -51,6 +51,9 @@
import org.apache.sandesha2.util.SOAPAbstractFactory;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SenderWorker;
+import org.apache.sandesha2.workers.WorkerLock;
import org.apache.sandesha2.wsrm.CreateSequence;
import org.apache.sandesha2.wsrm.SequenceOffer;
@@ -82,7 +85,7 @@
return false;
}
- public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction tran) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
@@ -252,7 +255,7 @@
// server and the client sides.
if (rmsBean == null) {
rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager);
- rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
+ rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager, tran);
}
}
@@ -405,7 +408,7 @@
// processing the response if not an dummy.
if (!dummyMessage)
- processResponseMessage(rmMsgCtx, rmsBean, internalSequenceId, outSequenceID, messageNumber, storageKey, storageManager);
+ processResponseMessage(rmMsgCtx, rmsBean, internalSequenceId, outSequenceID, messageNumber, storageKey, storageManager, tran);
//Users wont be able to get reliable response msgs in the back channel in the back channel of a
//reliable message. If he doesn't have a endpoint he should use polling mechanisms.
@@ -417,7 +420,7 @@
}
private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg, RMSBean rmsBean,
- StorageManager storageManager) throws AxisFault {
+ StorageManager storageManager, Transaction tran) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
@@ -484,7 +487,7 @@
createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
- SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey);
+ SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey, storageManager);
retransmitterMgr.insert(createSeqEntry);
@@ -497,7 +500,7 @@
}
private void processResponseMessage(RMMsgContext rmMsg, RMSBean rmsBean, String internalSequenceId, String outSequenceID, long messageNumber,
- String storageKey, StorageManager storageManager) throws AxisFault {
+ String storageKey, StorageManager storageManager, Transaction tran) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId + ", " + outSequenceID);
@@ -521,6 +524,11 @@
}
}
+ boolean sendingNow = false;
+ if(outSequenceID != null && !storageManager.hasUserTransaction(msg)) {
+ sendingNow = true;
+ }
+
// Now that we have decided which sequence to use for the message, make sure that we secure
// it with the correct token.
RMMsgCreator.secureOutboundMessage(rmsBean, msg);
@@ -563,11 +571,35 @@
// increasing the current handler index, so that the message will not be
// going throught the SandeshaOutHandler again.
msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
-
- SandeshaUtil.executeAndStore(rmMsg, storageKey);
+ SandeshaUtil.executeAndStore(rmMsg, storageKey, storageManager);
+
+ // Lock the sender bean before we insert it, if we are planning to send it ourselves
+ SenderWorker worker = null;
+ if(sendingNow) {
+ String workId = appMsgEntry.getMessageID() + appMsgEntry.getTimeToSend();
+ SandeshaThread sender = storageManager.getSender();
+ ConfigurationContext context = msg.getConfigurationContext();
+ WorkerLock lock = sender.getWorkerLock();
+
+ worker = new SenderWorker(context, appMsgEntry, rmsBean.getRMVersion());
+ worker.setLock(lock);
+ worker.setWorkId(workId);
+ // Actually take the lock
+ lock.addWork(workId, worker);
+ }
retransmitterMgr.insert(appMsgEntry);
+ // Commit the transaction, so that the sender worker starts with a clean slate.
+ if(tran != null && tran.isActive()) tran.commit();
+
+ if(worker != null) {
+ try {
+ worker.run();
+ } catch(Exception e) {
+ log.error("Caught exception running SandeshaWorker", e);
+ }
+ }
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::processResponseMessage");
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Mon Sep 10 23:55:50 2007
@@ -141,7 +141,7 @@
return false;
}
- public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: CloseSequenceProcessor::processOutMessage");
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -343,7 +343,7 @@
return true;
}
- public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
if (log.isDebugEnabled())
log.debug("Enter: CreateSeqMsgProcessor::processOutMessage");
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -262,7 +262,7 @@
return true;
}
- public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
if (log.isDebugEnabled()) {
log.debug("Enter: CreateSeqResponseMsgProcessor::processOutMessage");
log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java Mon Sep 10 23:55:50 2007
@@ -51,7 +51,7 @@
return true;
}
- public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
return false;
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Mon Sep 10 23:55:50 2007
@@ -211,7 +211,7 @@
messagePending.toSOAPEnvelope(returnMessage.getEnvelope());
}
- public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
return false;
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -40,5 +40,5 @@
* @return true if the msg context has been paused
* @throws AxisFault
*/
- public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
-}
\ No newline at end of file
+ public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction)throws AxisFault;
+}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Mon Sep 10 23:55:50 2007
@@ -28,6 +28,7 @@
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.engine.Handler.InvocationResponse;
+import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.commons.logging.Log;
@@ -345,6 +346,18 @@
RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, sequenceId, timeToSend, storageManager);
+
+ // If the MEP doesn't need the backchannel, and nor do we, we should signal it so that it
+ // can close off as soon as possible.
+ result = InvocationResponse.ABORT;
+ RequestResponseTransport t = null;
+ t = (RequestResponseTransport) rmMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+ // Tell the transport that there will be no response message
+ if(t != null) {
+ TransportUtils.setResponseWritten(msgCtx, false);
+ t.acknowledgeMessage(msgCtx);
+ }
}
// If this message matches the WSRM 1.0 pattern for an empty last message (e.g.
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -349,7 +349,7 @@
return terminateSeqResponseRMMsg;
}
- public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -89,7 +89,7 @@
return true;
}
- public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processOutMessage");
if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
return false;
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java Mon Sep 10 23:55:50 2007
@@ -149,17 +149,18 @@
// This sequence must have been terminated, or deleted
stopThreadForSequence(entry.getSequenceId(), true);
} else {
- if (log.isDebugEnabled())
- log.debug("Polling rms " + beanToPoll);
+ if (log.isDebugEnabled())
+ log.debug("Polling rms " + beanToPoll);
// The sequence is there, but we still only poll if we are expecting reply messages,
// or if we don't have clean ack state.
- boolean cleanAcks = false;
- if (beanToPoll.getNextMessageNumber() > -1)
- cleanAcks = AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(), beanToPoll.getNextMessageNumber());
+ boolean cleanAcks = false;
+ if (beanToPoll.getNextMessageNumber() > -1)
+ cleanAcks = AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(), beanToPoll.getNextMessageNumber());
long repliesExpected = beanToPoll.getExpectedReplies();
- if((force || !cleanAcks || repliesExpected > 0) && beanToPoll.getReferenceMessageStoreKey() != null)
- pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
+ if(beanToPoll.getSequenceID() != null && (force || !cleanAcks || repliesExpected > 0) && beanToPoll.getReferenceMessageStoreKey() != null)
+ pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
}
+
if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMSSide");
}
@@ -262,7 +263,7 @@
//this message should not be sent until it is qualified. I.e. till it is sent through the Sandesha2TransportSender.
makeConnectionRMMessage.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
- SandeshaUtil.executeAndStore(makeConnectionRMMessage, makeConnectionMsgStoreKey);
+ SandeshaUtil.executeAndStore(makeConnectionRMMessage, makeConnectionMsgStoreKey, storageManager);
senderBeanMgr.insert(makeConnectionSenderBean);
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java Mon Sep 10 23:55:50 2007
@@ -100,5 +100,16 @@
public abstract MessageContext retrieveMessageContext (String storageKey, ConfigurationContext configContext) throws SandeshaStorageException;
public abstract void removeMessageContext (String storageKey) throws SandeshaStorageException;
-
+
+
+ /**
+ * If there is no user transaction in scope then we can optimize the sending / invoking of a
+ * message. This method allows the StorageManager to tell the core Sandesha code if there
+ * is a transaction in scope.
+ * @return true, if there is a user transaction in scope.
+ */
+ public abstract boolean hasUserTransaction(MessageContext message) throws SandeshaStorageException;
+
+ public abstract boolean requiresMessageSerialization();
+
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Mon Sep 10 23:55:50 2007
@@ -338,6 +338,15 @@
public void initStorage (AxisModule moduleDesc) {
}
+
+ //We do not support user transactions in-memory
+ public boolean hasUserTransaction(MessageContext msg) {
+ return false;
+ }
+
+ public boolean requiresMessageSerialization() {
+ return useSerialization;
+ }
private class SerializedStorageEntry {
MessageContext message;
@@ -355,6 +364,7 @@
SOAPEnvelope envelope;
}
}
+
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java Mon Sep 10 23:55:50 2007
@@ -18,9 +18,7 @@
package org.apache.sandesha2.util;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.Set;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPFactory;
@@ -106,63 +104,48 @@
if(log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, anon");
return transaction;
}
-
- // From here on, we must be dealing with a real address. Piggyback all sequences that have an
- // acksTo that matches the To address, and that have an ackMessage queued up for sending.
- Set acked = new HashSet();
- SenderBean findBean = new SenderBean();
- findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
- findBean.setSend(true);
- findBean.setToAddress(target.getAddress());
-
- Collection collection = retransmitterBeanMgr.find(findBean);
-
- if (transaction != null && transaction.isActive())
- transaction.commit();
-
- transaction = storageManager.getTransaction();
-
- Iterator it = collection.iterator();
- while (it.hasNext()) {
- SenderBean ackBean = (SenderBean) it.next();
-
+ // From here on, we must be dealing with a real address. Piggyback all sequences that have an
+ // acksTo that matches the To address, and that have an ackMessage queued up for sending. We
+ // search for RMDBeans first, to avoid a deadlock.
+ //
+ // As a special case, if this is a terminate sequence message then add in ack messages for
+ // any sequences that have an acksTo that matches the target address. This helps to ensure
+ // that request-response sequence pairs end cleanly.
+ RMDBean findRMDBean = new RMDBean();
+ findRMDBean.setAcksToEPR(target.getAddress());
+ findRMDBean.setTerminated(false);
+ Collection rmdBeans = storageManager.getRMDBeanMgr().find(findRMDBean);
+ Iterator sequences = rmdBeans.iterator();
+ while(sequences.hasNext()) {
+ RMDBean sequence = (RMDBean) sequences.next();
+ String sequenceId = sequence.getSequenceID();
+
+ // Look for the SenderBean that carries the ack, there should be at most one
+ SenderBean findBean = new SenderBean();
+ findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+ findBean.setSend(true);
+ findBean.setSequenceID(sequenceId);
+ findBean.setToAddress(target.getAddress());
+
+ SenderBean ackBean = retransmitterBeanMgr.findUnique(findBean);
+
// Piggybacking will happen only if the end of ack interval (timeToSend) is not reached.
long timeNow = System.currentTimeMillis();
- if (ackBean.getTimeToSend() > timeNow) {
+ if (ackBean != null && ackBean.getTimeToSend() > timeNow) {
// Delete the beans that would have sent the ack
retransmitterBeanMgr.delete(ackBean.getMessageID());
storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
- String sequenceId = ackBean.getSequenceID();
if (log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
+ RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence);
+
+ } else if(rmMessageContext.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+ if(log.isDebugEnabled()) log.debug("Adding extra acks, as this is a terminate");
+
+ if(sequence.getHighestInMessageNumber() > 0) {
+ if(log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
- RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
- if(rmdBean != null && !rmdBean.isTerminated()) {
- RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, rmdBean);
- }
- acked.add(sequenceId);
- }
- }
-
- // As a special case, if this is a terminate sequence message then add in ack messages for
- // any sequences that have an acksTo that matches the target address. This helps to ensure
- // that request-response sequence pairs end cleanly.
- if(rmMessageContext.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
- if(log.isDebugEnabled()) log.debug("Adding extra acks, as this is a terminate");
-
- RMDBean findRMDBean = new RMDBean();
- findRMDBean.setAcksToEPR(target.getAddress());
- findRMDBean.setTerminated(false);
- Collection rmdBeans = storageManager.getRMDBeanMgr().find(findRMDBean);
- Iterator sequences = rmdBeans.iterator();
- while(sequences.hasNext()) {
- RMDBean sequence = (RMDBean) sequences.next();
- String sequenceId = sequence.getSequenceID();
-
- if(!acked.contains(sequenceId) && sequence.getHighestInMessageNumber() > 0) {
- if(log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence);
- acked.add(sequenceId);
}
}
}
@@ -313,7 +296,7 @@
// passing the message through sandesha2sender
ackMsgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
- SandeshaUtil.executeAndStore(ackRMMsgContext, key);
+ SandeshaUtil.executeAndStore(ackRMMsgContext, key, storageManager);
// inserting the new ack.
retransmitterBeanMgr.insert(ackBean);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java Mon Sep 10 23:55:50 2007
@@ -25,6 +25,7 @@
import java.util.Map;
import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import org.apache.axiom.om.OMElement;
@@ -883,7 +884,7 @@
else
return false;
}
- public static void executeAndStore (RMMsgContext rmMsgContext, String storageKey) throws AxisFault {
+ public static void executeAndStore (RMMsgContext rmMsgContext, String storageKey, StorageManager manager) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: SandeshaUtil::executeAndStore, " + storageKey);
@@ -891,7 +892,7 @@
ConfigurationContext configurationContext = msgContext.getConfigurationContext();
SandeshaPolicyBean policy = getPropertyBean(msgContext.getAxisOperation());
- if(policy.isUseMessageSerialization()) {
+ if(manager.requiresMessageSerialization()) {
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_TRUE);
StorageManager store = getSandeshaStorageManager(configurationContext, configurationContext.getAxisConfiguration());
@@ -908,27 +909,26 @@
Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc();
msgContext.setTransportOut(sandesha2TransportOutDesc);
+
+ //this invocation has to be a blocking one.
+ Boolean isTransportNonBlocking = (Boolean) msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
+ if (isTransportNonBlocking!=null && isTransportNonBlocking.booleanValue())
+ msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
// sending the message once through Sandesha2TransportSender.
if (msgContext.isPaused())
AxisEngine.resumeSend(msgContext);
else {
- //this invocation has to be a blocking one.
-
- Boolean isTransportNonBlocking = (Boolean) msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
- if (isTransportNonBlocking!=null && isTransportNonBlocking.booleanValue())
- msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
-
- AxisEngine.send(msgContext);
-
- msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isTransportNonBlocking);
+ AxisEngine.send(msgContext);
}
+ //put the original value of isTransportNonBlocking back on
+ msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isTransportNonBlocking);
}
if (log.isDebugEnabled())
log.debug("Exit: SandeshaUtil::executeAndStore");
}
- public static void modifyExecutionChainForStoring (MessageContext message)
+ public static void modifyExecutionChainForStoring (MessageContext message, StorageManager manager)
throws SandeshaException
{
@@ -936,8 +936,7 @@
if (property!=null)
return; //Phases are already set. Dont hv to redo.
- SandeshaPolicyBean policy = getPropertyBean(message.getAxisOperation());
- if(policy.isUseMessageSerialization())
+ if(manager.requiresMessageSerialization())
return; // No need to mess with the transport when we use message serialization
TransportOutDescription transportOutDescription = message.getTransportOut();
@@ -1120,40 +1119,53 @@
}
- public static SOAPEnvelope cloneEnvelope(SOAPEnvelope envelope) throws SandeshaException {
-
- // Now clone the env and set it in the message context
- XMLStreamReader streamReader = envelope.cloneOMElement().getXMLStreamReader();
- SOAPEnvelope clonedEnvelope = new StAXSOAPModelBuilder(streamReader, null).getSOAPEnvelope();
-
- // you have to explicitely set the 'processed' attribute for header
- // blocks, since it get lost in the above read from the stream.
-
- SOAPHeader header = envelope.getHeader();
- if (header != null) {
- Iterator childrenOfOldEnv = header.getChildElements();
- Iterator childrenOfNewEnv = clonedEnvelope.getHeader().getChildElements();
- while (childrenOfOldEnv.hasNext()) {
-
- SOAPHeaderBlock oldEnvHeaderBlock = (SOAPHeaderBlock) childrenOfOldEnv.next();
- SOAPHeaderBlock newEnvHeaderBlock = (SOAPHeaderBlock) childrenOfNewEnv.next();
+ public static SOAPEnvelope cloneEnvelope(SOAPEnvelope envelope) throws SandeshaException {
+
+ // Now clone the env and set it in the message context. We need to be sure that we
+ // close off the stream reader, in order to free up some of the heap.
+ XMLStreamReader streamReader = null;
+ SOAPEnvelope clonedEnvelope = null;
+ try {
+ streamReader = envelope.getXMLStreamReader();
+ clonedEnvelope = new StAXSOAPModelBuilder(streamReader, null).getSOAPEnvelope();
+ // you have to explicitely set the 'processed' attribute for header
+ // blocks, since it get lost in the above read from the stream.
+
+ SOAPHeader header = envelope.getHeader();
+ if (header != null) {
+ Iterator childrenOfOldEnv = header.getChildElements();
+ Iterator childrenOfNewEnv = clonedEnvelope.getHeader().getChildElements();
+ while (childrenOfOldEnv.hasNext()) {
+ SOAPHeaderBlock oldEnvHeaderBlock = (SOAPHeaderBlock) childrenOfOldEnv.next();
+ SOAPHeaderBlock newEnvHeaderBlock = (SOAPHeaderBlock) childrenOfNewEnv.next();
+
+ QName oldEnvHeaderBlockQName = oldEnvHeaderBlock.getQName();
+ if (oldEnvHeaderBlockQName != null) {
+ if (oldEnvHeaderBlockQName.equals(newEnvHeaderBlock.getQName())) {
+ if (oldEnvHeaderBlock.isProcessed())
+ newEnvHeaderBlock.setProcessed();
+ } else {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cloneDoesNotMatchToOriginal);
+ throw new SandeshaException(message);
+ }
+ }
+ }
+ }
+ // Completely build the new tree
+ clonedEnvelope.build();
+ } finally {
+ if(streamReader != null) {
+ try {
+ streamReader.close();
+ } catch(XMLStreamException e) {
+ log.debug("Caught exception closing stream", e);
+ }
+ }
- QName oldEnvHeaderBlockQName = oldEnvHeaderBlock.getQName();
- if (oldEnvHeaderBlockQName != null) {
- if (oldEnvHeaderBlockQName.equals(newEnvHeaderBlock.getQName())) {
- if (oldEnvHeaderBlock.isProcessed())
- newEnvHeaderBlock.setProcessed();
- } else {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cloneDoesNotMatchToOriginal);
- throw new SandeshaException(message);
- }
- }
- }
- }
-
- return clonedEnvelope;
- }
-
+ }
+ return clonedEnvelope;
+ }
+
public static final String getStackTraceFromException(Exception e) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java Mon Sep 10 23:55:50 2007
@@ -366,7 +366,7 @@
// terminateRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY, sequencePropertyKey);
// / addTerminateSeqTransaction.commit();
- SandeshaUtil.executeAndStore(terminateRMMessage, key);
+ SandeshaUtil.executeAndStore(terminateRMMessage, key, storageManager);
SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java Mon Sep 10 23:55:50 2007
@@ -167,7 +167,7 @@
SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
- SandeshaUtil.executeAndStore(rmMsgCtx, key);
+ SandeshaUtil.executeAndStore(rmMsgCtx, key, storageManager);
retramsmitterMgr.insert(senderBean);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java Mon Sep 10 23:55:50 2007
@@ -124,7 +124,7 @@
//adding the workId to the lock after assigning it to a thread makes sure
//that all the workIds in the Lock are handled by threads.
- getWorkerLock().addWork(workId);
+ getWorkerLock().addWork(workId, worker);
long msgNumber = invoker.getMsgNo();
//if necessary, update the "next message number" bean under this transaction
@@ -349,12 +349,14 @@
if(contextMgr != null) {
work = contextMgr.wrapWithContext(work, bean.getContext());
}
- threadPool.execute(work);
-
- //adding the workId to the lock after assigning it to a thread makes sure
- //that all the workIds in the Lock are handled by threads.
- getWorkerLock().addWork(workId);
-
+ try {
+ // Set the lock up before we start the thread, but roll it back
+ // if we hit any problems
+ getWorkerLock().addWork(workId, worker);
+ threadPool.execute(work);
+ } catch(Exception e) {
+ getWorkerLock().removeWork(workId);
+ }
processedMessage = true;
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java Mon Sep 10 23:55:50 2007
@@ -1,5 +1,7 @@
package org.apache.sandesha2.workers;
+import org.apache.axiom.om.impl.builder.StAXBuilder;
+import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
@@ -7,6 +9,7 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.Handler.InvocationResponse;
import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.util.MessageContextBuilder;
import org.apache.commons.logging.Log;
@@ -85,19 +88,30 @@
&& Sandesha2Constants.VALUE_TRUE.equals(postFaulureProperty))
postFailureInvocation = true;
+ InvocationResponse response = null;
if (postFailureInvocation) {
makeMessageReadyForReinjection(msgToInvoke);
if (log.isDebugEnabled())
log.debug("Receiving message, key=" + messageContextKey + ", msgCtx="
+ msgToInvoke.getEnvelope().getHeader());
- AxisEngine.receive(msgToInvoke);
+ response = AxisEngine.receive(msgToInvoke);
} else {
if (log.isDebugEnabled())
log.debug("Resuming message, key=" + messageContextKey + ", msgCtx="
+ msgToInvoke.getEnvelope().getHeader());
msgToInvoke.setPaused(false);
- AxisEngine.resumeReceive(msgToInvoke);
+ response = AxisEngine.resumeReceive(msgToInvoke);
}
+ if(!InvocationResponse.SUSPEND.equals(response)) {
+ // Performance work - need to close the XMLStreamReader to prevent GC thrashing.
+ SOAPEnvelope env = msgToInvoke.getEnvelope();
+ if(env!=null){
+ StAXBuilder sb = (StAXBuilder)msgToInvoke.getEnvelope().getBuilder();
+ if(sb!=null){
+ sb.close();
+ }
+ }
+ }
} catch (Exception e) {
if (log.isDebugEnabled())
@@ -105,7 +119,6 @@
handleFault(rmMsg, e);
}
-
transaction = storageManager.getTransaction();
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java Mon Sep 10 23:55:50 2007
@@ -131,8 +131,10 @@
}
}
- if (log.isDebugEnabled())
- log.debug("Exit: SandeshaThread::stopRunning, " + this);
+ // In a unit test, tracing 'this' once the thread was stopped caused
+ // an exception, so we just trace exit.
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaThread::stopRunning");
}
public synchronized boolean isThreadStarted() {
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Mon Sep 10 23:55:50 2007
@@ -200,12 +200,15 @@
SenderWorker worker = new SenderWorker(context, senderBean, rmVersion);
worker.setLock(getWorkerLock());
worker.setWorkId(workId);
- threadPool.execute(worker);
-
- // adding the workId to the lock after assigning it to a thread
- // makes sure
- // that all the workIds in the Lock are handled by threads.
- getWorkerLock().addWork(workId);
+
+ try {
+ // Set the lock up before we start the thread, but roll it back
+ // if we hit any problems
+ getWorkerLock().addWork(workId, worker);
+ threadPool.execute(worker);
+ } catch(Exception e) {
+ getWorkerLock().removeWork(workId);
+ }
// If we got to here then we found work to do on the sequence, so we should
// remember not to sleep at the end of the list of sequences.
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Mon Sep 10 23:55:50 2007
@@ -4,6 +4,7 @@
import java.util.HashMap;
import java.util.Iterator;
+import org.apache.axiom.om.impl.builder.StAXBuilder;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
@@ -76,6 +77,12 @@
if (log.isDebugEnabled())
log.debug("Enter: SenderWorker::run");
+
+ // If we are not the holder of the correct lock, then we have to stop
+ if(lock != null && !lock.ownsLock(workId, this)) {
+ if (log.isDebugEnabled()) log.debug("Exit: SenderWorker::run, another worker holds the lock");
+ return;
+ }
Transaction transaction = null;
@@ -253,8 +260,7 @@
try {
InvocationResponse response = InvocationResponse.CONTINUE;
- SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation());
- if(policy.isUseMessageSerialization()) {
+ if(storageManager.requiresMessageSerialization()) {
if(msgCtx.isPaused()) {
if (log.isDebugEnabled())
log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
@@ -268,12 +274,6 @@
AxisEngine.send(msgCtx); // TODO check if this should return an invocation response
}
} else {
- // had to fully build the SOAP envelope to support
- // retransmissions.
- // Otherwise a 'parserAlreadyAccessed' exception could
- // get thrown in retransmissions.
- // But this has a performance reduction.
- msgCtx.getEnvelope().build();
ArrayList retransmittablePhases = (ArrayList) msgCtx.getProperty(Sandesha2Constants.RETRANSMITTABLE_PHASES);
if (retransmittablePhases!=null) {
@@ -289,6 +289,7 @@
if (log.isDebugEnabled())
log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
+ msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
response = AxisEngine.resumeSend(msgCtx);
}
if(log.isDebugEnabled()) log.debug("Engine resume returned " + response);
@@ -647,9 +648,21 @@
responseMessageContext.setSoapAction("");
}
+ InvocationResponse response = null;
+
if (resenvelope!=null) {
- AxisEngine.receive(responseMessageContext);
+ response = AxisEngine.receive(responseMessageContext);
}
+ if(!InvocationResponse.SUSPEND.equals(response)) {
+ // Performance work - need to close the XMLStreamReader to prevent GC thrashing.
+ SOAPEnvelope env = responseMessageContext.getEnvelope();
+ if(env!=null){
+ StAXBuilder sb = (StAXBuilder)responseMessageContext.getEnvelope().getBuilder();
+ if(sb!=null){
+ sb.close();
+ }
+ }
+ }
} catch (Exception e) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java Mon Sep 10 23:55:50 2007
@@ -1,25 +1,47 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
package org.apache.sandesha2.workers;
-import java.util.ArrayList;
+import java.util.HashMap;
public class WorkerLock {
- public ArrayList workList = null;
+ private HashMap locks = new HashMap();
public WorkerLock () {
- workList = new ArrayList ();
+
}
- public synchronized void addWork (String work) {
- workList.add(work);
- }
+ public synchronized void addWork (String work, Object owner) {
+ if(locks.containsKey(work)) return;
+ locks.put(work, owner);
+ }
public synchronized void removeWork (String work) {
- workList.remove(work);
+ locks.remove(work);
}
public synchronized boolean isWorkPresent (String work) {
- return workList.contains(work);
+ return locks.containsKey(work);
}
+
+ public synchronized boolean ownsLock(String work, Object owner) {
+ Object realOwner = locks.get(work);
+ return realOwner == owner;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org