You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ga...@apache.org on 2007/05/21 14:17:46 UTC
svn commit: r540130 - in /webservices/sandesha/trunk/java: config/
modules/core/src/main/java/org/apache/sandesha2/client/
modules/core/src/main/java/org/apache/sandesha2/handlers/
modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ modules/...
Author: gatfora
Date: Mon May 21 05:17:40 2007
New Revision: 540130
URL: http://svn.apache.org/viewvc?view=rev&rev=540130
Log:
Allow finer transactional control to stop window where acks are sent before transactions commit. This reduces the time the transactions hold onto locks. Finally improve the transaction commit/rollback handling when exceptions are thrown
Modified:
webservices/sandesha/trunk/java/config/log4j.properties
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaInHandler.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/Transaction.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/scenarios/RMScenariosTest.java
Modified: webservices/sandesha/trunk/java/config/log4j.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/config/log4j.properties?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/config/log4j.properties (original)
+++ webservices/sandesha/trunk/java/config/log4j.properties Mon May 21 05:17:40 2007
@@ -1,6 +1,6 @@
# Set root category priority to INFO and its only appender to CONSOLE.
-#log4j.rootCategory=INFO, CONSOLE
-log4j.rootCategory=DEBUG, CONSOLE, LOGFILE
+log4j.rootCategory=INFO, CONSOLE
+#log4j.rootCategory=DEBUG, CONSOLE, LOGFILE
# Set the enterprise logger category to FATAL and its only appender to CONSOLE.
#log4j.logger.org.apache.axis2.enterprise=FATAL, CONSOLE
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java Mon May 21 05:17:40 2007
@@ -173,14 +173,15 @@
sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_ESTABLISHED);
fillOutgoingSequenceInfo(sequenceReport, rMSBean, storageManager);
+
+ if(reportTransaction != null && reportTransaction.isActive()) reportTransaction.commit();
+ reportTransaction = null;
} catch (Exception e) {
- if (reportTransaction!=null) {
- reportTransaction.rollback();
- reportTransaction = null;
- }
+ // Just log the exception
+ if(log.isDebugEnabled()) log.debug("Exception", e);
} finally {
- if (reportTransaction!=null) reportTransaction.commit();
+ if (reportTransaction!=null && reportTransaction.isActive()) reportTransaction.rollback();
}
return sequenceReport;
@@ -276,14 +277,15 @@
sandeshaReport.addToNoOfCompletedMessagesMap(sequenceID, sequenceReport.getCompletedMessages().size());
sandeshaReport.addToSequenceStatusMap(sequenceID, sequenceReport.getSequenceStatus());
}
+
+ if(reportTransaction != null && reportTransaction.isActive()) reportTransaction.commit();
+ reportTransaction = null;
} catch (Exception e) {
- if (reportTransaction!=null) {
- reportTransaction.rollback();
- reportTransaction = null;
- }
+ // just log the error
+ if(log.isDebugEnabled()) log.debug("Exception", e);
} finally {
- if (reportTransaction!=null) reportTransaction.commit();
+ if (reportTransaction!=null && reportTransaction.isActive()) reportTransaction.rollback();
}
return sandeshaReport;
@@ -381,9 +383,10 @@
if (log.isTraceEnabled())
log.trace("Checking if sequence " + internalSequenceId + " previously terminated");
- Transaction tran = storageManager.getTransaction();
+ Transaction tran = null;
try {
+ tran = storageManager.getTransaction();
RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
//see if the sequence is terminated
@@ -400,16 +403,14 @@
// Delete the rmsBean
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
}
+
+ if(tran != null && tran.isActive()) tran.commit();
+ tran = null;
- } catch (SandeshaException e) {
- if(tran!=null)
+ } finally {
+ if(tran!=null && tran.isActive())
tran.rollback();
- tran = null;
-
- throw e;
- }
- if(tran!=null)
- tran.commit();
+ }
}
/**
@@ -495,7 +496,8 @@
} catch (AxisFault e) {
String message = SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.couldNotSendTerminate, e.toString());
+ SandeshaMessageKeys.couldNotSendTerminate,
+ e.toString());
throw new SandeshaException(message, e);
} finally {
options.setAction(oldAction);
@@ -692,7 +694,7 @@
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
// Get a transaction to retrieve the properties
- Transaction transaction = storageManager.getTransaction();
+ Transaction transaction = null;
String sequenceID = null;
try
@@ -880,13 +882,14 @@
invoker.forceInvokeOfAllMessagesCurrentlyOnSequence(configContext, sequenceID, allowLaterDeliveryOfMissingMessages);
}
+ if(reportTransaction != null && reportTransaction.isActive()) reportTransaction.commit();
+ reportTransaction = null;
+
} catch (Exception e) {
- if (reportTransaction!=null) {
- reportTransaction.rollback();
- reportTransaction = null;
- }
+ // Just log the exception
+ if(log.isDebugEnabled()) log.debug("Exception", e);
} finally {
- if(reportTransaction != null) reportTransaction.commit();
+ if(reportTransaction != null && reportTransaction.isActive()) reportTransaction.rollback();
}
}
@@ -1023,15 +1026,18 @@
if(rmdBean.getSecurityTokenData() != null) sequenceReport.setSecureSequence(true);
+ if (reportTransaction!=null && reportTransaction.isActive()) reportTransaction.commit();
+ reportTransaction = null;
+
return sequenceReport;
} catch (Exception e) {
- if (reportTransaction!=null) {
+ // Just log the exception
+ if(log.isDebugEnabled()) log.debug("Exception", e);
+ } finally {
+ if(reportTransaction != null && reportTransaction.isActive()) {
reportTransaction.rollback();
- reportTransaction = null;
}
- } finally {
- if (reportTransaction!=null) reportTransaction.commit();
}
return null;
@@ -1365,14 +1371,15 @@
String soapNamespaceURI = null;
// Get the RMSBean for this sequence.
- Transaction transaction = storageManager.getTransaction();
+ Transaction transaction = null;
- try {
+ try {
+ transaction = storageManager.getTransaction();
RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceID);
if (rmsBean.getSoapVersion() == Sandesha2Constants.SOAPVersion.v1_2)
soapNamespaceURI = SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI;
} finally {
- transaction.commit();
+ if(transaction != null) transaction.commit();
}
return soapNamespaceURI;
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Mon May 21 05:17:40 2007
@@ -152,9 +152,10 @@
SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getConfigurationContext(),
rmMsgCtx.getConfigurationContext().getAxisConfiguration());
- Transaction transaction = storageManager.getTransaction();
+ Transaction transaction = null;
try {
+ transaction = storageManager.getTransaction();
// Check that both the Sequence header and message body have been secured properly
RMDBeanMgr mgr = storageManager.getRMDBeanMgr();
@@ -219,11 +220,12 @@
rmMsgCtx.getMessageContext().getAxisService());
rmMsgCtx.getMessageContext().setAxisOperation(duplicateMessageOperation);
}
- transaction.commit();
+
+ if(transaction != null && transaction.isActive()) transaction.commit();
transaction = null;
}
finally {
- if (transaction != null)
+ if (transaction != null && transaction.isActive())
transaction.rollback();
}
if (log.isDebugEnabled())
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaInHandler.java Mon May 21 05:17:40 2007
@@ -111,7 +111,7 @@
MessageValidator.validateMessage(rmMsgCtx, storageManager);
// commit the current transaction
- transaction.commit();
+ if(transaction != null && transaction.isActive()) transaction.commit();
transaction = storageManager.getTransaction();
// Process Ack headers in the message
@@ -119,7 +119,7 @@
ackProcessor.processAckHeaders(rmMsgCtx);
// commit the current transaction
- transaction.commit();
+ if(transaction != null && transaction.isActive()) transaction.commit();
transaction = storageManager.getTransaction();
// Process Ack Request headers in the message
@@ -133,13 +133,17 @@
pendingProcessor.processMessagePendingHeaders(rmMsgCtx);
// commit the current transaction
- transaction.commit();
+ if(transaction != null && transaction.isActive()) transaction.commit();
transaction = storageManager.getTransaction();
// Process the Sequence header, if there is one
SequenceProcessor seqProcessor = new SequenceProcessor();
- returnValue = seqProcessor.processSequenceHeader(rmMsgCtx);
+ returnValue = seqProcessor.processSequenceHeader(rmMsgCtx, transaction);
+ // commit the current transaction
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+
} catch (Exception e) {
if (log.isDebugEnabled())
log.debug("SandeshaInHandler::invoke Exception caught during processInMessage", e);
@@ -147,16 +151,6 @@
msgCtx.pause();
returnValue = InvocationResponse.SUSPEND;
- if (transaction != null) {
- try {
- transaction.rollback();
- transaction = null;
- } catch (Exception e1) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
- log.debug(message, e);
- }
- }
-
// Rethrow the original exception if it is an AxisFault
if (e instanceof AxisFault)
throw (AxisFault)e;
@@ -166,15 +160,17 @@
}
finally {
if (log.isDebugEnabled()) log.debug("SandeshaInHandler::invoke Doing final processing");
- if (transaction != null) {
+ if (transaction != null && transaction.isActive()) {
try {
- transaction.commit();
+ transaction.rollback();
+ transaction = null;
} catch (Exception e) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e.toString());
log.debug(message, e);
}
}
}
+
if (log.isDebugEnabled())
log.debug("Exit: SandeshaInHandler::invoke " + returnValue);
return returnValue;
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon May 21 05:17:40 2007
@@ -147,34 +147,29 @@
opCtx.setProperty(RequestResponseTransport.HOLD_RESPONSE, Boolean.TRUE);
}
+ if (transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+
} catch (Exception e) {
// message should not be sent in a exception situation.
msgCtx.pause();
returnValue = InvocationResponse.SUSPEND;
- // rolling back the transaction
- if (transaction != null) {
- try {
- transaction.rollback();
- transaction = null;
- } catch (Exception e1) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
- log.debug(message, e);
- }
- }
-
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outMsgError, e.toString());
throw new AxisFault(message, e);
+
} finally {
- if (transaction != null) {
+ // roll back the transaction
+ if (transaction != null && transaction.isActive()) {
try {
- transaction.commit();
- } catch (Exception e) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
- log.debug(message, e);
+ transaction.rollback();
+ } catch (Exception e1) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+ log.debug(message, e1);
}
}
}
+
if (log.isDebugEnabled())
log.debug("Exit: SandeshaOutHandler::invoke " + returnValue);
return returnValue;
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon May 21 05:17:40 2007
@@ -40,6 +40,7 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
@@ -72,7 +73,7 @@
this.inboundMessageNumber = inboundMessageNumber;
}
- public boolean processInMessage(RMMsgContext rmMsgCtx) {
+ public boolean processInMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
if (log.isDebugEnabled()) {
log.debug("Enter: ApplicationMsgProcessor::processInMessage");
log.debug("Exit: ApplicationMsgProcessor::processInMessage");
@@ -287,11 +288,9 @@
lastMessage = true;
}
}
- if (lastMessage) {
- rmsBean.setLastOutMessage(messageNumber);
- // Update the rmsBean
- storageManager.getRMSBeanMgr().update(rmsBean);
- }
+
+ if (lastMessage)
+ rmsBean.setLastOutMessage(messageNumber);
// set this as the response highest message.
rmsBean.setHighestOutMessageNumber(messageNumber);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Mon May 21 05:17:40 2007
@@ -37,6 +37,7 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.FaultManager;
@@ -57,7 +58,7 @@
private static final Log log = LogFactory.getLog(CloseSequenceProcessor.class);
- public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ public boolean processInMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: CloseSequenceProcessor::processInMessage");
@@ -120,6 +121,14 @@
closeSequenceResponseMsg.setResponseWritten(true);
closeSeqResponseRMMsg.addSOAPEnvelope();
+
+ //
+ // Now that we have generated the message we can commit the transaction
+ //
+ if(transaction != null && transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
AxisEngine engine = new AxisEngine(closeSequenceMsg.getConfigurationContext());
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Mon May 21 05:17:40 2007
@@ -41,6 +41,7 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.util.FaultManager;
@@ -64,7 +65,7 @@
private static final Log log = LogFactory.getLog(CreateSeqMsgProcessor.class);
- public boolean processInMessage(RMMsgContext createSeqRMMsg) throws AxisFault {
+ public boolean processInMessage(RMMsgContext createSeqRMMsg, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: CreateSeqMsgProcessor::processInMessage");
@@ -259,6 +260,10 @@
SandeshaUtil.startWorkersForSequence(context, rmdBean);
+ //
+ // We have done all of our updates, so commit the transaction
+ if(transaction != null && transaction.isActive()) transaction.commit();
+
AxisEngine engine = new AxisEngine(context);
try{
engine.send(outMessage);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Mon May 21 05:17:40 2007
@@ -38,6 +38,7 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -57,7 +58,7 @@
private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
- public boolean processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
+ public boolean processInMessage(RMMsgContext createSeqResponseRMMsgCtx, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage");
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java Mon May 21 05:17:40 2007
@@ -9,18 +9,15 @@
import org.apache.axis2.util.MessageContextBuilder;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.wsrm.Sequence;
public class LastMessageProcessor {
- public boolean processLastMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ public static void processLastMessage(RMMsgContext rmMsgCtx) throws AxisFault {
if (!Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmMsgCtx.getRMSpecVersion()))
- return true;
+ return;
MessageContext msgContext = rmMsgCtx.getMessageContext();
- Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- String sequenceId = sequence.getIdentifier().getIdentifier();
MessageContext outMessage = MessageContextBuilder.createOutMessageContext(msgContext);
//add the SOAP envelope with body null
@@ -37,7 +34,6 @@
AxisEngine engine = new AxisEngine (rmMsgCtx.getConfigurationContext());
engine.send(outMessage);
- return true;
}
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Mon May 21 05:17:40 2007
@@ -20,6 +20,7 @@
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
@@ -47,7 +48,7 @@
* A message is selected by the set of SenderBeans that are waiting to be sent.
* This is processed using a SenderWorker.
*/
- public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ public boolean processInMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
if(log.isDebugEnabled()) log.debug("Enter: MakeConnectionProcessor::processInMessage " + rmMsgCtx.getSOAPEnvelope().getBody());
MakeConnection makeConnection = (MakeConnection) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.MAKE_CONNECTION);
@@ -110,7 +111,7 @@
return false;
}
- replyToPoll(rmMsgCtx, senderBean, storageManager, pending, makeConnection.getNamespaceValue());
+ replyToPoll(rmMsgCtx, senderBean, storageManager, pending, makeConnection.getNamespaceValue(), transaction);
if(log.isDebugEnabled()) log.debug("Exit: MakeConnectionProcessor::processInMessage");
return false;
@@ -120,7 +121,8 @@
SenderBean matchingMessage,
StorageManager storageManager,
boolean pending,
- String namespace)
+ String namespace,
+ Transaction transaction)
throws AxisFault
{
if(log.isDebugEnabled()) log.debug("Enter: MakeConnectionProcessor::replyToPoll");
@@ -175,6 +177,10 @@
returnMessage.setOperationContext(context);
returnMessage.setProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE, Boolean.TRUE);
+
+ //
+ // Commit the current transaction, so that the SenderWorker can do it's own locking
+ if(transaction != null && transaction.isActive()) transaction.commit();
//running the MakeConnection through a SenderWorker.
//This will allow Sandesha2 to consider both of following senarios equally.
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java Mon May 21 05:17:40 2007
@@ -19,6 +19,7 @@
import org.apache.axis2.AxisFault;
import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.storage.Transaction;
/**
* The message processor interface.
@@ -31,7 +32,7 @@
* @return true if the msg context has been paused
* @throws AxisFault
*/
- public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+ public boolean processInMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault;
/**
*
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Mon May 21 05:17:40 2007
@@ -42,6 +42,7 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
@@ -66,7 +67,7 @@
private static final Log log = LogFactory.getLog(SequenceProcessor.class);
- public InvocationResponse processSequenceHeader(RMMsgContext rmMsgCtx) throws AxisFault {
+ public InvocationResponse processSequenceHeader(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: SequenceProcessor::processSequenceHeader");
@@ -74,7 +75,7 @@
Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
if(sequence != null) {
// This is a reliable message, so hand it on to the main routine
- result = processReliableMessage(rmMsgCtx);
+ result = processReliableMessage(rmMsgCtx, transaction);
} else {
if (log.isDebugEnabled())
log.debug("Message does not contain a sequence header");
@@ -84,7 +85,7 @@
return result;
}
- public InvocationResponse processReliableMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ public InvocationResponse processReliableMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: SequenceProcessor::processReliableMessage");
@@ -214,12 +215,12 @@
SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
- // this is effectively a poll for the replyMessage, wo re-use the logic in the MakeConnection
+ // this is effectively a poll for the replyMessage, so re-use the logic in the MakeConnection
// processor. This will use this thread to re-send the reply, writing it into the transport.
// As the reply is now written we do not want to continue processing, or suspend, so we abort.
if(replyMessageBean != null) {
if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
- MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null);
+ MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null, transaction);
result = InvocationResponse.ABORT;
if (log.isDebugEnabled())
log.debug("Exit: SequenceProcessor::processReliableMessage, replayed message: " + result);
@@ -313,13 +314,14 @@
// - We have async acks
boolean backchannelFree = (replyTo != null && !replyTo.hasAnonymousAddress()) ||
WSDLConstants.MEP_CONSTANT_IN_ONLY == mep;
+
+ boolean sendAck = false;
+
EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
if (acksTo.hasAnonymousAddress() && backchannelFree) {
Object responseWritten = msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
- if (responseWritten==null || !Constants.VALUE_TRUE.equals(responseWritten)) {
- RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
- msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
- AcknowledgementManager.sendAckNow(ackRMMsgContext);
+ if (responseWritten==null || !Constants.VALUE_TRUE.equals(responseWritten)) {
+ sendAck = true;
}
} else if (!acksTo.hasAnonymousAddress()) {
SandeshaPolicyBean policyBean = SandeshaUtil.getPropertyBean (msgCtx.getAxisOperation());
@@ -365,6 +367,24 @@
// This will avoid performing application processing more than once.
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+ }
+
+ if (transaction != null && transaction.isActive())
+ transaction.commit();
+
+ if (sendAck) {
+ try {
+ transaction = storageManager.getTransaction();
+
+ RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
+ msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+ AcknowledgementManager.sendAckNow(ackRMMsgContext);
+ if (transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+
+ } finally {
+ if (transaction != null && transaction.isActive()) transaction.rollback();
+ }
}
if (log.isDebugEnabled())
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon May 21 05:17:40 2007
@@ -29,7 +29,6 @@
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.transport.http.server.AxisHttpResponseImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
@@ -40,6 +39,7 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
@@ -64,7 +64,7 @@
private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
- public boolean processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
+ public boolean processInMessage(RMMsgContext terminateSeqRMMsg, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
@@ -150,12 +150,15 @@
//sending the terminate sequence response
if (terminateSequenceResponse != null) {
+ //
+ // As we have processed the input and prepared the response we can commit the
+ // transaction now.
+ if(transaction != null && transaction.isActive()) transaction.commit();
MessageContext outMessage = terminateSequenceResponse.getMessageContext();
EndpointReference toEPR = outMessage.getTo();
- AxisEngine engine = new AxisEngine(terminateSeqMsg
- .getConfigurationContext());
+ AxisEngine engine = new AxisEngine(terminateSeqMsg.getConfigurationContext());
outMessage.setServerSide(true);
@@ -308,7 +311,7 @@
throw new SandeshaException (message);
}
- RMMsgContext referenceRMMsg = MsgInitializer.initializeMessage(referenceMessage);
+ //RMMsgContext referenceRMMsg = MsgInitializer.initializeMessage(referenceMessage);
}
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Mon May 21 05:17:40 2007
@@ -29,6 +29,7 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
@@ -43,7 +44,7 @@
private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
- public boolean processInMessage(RMMsgContext terminateResRMMsg)
+ public boolean processInMessage(RMMsgContext terminateResRMMsg, Transaction transaction)
throws AxisFault {
if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage");
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java Mon May 21 05:17:40 2007
@@ -72,31 +72,23 @@
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
transaction = storageManager.getTransaction();
- msgProcessor.processInMessage(rmMsgCtx);
+ msgProcessor.processInMessage(rmMsgCtx, transaction);
//If message is a LastMessage then we deligate the processing to the LastMessageProcessor
Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
if (sequence!=null && sequence.getLastMessage()!=null) {
- LastMessageProcessor lastMsgProcessor = new LastMessageProcessor ();
- lastMsgProcessor.processLastMessage(rmMsgCtx);
+ LastMessageProcessor.processLastMessage(rmMsgCtx);
}
-
+
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+
} catch (Exception e) {
if (log.isDebugEnabled())
log.debug("Exception caught during processInMessage", e);
// message should not be sent in a exception situation.
msgCtx.pause();
- if (transaction != null) {
- try {
- transaction.rollback();
- transaction = null;
- } catch (Exception e1) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
- log.debug(message, e);
- }
- }
-
if (!(e instanceof AxisFault)) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.inMsgError, e.toString());
throw new AxisFault(message, e);
@@ -104,12 +96,12 @@
throw (AxisFault)e;
} finally {
- if (transaction != null) {
+ if (transaction != null && transaction.isActive()) {
try {
- transaction.commit();
- } catch (Exception e) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
- log.debug(message, e);
+ transaction.rollback();
+ } catch (Exception e1) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+ log.debug(message, e1);
}
}
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java Mon May 21 05:17:40 2007
@@ -108,11 +108,12 @@
} catch (Exception e) {
if(log.isDebugEnabled()) log.debug("Exception", e);
- if(t != null) {
+ } finally {
+ if(t != null && t.isActive()) {
try {
t.rollback();
} catch(Exception e2) {
- if(log.isDebugEnabled()) log.debug("Exception during rollback", e);
+ if(log.isDebugEnabled()) log.debug("Exception during rollback", e2);
}
}
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/Transaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/Transaction.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/Transaction.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/Transaction.java Mon May 21 05:17:40 2007
@@ -23,9 +23,9 @@
public interface Transaction {
- public void commit ();
+ public void commit () throws SandeshaStorageException;
- public void rollback ();
+ public void rollback () throws SandeshaStorageException;
//indicates that the transaction has been started, but has not been committed or rolledbacked yet.
public boolean isActive ();
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Mon May 21 05:17:40 2007
@@ -46,6 +46,7 @@
private InMemoryTransaction waitingForTran = null;
private boolean sentMessages = false;
private boolean receivedMessages = false;
+ private boolean active = true;
InMemoryTransaction(InMemoryStorageManager manager, String threadName, int id) {
if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::<init>");
@@ -62,14 +63,16 @@
SandeshaThread invoker = manager.getInvoker();
if(invoker != null) invoker.wakeThread();
}
+ active = false;
}
public void rollback() {
releaseLocks();
+ active = false;
}
public boolean isActive () {
- return !enlistedBeans.isEmpty();
+ return active;
}
public void enlist(RMBean bean) throws SandeshaStorageException {
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Mon May 21 05:17:40 2007
@@ -602,19 +602,20 @@
SOAPFault faultPart = envelope.getBody().getFault();
if (faultPart != null) {
- Transaction transaction = null;
-
- try {
- transaction = storageManager.getTransaction();
- // constructing the fault
- AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart);
- response = manageIncomingFault (axisFault, rmMsgCtx, faultPart);
+ Transaction transaction = null;
- if(transaction != null && transaction.isActive()) transaction.commit();
- transaction = null;
- } finally {
- if (transaction != null && transaction.isActive())
- transaction.rollback();
+ try {
+ transaction = storageManager.getTransaction();
+
+ // constructing the fault
+ AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart);
+ response = manageIncomingFault (axisFault, rmMsgCtx, faultPart);
+
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+ } finally {
+ if (transaction != null && transaction.isActive())
+ transaction.rollback();
}
}
return response;
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java Mon May 21 05:17:40 2007
@@ -95,11 +95,11 @@
//invoke each bean in turn.
//NOTE: here we are breaking ordering
while(stMapIt.hasNext()){
- transaction = storageManager.getTransaction();
- InvokerBean invoker = (InvokerBean)stMapIt.next();
-
//invoke the app
try{
+ transaction = storageManager.getTransaction();
+ InvokerBean invoker = (InvokerBean)stMapIt.next();
+
// start a new worker thread and let it do the invocation.
String workId = sequenceID + "::" + invoker.getMsgNo(); //creating a workId to uniquely identify the
//piece of work that will be assigned to the Worker.
@@ -149,15 +149,15 @@
rmdBeanMgr.update(rMDBean);
}
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
}
catch(Exception e){
- if(transaction != null) {
- transaction.rollback();
- transaction = null;
- }
+ // Just log the error
+ if(log.isDebugEnabled()) log.debug("Exception", e);
} finally {
- if(transaction != null) {
- transaction.commit();
+ if(transaction != null && transaction.isActive()) {
+ transaction.rollback();
transaction = null;
}
}
@@ -327,30 +327,21 @@
processedMessage = true;
}
+
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
} catch (Exception e) {
- if (transaction != null) {
- try {
- transaction.rollback();
- transaction = null;
- } catch (Exception e1) {
- String message = SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.rollbackError, e1
- .toString());
- log.debug(message, e1);
- }
- }
String message = SandeshaMessageHelper
.getMessage(SandeshaMessageKeys.invokeMsgError);
- log.debug(message, e);
+ if(log.isDebugEnabled()) log.debug(message, e);
} finally {
- if (transaction != null) {
+ if (transaction != null && transaction.isActive()) {
try {
- transaction.commit();
- transaction = null;
+ transaction.rollback();
} catch (Exception e) {
String message = SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.commitError, e.toString());
- log.debug(message, e);
+ SandeshaMessageKeys.rollbackError, e.toString());
+ if(log.isDebugEnabled()) log.debug(message, e);
}
}
}
@@ -359,5 +350,5 @@
log.debug("Exit: InOrderInvoker::internalRun");
return sleep;
}
-
+
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java Mon May 21 05:17:40 2007
@@ -14,6 +14,7 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
@@ -105,17 +106,15 @@
transaction.commit();
transaction = storageManager.getTransaction();
}
-
} catch (Exception e) {
if (log.isDebugEnabled())
log.debug("Exception :", e);
- if(transaction!=null){
- transaction.rollback();
- transaction = storageManager.getTransaction();
- }
+
handleFault(rmMsg, e);
}
+
+
if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
Sequence sequence = (Sequence) rmMsg
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
@@ -158,18 +157,25 @@
rMDBean.setNextMsgNoToProcess(nextMsgNo);
storageManager.getRMDBeanMgr().update(rMDBean);
}
+
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+
} catch (Exception e) {
if (log.isErrorEnabled())
log.error(e.toString(), e);
- if(transaction != null) {
- transaction.rollback();
- transaction = null;
- }
} finally {
- if (transaction!=null) transaction.commit();
-
if (workId !=null && lock!=null) {
lock.removeWork(workId);
+ }
+
+ if (transaction!=null && transaction.isActive()) {
+ try {
+ transaction.rollback();
+ } catch (SandeshaStorageException e) {
+ if (log.isWarnEnabled())
+ log.warn("Caught exception rolling back transaction", e);
+ }
}
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Mon May 21 05:17:40 2007
@@ -174,6 +174,9 @@
// If we got to here then we found work to do on the sequence, so we should
// remember not to sleep at the end of the list of sequences.
processedMessage = true;
+
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
} catch (Exception e) {
@@ -184,28 +187,16 @@
//TODO rollback only if a SandeshaStorageException.
//This allows the other Exceptions to be used within the Normal flow.
- if (transaction != null) {
- try {
- transaction.rollback();
- transaction = null;
- } catch (Exception e1) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
- .toString());
- log.debug(message, e1);
- }
- }
-
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
-
log.debug(message, e);
} finally {
- if (transaction != null) {
+ if (transaction != null && transaction.isActive()) {
try {
- transaction.commit();
+ transaction.rollback();
transaction = null;
} catch (Exception e) {
String message = SandeshaMessageHelper
- .getMessage(SandeshaMessageKeys.commitError, e.toString());
+ .getMessage(SandeshaMessageKeys.rollbackError, e.toString());
log.debug(message, e);
}
}
@@ -228,9 +219,10 @@
RMSBean finderBean = new RMSBean();
finderBean.setTerminated(true);
- Transaction transaction = storageManager.getTransaction();
+ Transaction transaction = null;
try {
+ transaction = storageManager.getTransaction();
SandeshaPolicyBean propertyBean =
SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
@@ -301,11 +293,21 @@
}
}
- } catch (SandeshaException e) {
- if (log.isErrorEnabled())
- log.error(e);
- } finally {
- transaction.commit();
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+
+ } catch (SandeshaException e) {
+ if (log.isErrorEnabled())
+ log.error(e);
+ } finally {
+ if(transaction != null && transaction.isActive()) {
+ try {
+ transaction.rollback();
+ } catch (SandeshaStorageException e) {
+ if (log.isDebugEnabled())
+ log.debug("Caught exception rolling back transaction", e);
+ }
+ }
}
if (log.isDebugEnabled())
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Mon May 21 05:17:40 2007
@@ -32,11 +32,11 @@
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMSBean;
-import org.apache.sandesha2.storage.beans.RMSequenceBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
@@ -185,14 +185,6 @@
// sending the message
boolean successfullySent = false;
- // have to commit the transaction before sending. This may
- // get changed when WS-AT is available.
- if(transaction != null) {
- transaction.commit();
- transaction = null;
- transaction = storageManager.getTransaction();
- }
-
// Although not actually sent yet, update the send count to indicate an attempt
if (senderBean.isReSend()) {
SenderBean bean2 = senderBeanMgr.retrieve(senderBean.getMessageID());
@@ -271,11 +263,10 @@
// Store the Exception as a sequence property to enable the client to lookup the last
// exception time and timestamp.
- // Create a new Transaction
- transaction = storageManager.getTransaction();
-
try
{
+ // Create a new Transaction
+ transaction = storageManager.getTransaction();
// Get the internal sequence id from the context
String internalSequenceId = (String)rmMsgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
@@ -300,7 +291,7 @@
{
if (log.isErrorEnabled())
log.error(e1);
-
+ } finally {
if (transaction != null) {
transaction.rollback();
transaction = null;
@@ -340,34 +331,45 @@
if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
&&
(Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue()))) {
- transaction = storageManager.getTransaction();
- //terminate message sent using the SandeshaClient. Since the terminate message will simply get the
- //InFlow of the reference message get called which could be zero sized (OutOnly operations).
-
- // terminate sending side if this is the WSRM 1.0 spec.
- // If the WSRM versoion is 1.1 termination will happen in the terminate sequence response message.
-
- TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx
- .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
- String sequenceID = terminateSequence.getIdentifier().getIdentifier();
-
- RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
- TerminateManager.terminateSendingSide(rmsBean, storageManager);
-
- transaction.commit();
+ try {
+ transaction = storageManager.getTransaction();
+ //terminate message sent using the SandeshaClient. Since the terminate message will simply get the
+ //InFlow of the reference message get called which could be zero sized (OutOnly operations).
+
+ // terminate sending side if this is the WSRM 1.0 spec.
+ // If the WSRM versoion is 1.1 termination will happen in the terminate sequence response message.
+
+ TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx
+ .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ String sequenceID = terminateSequence.getIdentifier().getIdentifier();
+
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
+ TerminateManager.terminateSendingSide(rmsBean, storageManager);
+
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+ } finally {
+ if(transaction != null && transaction.isActive()) {
+ transaction.rollback();
+ transaction = null;
+ }
+ }
}
} catch (Exception e) {
- if (log.isErrorEnabled()) log.error("Caught exception", e);
- if (transaction!=null) {
- transaction.rollback();
- transaction = null;
- }
+ if (log.isDebugEnabled()) log.debug("Caught exception", e);
} finally {
- if (transaction!=null) transaction.commit();
-
if (lock!=null && workId!=null) {
lock.removeWork(workId);
+ }
+
+ if (transaction!=null && transaction.isActive()) {
+ try {
+ transaction.rollback();
+ } catch (SandeshaStorageException e) {
+ if (log.isWarnEnabled())
+ log.warn("Caught exception rolling back transaction", e);
+ }
}
}
Modified: webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/scenarios/RMScenariosTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/scenarios/RMScenariosTest.java?view=diff&rev=540130&r1=540129&r2=540130
==============================================================================
--- webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/scenarios/RMScenariosTest.java (original)
+++ webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/scenarios/RMScenariosTest.java Mon May 21 05:17:40 2007
@@ -116,6 +116,7 @@
// Test sync echo with no offer, and the 1.1 spec
clientOptions = new Options();
clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_1);
+ org.apache.log4j.BasicConfigurator.configure();
runEcho(clientOptions, false, false, true,true,true);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org