You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/05/12 21:16:35 UTC
svn commit: r405839 [2/2] - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: ./ client/ handlers/
msgprocessors/ util/ workers/
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Fri May 12 12:16:32 2006
@@ -20,6 +20,7 @@
import javax.xml.namespace.QName;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContextFactory;
@@ -34,6 +35,7 @@
import org.apache.sandesha2.msgprocessors.MsgProcessor;
import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.wsrm.Sequence;
@@ -63,42 +65,71 @@
log.debug(message);
throw new AxisFault(message);
}
-
- // getting rm message
- RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
- String DONE = (String) msgCtx
- .getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
+ String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
if (null != DONE && "true".equals(DONE))
return;
- msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
-
- String dummyMessageString = (String) msgCtx.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
- boolean dummyMessage = false;
- if (dummyMessageString!=null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
- dummyMessage = true;
-
+ msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
- MsgProcessor msgProcessor = null;
- int messageType = rmMsgCtx.getMessageType();
- if (messageType==Sandesha2Constants.MessageTypes.UNKNOWN) {
- MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
- if (requestMsgCtx!=null) { //for the server side
- RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx);
- Sequence sequencePart = (Sequence) reqRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- if (sequencePart!=null)
- msgProcessor = new ApplicationMsgProcessor ();// a rm intended message.
- } else if (!msgCtx.isServerSide()) //if client side.
- msgProcessor = new ApplicationMsgProcessor ();
- }else {
- msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+ boolean withinTransaction = false;
+ String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+ withinTransaction = true;
}
+
+ Transaction transaction = null;
+ if (!withinTransaction) {
+ transaction = storageManager.getTransaction();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+ }
+ boolean rolebacked = false;
- if (msgProcessor!=null)
- msgProcessor.processOutMessage(rmMsgCtx);
-
+ try {
+ // getting rm message
+ RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+
+ String dummyMessageString = (String) msgCtx.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
+ boolean dummyMessage = false;
+ if (dummyMessageString != null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
+ dummyMessage = true;
+
+ MsgProcessor msgProcessor = null;
+ int messageType = rmMsgCtx.getMessageType();
+ if (messageType == Sandesha2Constants.MessageTypes.UNKNOWN) {
+ MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(
+ OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ if (requestMsgCtx != null) { // for the server side
+ RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx);
+ Sequence sequencePart = (Sequence) reqRMMsgCtx
+ .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if (sequencePart != null)
+ msgProcessor = new ApplicationMsgProcessor();// a rm
+ // intended
+ // message.
+ } else if (!msgCtx.isServerSide()) // if client side.
+ msgProcessor = new ApplicationMsgProcessor();
+ } else {
+ msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+ }
+
+ if (msgProcessor != null)
+ msgProcessor.processOutMessage(rmMsgCtx);
+
+ } catch (Exception e) {
+ if (!withinTransaction) {
+ transaction.rollback();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolebacked = true;
+ }
+ } finally {
+ if (!withinTransaction && !rolebacked) {
+ transaction.commit();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ }
+ }
}
public QName getName() {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Fri May 12 12:16:32 2006
@@ -39,7 +39,6 @@
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
@@ -64,7 +63,6 @@
public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
-
AckRequested ackRequested = (AckRequested) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST);
if (ackRequested==null) {
throw new SandeshaException ("Message identified as of type ackRequested does not have an AckRequeted element");
@@ -79,9 +77,11 @@
String sequenceID = ackRequested.getIdentifier().getIdentifier();
ConfigurationContext configurationContext = rmMsgCtx.getMessageContext().getConfigurationContext();
+
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
//Setting the ack depending on AcksTo.
SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceID,
Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
@@ -139,8 +139,6 @@
ackMsgCtx.setProperty(AddressingConstants.WS_ADDRESSING_VERSION,
msgContext.getProperty(AddressingConstants.WS_ADDRESSING_VERSION)); //TODO do this in the RMMsgCreator
-// RMMsgContext ackRMMsgCtx = rmm
-
String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,configurationContext);
String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
@@ -170,15 +168,15 @@
rmMsgCtx.getMessageContext().setProperty(
Sandesha2Constants.ACK_WRITTEN, "true");
+
try {
engine.send(ackRMMsgCtx.getMessageContext());
} catch (AxisFault e1) {
throw new SandeshaException(e1.getMessage());
}
+
} else {
- Transaction asyncAckTransaction = storageManager.getTransaction();
-
SenderBeanMgr retransmitterBeanMgr = storageManager
.getRetransmitterBeanMgr();
@@ -238,8 +236,6 @@
//inserting the new ack.
retransmitterBeanMgr.insert(ackBean);
- asyncAckTransaction.commit();
-
//passing the message through sandesha2sender
ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
@@ -255,6 +251,7 @@
} catch (AxisFault e) {
throw new SandeshaException (e.getMessage());
}
+
SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Fri May 12 12:16:32 2006
@@ -31,7 +31,6 @@
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
@@ -65,6 +64,8 @@
throw new SandeshaException(message);
}
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getMessageContext().getConfigurationContext());
+
MessageContext msgCtx = rmMsgCtx.getMessageContext();
ConfigurationContext configCtx = msgCtx.getConfigurationContext();
@@ -72,16 +73,12 @@
sequenceAck.setMustUnderstand(false);
rmMsgCtx.addSOAPEnvelope();
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(rmMsgCtx.getMessageContext()
- .getConfigurationContext());
+
SenderBeanMgr retransmitterMgr = storageManager
.getRetransmitterBeanMgr();
SequencePropertyBeanMgr seqPropMgr = storageManager
.getSequencePropretyBeanMgr();
-
-
Iterator ackRangeIterator = sequenceAck.getAcknowledgementRanges()
.iterator();
@@ -96,6 +93,7 @@
FaultManager faultManager = new FaultManager();
RMMsgContext faultMessageContext = faultManager.checkForUnknownSequence(rmMsgCtx,outSequenceId);
if (faultMessageContext != null) {
+
ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
AxisEngine engine = new AxisEngine(configurationContext);
@@ -105,11 +103,13 @@
throw new SandeshaException ("Could not send the fault message",e);
}
+ msgCtx.pause();
return;
}
faultMessageContext = faultManager.checkForInvalidAcknowledgement(rmMsgCtx);
if (faultMessageContext != null) {
+
ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
AxisEngine engine = new AxisEngine(configurationContext);
@@ -119,25 +119,22 @@
throw new SandeshaException ("Could not send the fault message",e);
}
+ msgCtx.pause();
return;
}
String internalSequenceID = SandeshaUtil.getSequenceProperty(outSequenceId,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,configCtx);
//updating the last activated time of the sequence.
- Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
SequenceManager.updateLastActivatedTime(internalSequenceID,rmMsgCtx.getMessageContext().getConfigurationContext());
- lastUpdatedTimeTransaction.commit();
- //Starting transaction
- Transaction ackTransaction = storageManager.getTransaction();
-
SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(
outSequenceId, Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
if (internalSequenceBean == null || internalSequenceBean.getValue() == null) {
String message = "TempSequenceId is not set correctly";
log.debug(message);
+
throw new SandeshaException(message);
}
@@ -218,9 +215,6 @@
allCompletedMsgsBean.setValue(str);
seqPropMgr.update(allCompletedMsgsBean);
-
- //commiting transaction
- ackTransaction.commit();
String lastOutMsgNoStr = SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO,configCtx);
if (lastOutMsgNoStr!=null ) {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Fri May 12 12:16:32 2006
@@ -42,7 +42,6 @@
import org.apache.sandesha2.client.SandeshaClientConstants;
import org.apache.sandesha2.client.SandeshaListener;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
@@ -127,8 +126,6 @@
.getSandeshaStorageManager(rmMsgCtx.getMessageContext()
.getConfigurationContext());
-
-
FaultManager faultManager = new FaultManager();
RMMsgContext faultMessageContext = faultManager.checkForLastMsgNumberExceeded(rmMsgCtx);
if (faultMessageContext != null) {
@@ -141,6 +138,7 @@
throw new SandeshaException ("Could not send the fault message",e);
}
+ msgCtx.pause();
return;
}
@@ -169,6 +167,7 @@
throw new SandeshaException ("Could not send the fault message",e);
}
+ msgCtx.pause();
return;
}
@@ -191,14 +190,8 @@
return;
}
- Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
-
//updating the last activated time of the sequence.
SequenceManager.updateLastActivatedTime(sequenceId,configCtx);
- lastUpdatedTimeTransaction.commit();
-
- Transaction updataMsgStringTransaction = storageManager
- .getTransaction();
SequencePropertyBean msgsBean = seqPropMgr.retrieve(sequenceId,
Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
@@ -263,16 +256,13 @@
msgsBean.setValue(messagesStr);
seqPropMgr.update(msgsBean);
- updataMsgStringTransaction.commit();
-
- Transaction invokeTransaction = storageManager.getTransaction();
-
// Pause the messages bean if not the right message to invoke.
NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
NextMsgBean bean = mgr.retrieve(sequenceId);
- if (bean == null)
+ if (bean == null) {
throw new SandeshaException("Error- The sequence does not exist");
+ }
InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
@@ -281,8 +271,6 @@
if (inOrderInvocation) {
- //pause the message
- rmMsgCtx.pause();
SequencePropertyBean incomingSequenceListBean = (SequencePropertyBean) seqPropMgr
.retrieve(
@@ -299,6 +287,7 @@
incomingSequenceListBean.setValue(incomingSequenceList
.toString());
+ //this get inserted before
seqPropMgr.insert(incomingSequenceListBean);
}
@@ -312,7 +301,7 @@
//saving the property.
incomingSequenceListBean.setValue(incomingSequenceList
.toString());
- seqPropMgr.insert(incomingSequenceListBean);
+ seqPropMgr.update(incomingSequenceListBean);
}
//saving the message.
@@ -330,6 +319,9 @@
} catch (Exception ex) {
throw new SandeshaException(ex.getMessage());
}
+
+ //pause the message
+ rmMsgCtx.pause();
//Starting the invoker if stopped.
SandeshaUtil
@@ -337,8 +329,6 @@
}
- invokeTransaction.commit();
-
//Sending acknowledgements
sendAckIfNeeded(rmMsgCtx, messagesStr);
@@ -367,7 +357,7 @@
.getSandeshaStorageManager(msgCtx.getConfigurationContext());
SequencePropertyBeanMgr seqPropMgr = storageManager
.getSequencePropretyBeanMgr();
-
+
Sequence sequence = (Sequence) rmMsgCtx
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
String sequenceId = sequence.getIdentifier().getIdentifier();
@@ -416,7 +406,6 @@
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
- Transaction outHandlerTransaction = storageManager.getTransaction();
boolean serverSide = msgContext.isServerSide();
// setting message Id if null
@@ -590,16 +579,18 @@
throw new SandeshaException (e);
}
- if (requestMessageContext==null)
+ if (requestMessageContext==null) {
throw new SandeshaException ("Request message context is null, cant find out the request side sequenceID");
+ }
RMMsgContext requestRMMsgCtx = MsgInitializer.initializeMessage(requestMessageContext);
Sequence sequence = (Sequence) requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
String requestSequenceID = sequence.getIdentifier().getIdentifier();
SequencePropertyBean specVersionBean = seqPropMgr.retrieve(requestSequenceID,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
- if (specVersionBean==null)
+ if (specVersionBean==null) {
throw new SandeshaException ("SpecVersion sequence property bean is not available for the incoming sequence. Cant find the RM version for outgoing side");
+ }
specVersion = specVersionBean.getValue();
} else {
@@ -744,8 +735,6 @@
processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber,storageKey);
msgContext.pause(); // the execution will be stopped.
- outHandlerTransaction.commit();
-
}
private void addCreateSequenceMessage(RMMsgContext applicationRMMsg,
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Fri May 12 12:16:32 2006
@@ -11,7 +11,6 @@
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.AcknowledgementManager;
@@ -45,13 +44,12 @@
throw new SandeshaException ("Could not send the fault message",e);
}
+ msgCtx.pause();
return;
}
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx);
- Transaction closeSequenceTransaction = storageManager.getTransaction();
-
SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropretyBeanMgr();
SequencePropertyBean sequenceClosedBean = new SequencePropertyBean ();
sequenceClosedBean.setSequenceID(sequenceID);
@@ -107,9 +105,6 @@
String message = "Could not send the terminate sequence response";
throw new SandeshaException (message,e);
}
-
-
- closeSequenceTransaction.commit();
}
public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Fri May 12 12:16:32 2006
@@ -34,7 +34,6 @@
import org.apache.sandesha2.client.SandeshaClientConstants;
import org.apache.sandesha2.client.SandeshaListener;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.CreateSeqBean;
@@ -82,6 +81,7 @@
throw new SandeshaException ("Could not send the fault message",e);
}
+ createSeqMsg.pause();
return;
}
@@ -92,7 +92,6 @@
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
- Transaction createSequenceTransaction = storageManager.getTransaction(); //begining of a new transaction
try {
String newSequenceId = SequenceManager.setupNewSequence(createSeqRMMsg); //newly created sequnceID.
@@ -170,11 +169,8 @@
outMessage.setResponseWritten(true);
//commiting tr. before sending the response msg.
- createSequenceTransaction.commit();
- Transaction updateLastActivatedTransaction = storageManager.getTransaction();
SequenceManager.updateLastActivatedTime(newSequenceId,createSeqRMMsg.getMessageContext().getConfigurationContext());
- updateLastActivatedTransaction.commit();
AxisEngine engine = new AxisEngine(context);
engine.send(outMessage);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Fri May 12 12:16:32 2006
@@ -75,7 +75,7 @@
.getSandeshaStorageManager(configCtx);
//Processing for ack if available
- Transaction ackProcessTransaction = storageManager.getTransaction();
+/// Transaction ackProcessTransaction = storageManager.getTransaction();
SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) createSeqResponseRMMsgCtx
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
@@ -84,11 +84,11 @@
ackProcessor.processInMessage(createSeqResponseRMMsgCtx);
}
- ackProcessTransaction.commit();
+/// ackProcessTransaction.commit();
//Processing the create sequence response.
- Transaction createSeqResponseTransaction = storageManager.getTransaction();
+/// Transaction createSeqResponseTransaction = storageManager.getTransaction();
CreateSequenceResponse createSeqResponsePart = (CreateSequenceResponse) createSeqResponseRMMsgCtx
.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
@@ -154,10 +154,10 @@
sequencePropMgr.insert(outSequenceBean);
sequencePropMgr.insert(internalSequenceBean);
- createSeqResponseTransaction.commit();
+/// createSeqResponseTransaction.commit();
- Transaction offerProcessTransaction = storageManager.getTransaction();
+/// Transaction offerProcessTransaction = storageManager.getTransaction();
//processing for accept (offer has been sent)
Accept accept = createSeqResponsePart.getAccept();
@@ -217,9 +217,9 @@
}
- offerProcessTransaction.commit();
+/// offerProcessTransaction.commit();
- Transaction updateAppMessagesTransaction = storageManager.getTransaction();
+/// Transaction updateAppMessagesTransaction = storageManager.getTransaction();
SenderBean target = new SenderBean();
target.setInternalSequenceID(internalSequenceId);
@@ -282,11 +282,11 @@
storageManager.updateMessageContext(key,applicationMsg);
}
- updateAppMessagesTransaction.commit();
+/// updateAppMessagesTransaction.commit();
- Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
+/// Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
SequenceManager.updateLastActivatedTime(internalSequenceId,configCtx);
- lastUpdatedTimeTransaction.commit();
+/// lastUpdatedTimeTransaction.commit();
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()
.setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Fri May 12 12:16:32 2006
@@ -39,7 +39,6 @@
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.client.SandeshaClientConstants;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
@@ -106,6 +105,8 @@
} catch (AxisFault e) {
throw new SandeshaException ("Could not send the fault message",e);
}
+
+ terminateSeqMsg.pause();
return;
}
@@ -113,7 +114,6 @@
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
- Transaction terminateReceivedTransaction = storageManager.getTransaction();
SequencePropertyBean terminateReceivedBean = new SequencePropertyBean ();
terminateReceivedBean.setSequenceID(sequenceId);
terminateReceivedBean.setName(Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
@@ -127,9 +127,6 @@
setUpHighestMsgNumbers(context,storageManager,sequenceId,terminateSeqRMMsg);
- terminateReceivedTransaction.commit();
-
- Transaction terminateTransaction = storageManager.getTransaction();
TerminateManager.cleanReceivingSideOnTerminateMessage(context,sequenceId);
@@ -139,16 +136,12 @@
sequencePropertyBeanMgr.insert(terminatedBean);
- terminateTransaction.commit();
-
//removing an entry from the listener
String transport = terminateSeqMsg.getTransportIn().getName().getLocalPart();
- Transaction lastUpdatedTransaction = storageManager.getTransaction();
SequenceManager.updateLastActivatedTime(sequenceId,context);
- lastUpdatedTransaction.commit();
- terminateSeqRMMsg.pause();
+ terminateSeqMsg.pause();
}
private void setUpHighestMsgNumbers (ConfigurationContext configCtx, StorageManager storageManager, String sequenceID, RMMsgContext terminateRMMsg) throws SandeshaException {
@@ -281,7 +274,7 @@
if (outSequenceID==null)
throw new SandeshaException ("SequenceID was not found. Cannot send the terminate message");
- Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+/// Transaction addTerminateSeqTransaction = storageManager.getTransaction();
String terminated = SandeshaUtil.getSequenceProperty(outSequenceID,
Sandesha2Constants.SequenceProperties.TERMINATE_ADDED,configurationContext);
@@ -384,7 +377,7 @@
rmMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
rmMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
rmMsgCtx.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
- addTerminateSeqTransaction.commit();
+/// addTerminateSeqTransaction.commit();
AxisEngine engine = new AxisEngine (configurationContext);
try {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java Fri May 12 12:16:32 2006
@@ -291,7 +291,7 @@
} else {
- Transaction asyncAckTransaction = storageManager.getTransaction();
+/// Transaction asyncAckTransaction = storageManager.getTransaction();
SenderBeanMgr retransmitterBeanMgr = storageManager
.getRetransmitterBeanMgr();
@@ -338,7 +338,7 @@
//inserting the new ack.
retransmitterBeanMgr.insert(ackBean);
- asyncAckTransaction.commit();
+/// asyncAckTransaction.commit();
//passing the message through sandesha2sender
ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java Fri May 12 12:16:32 2006
@@ -411,6 +411,9 @@
} else {
SequencePropertyBeanMgr seqPropMgr = storageManager
.getSequencePropretyBeanMgr();
+
+ //TODO get the acksTo value using the property key.
+
String sequenceId = data.getSequenceId();
SequencePropertyBean acksToBean = seqPropMgr.retrieve(
sequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Fri May 12 12:16:32 2006
@@ -22,6 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.client.SandeshaClient;
import org.apache.sandesha2.client.SandeshaClientConstants;
@@ -132,6 +133,8 @@
private void finalizeTimedOutSequence (String internalSequenceID, String sequenceID ,MessageContext messageContext) throws SandeshaException {
ConfigurationContext configurationContext = messageContext.getConfigurationContext();
+
+ configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,messageContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
SequenceReport report = SandeshaClient.getOutgoingSequenceReport(internalSequenceID ,configurationContext);
TerminateManager.timeOutSendingSideSequence(configurationContext,internalSequenceID, false);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java Fri May 12 12:16:32 2006
@@ -166,7 +166,7 @@
try {
processor.setup();
} catch (NoSuchMethodException e) {
- throw new SandeshaException(e.getMessage());
+ throw new SandeshaException(e);
}
processor.processPolicy(policy);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Fri May 12 12:16:32 2006
@@ -419,7 +419,7 @@
public static long getOutGoingSequenceAckedMessageCount (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException {
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
- Transaction transaction = storageManager.getTransaction();
+/// Transaction transaction = storageManager.getTransaction();
SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
@@ -447,14 +447,14 @@
return 0; //No acknowledgement has been received yet.
long noOfMessagesAcked = Long.parseLong(ackedMsgBean.getValue());
- transaction.commit();
+/// transaction.commit();
return noOfMessagesAcked;
}
public static boolean isOutGoingSequenceCompleted (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException {
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
- Transaction transaction = storageManager.getTransaction();
+/// Transaction transaction = storageManager.getTransaction();
SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
@@ -484,14 +484,14 @@
if ("true".equals(terminateAddedBean.getValue()))
return true;
- transaction.commit();
+/// transaction.commit();
return false;
}
public static boolean isIncomingSequenceCompleted (String sequenceID, ConfigurationContext configurationContext) throws SandeshaException {
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
- Transaction transaction = storageManager.getTransaction();
+/// Transaction transaction = storageManager.getTransaction();
SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
SequencePropertyBean terminateReceivedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
@@ -500,7 +500,7 @@
if (terminateReceivedBean!=null && "true".equals(terminateReceivedBean.getValue()))
complete = true;
- transaction.commit();
+/// transaction.commit();
return complete;
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java Fri May 12 12:16:32 2006
@@ -240,8 +240,8 @@
if (Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID.equals(name))
deleatable = false;
- if (Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION.equals(name))
- deleatable = false;
+// if (Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION.equals(name))
+// deleatable = false;
if (Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED.equals(name))
deleatable = false;
@@ -316,6 +316,7 @@
SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
doUpdatesIfNeeded (outSequenceID,sequencePropertyBean,sequencePropertyBeanMgr);
+ //TODO all properties which hv the temm:Seq:id as the key should be deletable.
if (isProportyDeletable(sequencePropertyBean.getName())) {
sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
}
@@ -330,7 +331,7 @@
StorageManager storageManager = SandeshaUtil
.getSandeshaStorageManager(configurationContext);
- Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+/// Transaction addTerminateSeqTransaction = storageManager.getTransaction();
SequencePropertyBeanMgr seqPropMgr = storageManager
.getSequencePropretyBeanMgr();
@@ -424,7 +425,7 @@
terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
- addTerminateSeqTransaction.commit();
+/// addTerminateSeqTransaction.commit();
AxisEngine engine = new AxisEngine (configurationContext);
try {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Fri May 12 12:16:32 2006
@@ -100,43 +100,44 @@
log.debug(ex.getMessage());
}
+ Transaction transaction = null;
+ boolean rolebacked = false;
+
try {
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(context);
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
- InvokerBeanMgr storageMapMgr = storageManager
- .getStorageMapBeanMgr();
+ InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
SequencePropertyBeanMgr sequencePropMgr = storageManager
.getSequencePropretyBeanMgr();
- Transaction preInvocationTransaction = storageManager.getTransaction();
+ transaction = storageManager.getTransaction();
//Getting the incomingSequenceIdList
SequencePropertyBean allSequencesBean = (SequencePropertyBean) sequencePropMgr
.retrieve(
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
- if (allSequencesBean == null)
- continue;
-
- ArrayList allSequencesList = SandeshaUtil.getArrayListFromString (allSequencesBean
- .getValue());
- preInvocationTransaction.commit();
+ if (allSequencesBean == null) {
+ continue;
+ }
+ ArrayList allSequencesList = SandeshaUtil.getArrayListFromString (allSequencesBean.getValue());
Iterator allSequencesItr = allSequencesList.iterator();
-
+
currentIteration: while (allSequencesItr.hasNext()) {
-
String sequenceId = (String) allSequencesItr.next();
- Transaction invocationTransaction = storageManager.getTransaction(); //Transaction based invocation
+ //commiting the old transaction
+ transaction.commit();
+
+ //starting a new transaction for the new iteration.
+ transaction = storageManager.getTransaction();
NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
if (nextMsgBean == null) {
-
String message = "Next message not set correctly. Removing invalid entry.";
log.debug(message);
allSequencesItr.remove();
@@ -144,14 +145,12 @@
//cleaning the invalid data of the all sequences.
allSequencesBean.setValue(allSequencesList.toString());
sequencePropMgr.update(allSequencesBean);
-
- throw new SandeshaException (message);
+ continue;
}
long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
if (nextMsgno <= 0) {
- String message = "Invalid messaage number as the Next Message Number. Removing invalid entry";
-
+ String message = "Invalid message number as the Next Message Number.";
throw new SandeshaException(message);
}
@@ -163,57 +162,36 @@
while (stMapIt.hasNext()) {
- InvokerBean stMapBean = (InvokerBean) stMapIt
- .next();
+ InvokerBean stMapBean = (InvokerBean) stMapIt.next();
String key = stMapBean.getMessageContextRefKey();
-
MessageContext msgToInvoke = storageManager.retrieveMessageContext(key,context);
+ RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke);
- RMMsgContext rmMsg = MsgInitializer
- .initializeMessage(msgToInvoke);
- Sequence seq = (Sequence) rmMsg
- .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-
- long msgNo = seq.getMessageNumber().getMessageNumber();
-
+ //have to commit the transaction before invoking. This may get changed when WS-AT is available.
+ transaction.commit();
+
try {
- //Invoking the message.
-
- //currently Transaction based invocation can be supplied only for the in-only case.
-
- if (!AxisOperationFactory.MEP_URI_IN_ONLY.equals(msgToInvoke.getAxisOperation().getMessageExchangePattern())) {
- invocationTransaction.commit();
- }
-
+ //Invoking the message.
+ msgToInvoke.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
new AxisEngine (msgToInvoke.getConfigurationContext())
.resume(msgToInvoke);
invoked = true;
-
- if (!AxisOperationFactory.MEP_URI_IN_ONLY.equals(msgToInvoke.getAxisOperation().getMessageExchangePattern())) {
- invocationTransaction = storageManager.getTransaction();
- }
-
storageMapMgr.delete(key);
} catch (AxisFault e) {
throw new SandeshaException(e);
+ } finally {
+ transaction = storageManager.getTransaction();
}
-
+
//undating the next msg to invoke
-
if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
Sequence sequence = (Sequence) rmMsg
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
if (sequence.getLastMessage() != null) {
-
TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId);
-
- //this sequence has no more invocations
-// stopInvokerForTheSequence(sequenceId);
-
//exit from current iteration. (since an entry was removed)
- invocationTransaction.commit();
break currentIteration;
}
}
@@ -223,15 +201,19 @@
nextMsgno++;
nextMsgBean.setNextMsgNoToProcess(nextMsgno);
nextMsgMgr.update(nextMsgBean);
- invocationTransaction.commit();
- }
+ }
}
- } catch (SandeshaException e1) {
+ } catch (Exception e1) {
e1.printStackTrace();
+ if (transaction!=null) {
+ transaction.rollback();
+ rolebacked = true;
+ }
+ } finally {
+ if (!rolebacked && transaction!=null)
+ transaction.commit();
}
}
-
- int i = 1;
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Fri May 12 12:16:32 2006
@@ -58,11 +58,9 @@
public class Sender extends Thread {
private boolean runSender = false;
- private boolean stopSenderAfterWork = false;
private ArrayList workingSequences = new ArrayList();
private ConfigurationContext context = null;
private static final Log log = LogFactory.getLog(Sender.class);
- private ThreadPool threadPool = new ThreadPool ();
public synchronized void stopSenderForTheSequence(String sequenceID) {
workingSequences.remove(sequenceID);
@@ -103,29 +101,29 @@
log.debug("End printing Interrupt...");
}
+ Transaction transaction = null;
+ boolean rolebacked = false;
+
try {
if (context == null) {
String message = "Can't continue the Sender. Context is null";
log.debug(message);
throw new SandeshaException(message);
}
-
- Transaction pickMessagesToSendTransaction = storageManager.getTransaction();
-
+
+ transaction = storageManager.getTransaction();
+
SenderBeanMgr mgr = storageManager.getRetransmitterBeanMgr();
SenderBean senderBean = mgr.getNextMsgToSend();
if (senderBean==null) {
- pickMessagesToSendTransaction.commit();
continue;
}
-
MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
boolean continueSending = retransmitterAdjuster.adjustRetransmittion(senderBean, context);
- if (!continueSending)
+ if (!continueSending) {
continue;
-
- pickMessagesToSendTransaction.commit();
+ }
String key = (String) senderBean.getMessageContextRefKey();
MessageContext msgCtx = storageManager.retrieveMessageContext(key, context);
@@ -161,8 +159,6 @@
updateMessage(msgCtx);
- Transaction preSendTransaction = storageManager.getTransaction();
-
int messageType = rmMsgCtx.getMessageType();
if (messageType == Sandesha2Constants.MessageTypes.APPLICATION) {
Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
@@ -177,16 +173,16 @@
AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx);
}
- preSendTransaction.commit();
-
//sending the message
TransportOutDescription transportOutDescription = msgCtx.getTransportOut();
TransportSender transportSender = transportOutDescription.getSender();
+ //have to commit the transaction before sending. This may get changed when WS-AT is available.
+ transaction.commit();
+
boolean successfullySent = false;
if (transportSender != null) {
try {
-
//TODO change this to cater for security.
transportSender.invoke(msgCtx);
successfullySent = true;
@@ -194,11 +190,11 @@
// TODO Auto-generated catch block
log.debug("Could not send message");
log.debug(e.getStackTrace().toString());
+ } finally {
+ transaction = storageManager.getTransaction();
}
}
- Transaction postSendTransaction = storageManager.getTransaction();
-
// update or delete only if the object is still present.
SenderBean bean1 = mgr.retrieve(senderBean.getMessageID());
if (bean1 != null) {
@@ -210,14 +206,13 @@
mgr.delete(bean1.getMessageID());
}
- postSendTransaction.commit(); // commiting the current transaction
-
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
+
if (successfullySent) {
if (!msgCtx.isServerSide())
checkForSyncResponses(msgCtx);
}
-
- Transaction terminateCleaningTransaction = storageManager.getTransaction();
+
if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
// terminate sending side.
TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
@@ -228,15 +223,22 @@
TerminateManager.terminateSendingSide(configContext,internalSequenceID, msgCtx.isServerSide());
}
- terminateCleaningTransaction.commit();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_FALSE);
- } catch (AxisFault e) {
+ } catch (Exception e) {
String message = "An Exception was throws in sending";
log.debug(message,e);
// TODO : when this is the client side throw the exception to
// the client when necessary.
+ if (transaction!=null) {
+ transaction.rollback();
+ rolebacked = true;
+ }
+ } finally {
+ if (transaction!=null && !rolebacked)
+ transaction.commit();
}
}
}
@@ -311,7 +313,12 @@
log.debug("Valid SOAP envelope not found");
log.debug(e.getStackTrace().toString());
}
-
+
+ //if the request msg ctx is withina a transaction, processing if the response should also happen
+ //withing the same transaction
+ responseMessageContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION
+ ,msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
+
if (resenvelope != null) {
responseMessageContext.setEnvelope(resenvelope);
AxisEngine engine = new AxisEngine(msgCtx
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org