You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2006/05/12 21:16:35 UTC
svn commit: r405839 [1/2] - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: ./ client/ handlers/
msgprocessors/ util/ workers/
Author: chamikara
Date: Fri May 12 12:16:32 2006
New Revision: 405839
URL: http://svn.apache.org/viewcvs?rev=405839&view=rev
Log:
Redesigned the transaction model of Sandesha2.
Transactions will be started in following components.
Handlers, Invoker, Sender, SandeshaClient.
In a exception situation these transactions will be roledbacked else
they will be commited. Commiting is usually done in the 'finally' block.
A boolean property was used to avoid nested transactions. A transaction will
start only if this property is not set.
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgContext.java
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgContext.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgContext.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgContext.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgContext.java Fri May 12 12:16:32 2006
@@ -277,8 +277,8 @@
}
public void setAddressingNamespaceValue(String addressingNamespaceValue) throws SandeshaException {
- if (addressingNamespaceValue!=AddressingConstants.Submission.WSA_NAMESPACE &&
- addressingNamespaceValue!=AddressingConstants.Final.WSA_NAMESPACE)
+ if (!AddressingConstants.Submission.WSA_NAMESPACE.equals(addressingNamespaceValue) &&
+ !AddressingConstants.Final.WSA_NAMESPACE.equals(addressingNamespaceValue))
throw new SandeshaException ("Unknown addressing version");
this.addressingNamespaceValue = addressingNamespaceValue;
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Fri May 12 12:16:32 2006
@@ -479,4 +479,6 @@
String INVOKER = "Invoker";
+ String WITHIN_TRANSACTION = "WithinTransaction";
+
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java Fri May 12 12:16:32 2006
@@ -57,728 +57,804 @@
import org.apache.sandesha2.wsrm.TerminateSequence;
/**
- * Contains all the Sandesha2Constants of Sandesha2.
- * Please see sub-interfaces to see grouped data.
+ * Contains all the Sandesha2Constants of Sandesha2. Please see sub-interfaces
+ * to see grouped data.
*
* @author Chamikara Jayalath <ch...@gmail.com>
*/
public class SandeshaClient {
-
+
private static final Log log = LogFactory.getLog(SandeshaClient.class);
-
+
/**
- * Users can get a SequenceReport of the sequence defined by the information given from the
- * passed serviceClient object.
+ * Users can get a SequenceReport of the sequence defined by the information
+ * given from the passed serviceClient object.
*
* @param serviceClient
* @return
* @throws SandeshaException
*/
- public static SequenceReport getOutgoingSequenceReport (ServiceClient serviceClient) throws SandeshaException {
+ public static SequenceReport getOutgoingSequenceReport(ServiceClient serviceClient) throws SandeshaException {
Options options = serviceClient.getOptions();
- if (options==null)
- throw new SandeshaException ("Options object is not set");
-
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
EndpointReference toEPR = options.getTo();
- if (toEPR==null)
- throw new SandeshaException ("'To' address is not set");
-
+ if (toEPR == null)
+ throw new SandeshaException("'To' address is not set");
+
String to = toEPR.getAddress();
String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-
+
ServiceContext serviceContext = serviceClient.getServiceContext();
- if (serviceContext==null)
- throw new SandeshaException ("Service Context is null");
-
+ if (serviceContext == null)
+ throw new SandeshaException("Service Context is null");
+
ConfigurationContext configurationContext = serviceContext.getConfigurationContext();
- String internalSequenceID = getInternalSequenceID(to,sequenceKey);
-
- return getOutgoingSequenceReport(internalSequenceID,configurationContext);
- }
-
- public static SequenceReport getOutgoingSequenceReport (String to,String sequenceKey,ConfigurationContext configurationContext) throws SandeshaException {
-
- String internalSequenceID = SandeshaUtil.getInternalSequenceID(to,sequenceKey);
- return getOutgoingSequenceReport(internalSequenceID,configurationContext);
- }
-
- public static SequenceReport getOutgoingSequenceReport (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException {
-
- SequenceReport sequenceReport = new SequenceReport ();
+
+ String internalSequenceID = getInternalSequenceID(to, sequenceKey);
+
+ return getOutgoingSequenceReport(internalSequenceID, configurationContext);
+ }
+
+ public static SequenceReport getOutgoingSequenceReport(String to, String sequenceKey,
+ ConfigurationContext configurationContext) throws SandeshaException {
+
+ String internalSequenceID = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+ return getOutgoingSequenceReport(internalSequenceID, configurationContext);
+ }
+
+ public static SequenceReport getOutgoingSequenceReport(String internalSequenceID,
+ ConfigurationContext configurationContext) throws SandeshaException {
+
+ SequenceReport sequenceReport = new SequenceReport();
sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_OUT);
-
+
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
-
- Transaction reportTransaction = storageManager.getTransaction();
-
- sequenceReport.setInternalSequenceID(internalSequenceID);
-
- CreateSeqBean createSeqFindBean = new CreateSeqBean ();
- createSeqFindBean.setInternalSequenceID(internalSequenceID);
-
- CreateSeqBean createSeqBean = createSeqMgr.findUnique(createSeqFindBean);
-
- //if data not is available sequence has to be terminated or timedOut.
- if (createSeqBean==null) {
-
- //check weather this is an terminated sequence.
- if (isSequenceTerminated(internalSequenceID,seqPropMgr)) {
- fillTerminatedOutgoingSequenceInfo (sequenceReport,internalSequenceID,seqPropMgr);
-
+
+ String withinTransactionStr = (String) configurationContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ boolean withinTransaction = false;
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr))
+ withinTransaction = true;
+
+ Transaction reportTransaction = null;
+ if (!withinTransaction)
+ reportTransaction = storageManager.getTransaction();
+
+ boolean rolebacked = false;
+
+ try {
+
+ sequenceReport.setInternalSequenceID(internalSequenceID);
+
+ CreateSeqBean createSeqFindBean = new CreateSeqBean();
+ createSeqFindBean.setInternalSequenceID(internalSequenceID);
+
+ CreateSeqBean createSeqBean = createSeqMgr.findUnique(createSeqFindBean);
+
+ // if data not is available sequence has to be terminated or
+ // timedOut.
+ if (createSeqBean == null) {
+
+ // check weather this is an terminated sequence.
+ if (isSequenceTerminated(internalSequenceID, seqPropMgr)) {
+ fillTerminatedOutgoingSequenceInfo(sequenceReport, internalSequenceID, seqPropMgr);
+
+ return sequenceReport;
+ }
+
+ if (isSequenceTimedout(internalSequenceID, seqPropMgr)) {
+ fillTimedoutOutgoingSequenceInfo(sequenceReport, internalSequenceID, seqPropMgr);
+
+ return sequenceReport;
+ }
+
+ // sequence must hv been timed out before establiching. No other
+ // posibility I can think of.
+ // this does not get recorded since there is no key (which is
+ // normally the sequenceID) to store it.
+ // (properties with key as the internalSequenceID get deleted in
+ // timing out)
+
+ // so, setting the sequence status to INITIAL
+ sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_INITIAL);
+
+ // returning the current sequence report.
return sequenceReport;
}
-
- if (isSequenceTimedout(internalSequenceID,seqPropMgr)) {
- fillTimedoutOutgoingSequenceInfo (sequenceReport,internalSequenceID,seqPropMgr);
-
+
+ String outSequenceID = createSeqBean.getSequenceID();
+ if (outSequenceID == null) {
+ sequenceReport.setInternalSequenceID(internalSequenceID);
+ sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_INITIAL);
+ sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_OUT);
+
return sequenceReport;
}
-
- //sequence must hv been timed out before establiching. No other posibility I can think of.
- //this does not get recorded since there is no key (which is normally the sequenceID) to store it.
- //(properties with key as the internalSequenceID get deleted in timing out)
-
- //so, setting the sequence status to INITIAL
- sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_INITIAL);
-
- //returning the current sequence report.
- return sequenceReport;
+
+ sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_ESTABLISHED);
+ fillOutgoingSequenceInfo(sequenceReport, outSequenceID, seqPropMgr);
+
+ } catch (Exception e) {
+ if (!withinTransaction && reportTransaction!=null) {
+ reportTransaction.rollback();
+ configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolebacked = true;
+ }
+ } finally {
+ if (!withinTransaction && !rolebacked && reportTransaction!=null) {
+ reportTransaction.commit();
+ configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ }
}
-
- String outSequenceID = createSeqBean.getSequenceID();
- if (outSequenceID==null) {
- sequenceReport.setInternalSequenceID(internalSequenceID);
- sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_INITIAL);
- sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_OUT);
-
- return sequenceReport;
- }
-
- sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_ESTABLISHED);
- fillOutgoingSequenceInfo(sequenceReport,outSequenceID,seqPropMgr);
-
- reportTransaction.commit();
-
- return sequenceReport;
+
+ return sequenceReport;
}
-
+
/**
- * Users can get a list of sequenceReports each describing a incoming sequence, which are
- * the sequences the client work as a RMD.
+ * Users can get a list of sequenceReports each describing a incoming
+ * sequence, which are the sequences the client work as a RMD.
*
* @param configCtx
* @return
* @throws SandeshaException
*/
- public static ArrayList getIncomingSequenceReports (ConfigurationContext configCtx) throws SandeshaException {
-
+ public static ArrayList getIncomingSequenceReports(ConfigurationContext configCtx) throws SandeshaException {
+
SandeshaReport report = getSandeshaReport(configCtx);
ArrayList incomingSequenceIDs = report.getIncomingSequenceList();
Iterator incomingSequenceIDIter = incomingSequenceIDs.iterator();
-
- ArrayList incomingSequenceReports = new ArrayList ();
+
+ ArrayList incomingSequenceReports = new ArrayList();
while (incomingSequenceIDIter.hasNext()) {
String sequnceID = (String) incomingSequenceIDIter.next();
- SequenceReport incomingSequenceReport = getIncomingSequenceReport(sequnceID,configCtx);
- if (incomingSequenceReport==null) {
- throw new SandeshaException ("An incoming sequence report is not present for the given sequenceID");
+ SequenceReport incomingSequenceReport = getIncomingSequenceReport(sequnceID, configCtx);
+ if (incomingSequenceReport == null) {
+ throw new SandeshaException("An incoming sequence report is not present for the given sequenceID");
}
incomingSequenceReports.add(incomingSequenceReport);
}
-
+
return incomingSequenceReports;
}
-
+
/**
* SandeshaReport gives the details of all incoming and outgoing sequences.
- * The outgoing sequence have to pass the initial state (CS/CSR exchange) to be included in a SandeshaReport
+ * The outgoing sequence have to pass the initial state (CS/CSR exchange) to
+ * be included in a SandeshaReport
*
* @param configurationContext
* @return
* @throws SandeshaException
*/
- public static SandeshaReport getSandeshaReport (ConfigurationContext configurationContext) throws SandeshaException {
-
+ public static SandeshaReport getSandeshaReport(ConfigurationContext configurationContext) throws SandeshaException {
+
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
- SandeshaReport sandeshaReport = new SandeshaReport ();
- SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean ();
- internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
- Collection collection = seqPropMgr.find(internalSequenceFindBean);
- Iterator iterator = collection.iterator();
- while (iterator.hasNext()) {
- SequencePropertyBean bean = (SequencePropertyBean) iterator.next();
- String sequenceID = bean.getSequenceID();
- sandeshaReport.addToOutgoingSequenceList (sequenceID);
- sandeshaReport.addToOutgoingInternalSequenceMap(sequenceID,bean.getValue());
-
- SequenceReport report = getOutgoingSequenceReport(bean.getValue(),configurationContext);
-
- sandeshaReport.addToNoOfCompletedMessagesMap(sequenceID ,report.getCompletedMessages().size());
- sandeshaReport.addToSequenceStatusMap(sequenceID ,report.getSequenceStatus());
- }
-
-
- //incoming sequences
- SequencePropertyBean serverCompletedMsgsFindBean = new SequencePropertyBean ();
- serverCompletedMsgsFindBean.setName(Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
-
- Collection serverCompletedMsgsBeans = seqPropMgr.find(serverCompletedMsgsFindBean);
- Iterator iter = serverCompletedMsgsBeans.iterator();
- while (iter.hasNext()) {
- SequencePropertyBean serverCompletedMsgsBean = (SequencePropertyBean) iter.next();
- String sequenceID = serverCompletedMsgsBean.getSequenceID();
- sandeshaReport.addToIncomingSequenceList(sequenceID);
-
- SequenceReport sequenceReport = getIncomingSequenceReport(sequenceID,configurationContext);
-
- sandeshaReport.addToNoOfCompletedMessagesMap(sequenceID,sequenceReport.getCompletedMessages().size());
- sandeshaReport.addToSequenceStatusMap(sequenceID,sequenceReport.getSequenceStatus());
+ SandeshaReport sandeshaReport = new SandeshaReport();
+ SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean();
+
+ String withinTransactionStr = (String) configurationContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ boolean withinTransaction = false;
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr))
+ withinTransaction = true;
+
+ Transaction reportTransaction = null;
+ if (!withinTransaction)
+ reportTransaction = storageManager.getTransaction();
+
+ boolean rolebacked = false;
+
+ try {
+
+ internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+ Collection collection = seqPropMgr.find(internalSequenceFindBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ SequencePropertyBean bean = (SequencePropertyBean) iterator.next();
+ String sequenceID = bean.getSequenceID();
+ sandeshaReport.addToOutgoingSequenceList(sequenceID);
+ sandeshaReport.addToOutgoingInternalSequenceMap(sequenceID, bean.getValue());
+
+ SequenceReport report = getOutgoingSequenceReport(bean.getValue(), configurationContext);
+
+ sandeshaReport.addToNoOfCompletedMessagesMap(sequenceID, report.getCompletedMessages().size());
+ sandeshaReport.addToSequenceStatusMap(sequenceID, report.getSequenceStatus());
+ }
+
+ // incoming sequences
+ SequencePropertyBean serverCompletedMsgsFindBean = new SequencePropertyBean();
+ serverCompletedMsgsFindBean.setName(Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+
+ Collection serverCompletedMsgsBeans = seqPropMgr.find(serverCompletedMsgsFindBean);
+ Iterator iter = serverCompletedMsgsBeans.iterator();
+ while (iter.hasNext()) {
+ SequencePropertyBean serverCompletedMsgsBean = (SequencePropertyBean) iter.next();
+ String sequenceID = serverCompletedMsgsBean.getSequenceID();
+ sandeshaReport.addToIncomingSequenceList(sequenceID);
+
+ SequenceReport sequenceReport = getIncomingSequenceReport(sequenceID, configurationContext);
+
+ sandeshaReport.addToNoOfCompletedMessagesMap(sequenceID, sequenceReport.getCompletedMessages().size());
+ sandeshaReport.addToSequenceStatusMap(sequenceID, sequenceReport.getSequenceStatus());
+ }
+
+ } catch (Exception e) {
+ if (!withinTransaction && reportTransaction!=null) {
+ reportTransaction.rollback();
+ rolebacked = true;
+ }
+ } finally {
+ if (!withinTransaction && !rolebacked && reportTransaction!=null) {
+ reportTransaction.commit();
+ }
}
-
+
return sandeshaReport;
}
-
- public static void createSequence (ServiceClient serviceClient, boolean offer) throws SandeshaException{
+
+ public static void createSequence(ServiceClient serviceClient, boolean offer) throws SandeshaException {
Options options = serviceClient.getOptions();
- if (options==null)
- throw new SandeshaException ("Options object is not set");
-
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
EndpointReference toEPR = serviceClient.getOptions().getTo();
- if (toEPR==null)
- throw new SandeshaException ("ToEPR is not set");
-
+ if (toEPR == null)
+ throw new SandeshaException("ToEPR is not set");
+
String to = toEPR.getAddress();
- if (to==null)
- throw new SandeshaException ("To EPR is not set");
-
+ if (to == null)
+ throw new SandeshaException("To EPR is not set");
+
if (offer) {
String offeredSequenceID = SandeshaUtil.getUUID();
- options.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID,offeredSequenceID);
+ options.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID, offeredSequenceID);
}
-
- //setting a new squenceKey if not already set.
+
+ // setting a new squenceKey if not already set.
String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- if (sequenceKey==null) {
+ if (sequenceKey == null) {
sequenceKey = SandeshaUtil.getUUID();
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
}
-
- options.setProperty(SandeshaClientConstants.DUMMY_MESSAGE,Sandesha2Constants.VALUE_TRUE);
-
- try {
+
+ options.setProperty(SandeshaClientConstants.DUMMY_MESSAGE, Sandesha2Constants.VALUE_TRUE);
+
+ try {
serviceClient.fireAndForget(null);
} catch (AxisFault e) {
- throw new SandeshaException (e);
+ throw new SandeshaException(e);
}
-
- options.setProperty(SandeshaClientConstants.DUMMY_MESSAGE,Sandesha2Constants.VALUE_FALSE);
-
- }
-
- public static void createSequnce (ServiceClient serviceClient, boolean offer, String sequenceKey) throws SandeshaException {
-
- Options options = serviceClient.getOptions();
- if (options==null)
- throw new SandeshaException ("Options object is not set");
-
+
+ options.setProperty(SandeshaClientConstants.DUMMY_MESSAGE, Sandesha2Constants.VALUE_FALSE);
+
+ }
+
+ public static void createSequnce(ServiceClient serviceClient, boolean offer, String sequenceKey)
+ throws SandeshaException {
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
-
- createSequence(serviceClient,offer);
-
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,oldSequenceKey);
- }
-
-
-
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
+
+ createSequence(serviceClient, offer);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
+ }
+
/**
* User can terminate the sequence defined by the passed serviceClient.
*
* @param serviceClient
* @throws SandeshaException
*/
- public static void terminateSequence (ServiceClient serviceClient) throws SandeshaException {
+ public static void terminateSequence(ServiceClient serviceClient) throws SandeshaException {
ServiceContext serviceContext = serviceClient.getServiceContext();
- if (serviceContext==null)
- throw new SandeshaException ("ServiceContext is null");
-
- Options options = serviceClient.getOptions();
- if (options==null)
- throw new SandeshaException ("Options object is not set");
-
+ if (serviceContext == null)
+ throw new SandeshaException("ServiceContext is null");
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
- if (rmSpecVersion==null)
- rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion ();
-
+ if (rmSpecVersion == null)
+ rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion();
+
String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
-
- SOAPEnvelope terminateEnvelope = configureTerminateSequence(options,serviceContext.getConfigurationContext());
- OMElement terminateBody = terminateEnvelope.getBody().getFirstChildWithName(new QName (rmNamespaceValue,Sandesha2Constants.WSRM_COMMON.TERMINATE_SEQUENCE));
-
+
+ SOAPEnvelope terminateEnvelope = configureTerminateSequence(options, serviceContext.getConfigurationContext());
+ OMElement terminateBody = terminateEnvelope.getBody().getFirstChildWithName(
+ new QName(rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.TERMINATE_SEQUENCE));
+
String oldAction = options.getAction();
options.setAction(SpecSpecificConstants.getTerminateSequenceAction(rmSpecVersion));
-
+
try {
serviceClient.fireAndForget(terminateBody);
} catch (AxisFault e) {
String message = "Could not send the terminate message";
- throw new SandeshaException (message,e);
+ throw new SandeshaException(message, e);
} finally {
options.setAction(oldAction);
}
}
-
- public static void terminateSequence (ServiceClient serviceClient, String sequenceKey) throws SandeshaException {
+
+ public static void terminateSequence(ServiceClient serviceClient, String sequenceKey) throws SandeshaException {
Options options = serviceClient.getOptions();
- if (options==null)
- throw new SandeshaException ("Options object is not set");
-
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
terminateSequence(serviceClient);
-
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,oldSequenceKey);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
}
-
+
/**
* User can close the sequence defined by the passed serviceClient.
*
* @param serviceClient
* @throws SandeshaException
*/
- public static void closeSequence (ServiceClient serviceClient) throws SandeshaException {
+ public static void closeSequence(ServiceClient serviceClient) throws SandeshaException {
ServiceContext serviceContext = serviceClient.getServiceContext();
- if (serviceContext==null)
- throw new SandeshaException ("ServiceContext is null");
-
- Options options = serviceClient.getOptions();
- if (options==null)
- throw new SandeshaException ("Options object is not set");
-
+ if (serviceContext == null)
+ throw new SandeshaException("ServiceContext is null");
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
- if (rmSpecVersion==null)
- rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion ();
-
+ if (rmSpecVersion == null)
+ rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion();
+
String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
-
- SOAPEnvelope closeSequnceEnvelope = configureCloseSequence (options,serviceContext.getConfigurationContext());
- OMElement closeSequenceBody = closeSequnceEnvelope.getBody().getFirstChildWithName(new QName (rmNamespaceValue,Sandesha2Constants.WSRM_COMMON.CLOSE_SEQUENCE));
-
+
+ SOAPEnvelope closeSequnceEnvelope = configureCloseSequence(options, serviceContext.getConfigurationContext());
+ OMElement closeSequenceBody = closeSequnceEnvelope.getBody().getFirstChildWithName(
+ new QName(rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.CLOSE_SEQUENCE));
+
String oldAction = options.getAction();
options.setAction(SpecSpecificConstants.getCloseSequenceAction(rmSpecVersion));
try {
serviceClient.fireAndForget(closeSequenceBody);
} catch (AxisFault e) {
String message = "Could not send the close sequence message";
- throw new SandeshaException (message,e);
+ throw new SandeshaException(message, e);
} finally {
options.setAction(oldAction);
}
}
-
- public static void closeSequence (ServiceClient serviceClient, String sequenceKey) throws SandeshaException {
- //TODO test
-
- Options options = serviceClient.getOptions();
- if (options==null)
- throw new SandeshaException ("Options object is not set");
-
+
+ public static void closeSequence(ServiceClient serviceClient, String sequenceKey) throws SandeshaException {
+ // TODO test
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
closeSequence(serviceClient);
-
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,oldSequenceKey);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
}
-
+
/**
* This blocks the system until the messages u have sent hv been completed.
*
* @param serviceClient
*/
- public static void waitUntilSequenceCompleted (ServiceClient serviceClient) throws SandeshaException {
- waitUntilSequenceCompleted(serviceClient,-1);
+ public static void waitUntilSequenceCompleted(ServiceClient serviceClient) throws SandeshaException {
+ waitUntilSequenceCompleted(serviceClient, -1);
}
-
- public static void waitUntilSequenceCompleted (ServiceClient serviceClient, String sequenceKey) throws SandeshaException {
+
+ public static void waitUntilSequenceCompleted(ServiceClient serviceClient, String sequenceKey)
+ throws SandeshaException {
Options options = serviceClient.getOptions();
- if (options==null)
- throw new SandeshaException ("Options object is not set");
-
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
waitUntilSequenceCompleted(serviceClient);
-
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,oldSequenceKey);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
}
-
+
/**
- * This blocks the system until the messages u have sent hv been completed or until the
- * given time interval exceeds. (the time is taken in seconds)
+ * This blocks the system until the messages u have sent hv been completed
+ * or until the given time interval exceeds. (the time is taken in seconds)
*
* @param serviceClient
* @param maxWaitingTime
*/
- public static void waitUntilSequenceCompleted (ServiceClient serviceClient, long maxWaitingTime) throws SandeshaException{
-
+ public static void waitUntilSequenceCompleted(ServiceClient serviceClient, long maxWaitingTime)
+ throws SandeshaException {
+
long startTime = System.currentTimeMillis();
-
+
SequenceReport sequenceReport = getOutgoingSequenceReport(serviceClient);
- if (sequenceReport==null) {
- throw new SandeshaException ("Cannnot find a sequence report for the given data");
+ if (sequenceReport == null) {
+ throw new SandeshaException("Cannnot find a sequence report for the given data");
}
-
+
boolean done = false;
- while (!done){
+ while (!done) {
sequenceReport = getOutgoingSequenceReport(serviceClient);
int status = sequenceReport.getSequenceStatus();
- if (status==SequenceReport.SEQUENCE_STATUS_TERMINATED)
+ if (status == SequenceReport.SEQUENCE_STATUS_TERMINATED)
done = true;
- if (status==SequenceReport.SEQUENCE_STATUS_TIMED_OUT)
+ if (status == SequenceReport.SEQUENCE_STATUS_TIMED_OUT)
done = true;
-
- if (maxWaitingTime>=0) {
+
+ if (maxWaitingTime >= 0) {
long timeNow = System.currentTimeMillis();
- if (timeNow> (startTime+maxWaitingTime))
+ if (timeNow > (startTime + maxWaitingTime))
done = true;
}
- }
+ }
}
-
- public static void waitUntilSequenceCompleted (ServiceClient serviceClient, long maxWaitingTime, String sequenceKey) throws SandeshaException{
+
+ public static void waitUntilSequenceCompleted(ServiceClient serviceClient, long maxWaitingTime, String sequenceKey)
+ throws SandeshaException {
Options options = serviceClient.getOptions();
- if (options==null)
- throw new SandeshaException ("Options object is not set");
-
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
- waitUntilSequenceCompleted(serviceClient,maxWaitingTime);
-
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,oldSequenceKey);
- }
-
- //gives the out sequenceID if CS/CSR exchange is done. Otherwise a SandeshaException
- public static String getSequenceID (ServiceClient serviceClient) throws SandeshaException {
-
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
+ waitUntilSequenceCompleted(serviceClient, maxWaitingTime);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
+ }
+
+ // gives the out sequenceID if CS/CSR exchange is done. Otherwise a
+ // SandeshaException
+ public static String getSequenceID(ServiceClient serviceClient) throws SandeshaException {
+
Options options = serviceClient.getOptions();
- if (options==null)
+ if (options == null)
throw new SandeshaException("Options object is not set");
-
+
EndpointReference toEPR = options.getTo();
- if (toEPR==null)
- throw new SandeshaException ("To EPR is not set");
-
+ if (toEPR == null)
+ throw new SandeshaException("To EPR is not set");
+
String to = toEPR.getAddress();
String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-
+
ServiceContext serviceContext = serviceClient.getServiceContext();
- if (serviceContext==null)
- throw new SandeshaException ("Service context is not set");
-
+ if (serviceContext == null)
+ throw new SandeshaException("Service context is not set");
+
ConfigurationContext configurationContext = serviceContext.getConfigurationContext();
-
- String internalSequenceID = generateInternalSequenceIDForTheClientSide(to,sequenceKey);
-
+
+ String internalSequenceID = generateInternalSequenceIDForTheClientSide(to, sequenceKey);
+
SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(serviceClient);
- if (sequenceReport==null)
- throw new SandeshaException ("Cannot get a sequence report from the given data");
-
- if (sequenceReport.getSequenceStatus()!=SequenceReport.SEQUENCE_STATUS_ESTABLISHED) {
- throw new SandeshaException ("Sequence is not in a active state. Either create sequence response has not being received or sequence has been terminated," +
- " cannot get sequenceID");
+ if (sequenceReport == null)
+ throw new SandeshaException("Cannot get a sequence report from the given data");
+
+ if (sequenceReport.getSequenceStatus() != SequenceReport.SEQUENCE_STATUS_ESTABLISHED) {
+ throw new SandeshaException(
+ "Sequence is not in a active state. Either create sequence response has not being received or sequence has been terminated,"
+ + " cannot get sequenceID");
}
-
+
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
-
- SequencePropertyBean sequenceIDBean = seqPropMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
- if (sequenceIDBean==null)
- throw new SandeshaException ("SequenceIdBean is not set");
-
+
+ SequencePropertyBean sequenceIDBean = seqPropMgr.retrieve(internalSequenceID,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+ if (sequenceIDBean == null)
+ throw new SandeshaException("SequenceIdBean is not set");
+
String sequenceID = sequenceIDBean.getValue();
return sequenceID;
}
-
- public static void sendAckRequest (ServiceClient serviceClient) throws SandeshaException {
-
- Options options = serviceClient.getOptions();
- if (options==null)
- throw new SandeshaException ("Options object is not set");
-
+
+ public static void sendAckRequest(ServiceClient serviceClient) throws SandeshaException {
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
ServiceContext serviceContext = serviceClient.getServiceContext();
- if (serviceContext==null)
- throw new SandeshaException ("ServiceContext is null");
-
+ if (serviceContext == null)
+ throw new SandeshaException("ServiceContext is null");
+
ConfigurationContext configContext = serviceContext.getConfigurationContext();
-
+
EndpointReference toEPR = options.getTo();
- if (toEPR==null)
- throw new SandeshaException ("'To' address is not set is not set");
+ if (toEPR == null)
+ throw new SandeshaException("'To' address is not set is not set");
String to = toEPR.getAddress();
String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-
+
String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
- if (rmSpecVersion==null)
+ if (rmSpecVersion == null)
rmSpecVersion = Sandesha2Constants.SPEC_VERSIONS.v1_0;
-
+
if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmSpecVersion)) {
- throw new SandeshaException ("Empty AckRequest messages can only be sent with the v1_1 spec");
+ throw new SandeshaException("Empty AckRequest messages can only be sent with the v1_1 spec");
}
-
- String internalSequenceID = getInternalSequenceID(to,sequenceKey);
-
- SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(internalSequenceID,configContext);
- if (sequenceReport==null)
- throw new SandeshaException ("Cannot generate the sequence report for the given internalSequenceID");
- if (sequenceReport.getSequenceStatus()!=SequenceReport.SEQUENCE_STATUS_ESTABLISHED)
- throw new SandeshaException ("Canot send the ackRequest message since it is not active");
-
+
+ String internalSequenceID = getInternalSequenceID(to, sequenceKey);
+
+ SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(internalSequenceID, configContext);
+ if (sequenceReport == null)
+ throw new SandeshaException("Cannot generate the sequence report for the given internalSequenceID");
+ if (sequenceReport.getSequenceStatus() != SequenceReport.SEQUENCE_STATUS_ESTABLISHED)
+ throw new SandeshaException("Canot send the ackRequest message since it is not active");
+
String outSequenceID = getSequenceID(serviceClient);
-
+
String soapNamespaceURI = options.getSoapVersionURI();
SOAPFactory factory = null;
SOAPEnvelope dummyEnvelope = null;
if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(soapNamespaceURI)) {
- factory = new SOAP11Factory ();
+ factory = new SOAP11Factory();
dummyEnvelope = factory.getDefaultEnvelope();
- }else {
- factory = new SOAP12Factory ();
+ } else {
+ factory = new SOAP12Factory();
dummyEnvelope = factory.getDefaultEnvelope();
}
-
+
String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
-
- AckRequested ackRequested = new AckRequested (factory,rmNamespaceValue);
- Identifier identifier = new Identifier (factory,rmNamespaceValue);
+
+ AckRequested ackRequested = new AckRequested(factory, rmNamespaceValue);
+ Identifier identifier = new Identifier(factory, rmNamespaceValue);
identifier.setIndentifer(outSequenceID);
ackRequested.setIdentifier(identifier);
-
+
ackRequested.toSOAPEnvelope(dummyEnvelope);
-
- OMElement ackRequestedHeaderBlock = dummyEnvelope.getHeader().getFirstChildWithName(new QName (rmNamespaceValue,Sandesha2Constants.WSRM_COMMON.ACK_REQUESTED));
-
+
+ OMElement ackRequestedHeaderBlock = dummyEnvelope.getHeader().getFirstChildWithName(
+ new QName(rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.ACK_REQUESTED));
+
String oldAction = options.getAction();
-
+
options.setAction(SpecSpecificConstants.getAckRequestAction(rmSpecVersion));
serviceClient.addHeader(ackRequestedHeaderBlock);
-
+
try {
serviceClient.fireAndForget(null);
} catch (AxisFault e) {
String message = "Could not send the ack request";
- throw new SandeshaException (message,e);
- }
-
- serviceClient.removeHeaders();
- options.setAction(oldAction);
- }
-
- public static void sendAckRequest (ServiceClient serviceClient, String sequenceKey) throws SandeshaException {
- Options options = serviceClient.getOptions();
- if (options==null)
- throw new SandeshaException ("Options object is not set");
-
+ throw new SandeshaException(message, e);
+ }
+
+ serviceClient.removeHeaders();
+ options.setAction(oldAction);
+ }
+
+ public static void sendAckRequest(ServiceClient serviceClient, String sequenceKey) throws SandeshaException {
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
sendAckRequest(serviceClient);
-
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,oldSequenceKey);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
}
-
- private static String getInternalSequenceID (String to, String sequenceKey) {
- return SandeshaUtil.getInternalSequenceID(to,sequenceKey);
- }
-
- private static SOAPEnvelope configureCloseSequence (Options options,ConfigurationContext configurationContext) throws SandeshaException {
-
- if (options==null)
- throw new SandeshaException ("You must set the Options object before calling this method");
-
+
+ private static String getInternalSequenceID(String to, String sequenceKey) {
+ return SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+ }
+
+ private static SOAPEnvelope configureCloseSequence(Options options, ConfigurationContext configurationContext)
+ throws SandeshaException {
+
+ if (options == null)
+ throw new SandeshaException("You must set the Options object before calling this method");
+
EndpointReference epr = options.getTo();
- if (epr==null)
- throw new SandeshaException ("You must set the toEPR before calling this method");
-
+ if (epr == null)
+ throw new SandeshaException("You must set the toEPR before calling this method");
+
String to = epr.getAddress();
String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-
- String internalSequenceID = SandeshaUtil.getInternalSequenceID(to,sequenceKey);
-
- SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(internalSequenceID,configurationContext);
- if (sequenceReport==null)
- throw new SandeshaException ("Cannot generate the sequence report for the given internalSequenceID");
- if (sequenceReport.getSequenceStatus()!=SequenceReport.SEQUENCE_STATUS_ESTABLISHED)
- throw new SandeshaException ("Canot close the sequence since it is not active");
-
+
+ String internalSequenceID = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+
+ SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(internalSequenceID,
+ configurationContext);
+ if (sequenceReport == null)
+ throw new SandeshaException("Cannot generate the sequence report for the given internalSequenceID");
+ if (sequenceReport.getSequenceStatus() != SequenceReport.SEQUENCE_STATUS_ESTABLISHED)
+ throw new SandeshaException("Canot close the sequence since it is not active");
+
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
- SequencePropertyBean sequenceIDBean = seqPropMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
- if (sequenceIDBean==null)
- throw new SandeshaException ("SequenceIdBean is not set");
-
+ SequencePropertyBean sequenceIDBean = seqPropMgr.retrieve(internalSequenceID,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+ if (sequenceIDBean == null)
+ throw new SandeshaException("SequenceIdBean is not set");
+
String sequenceID = sequenceIDBean.getValue();
- if (sequenceID==null)
- throw new SandeshaException ("Cannot find the sequenceID");
-
+ if (sequenceID == null)
+ throw new SandeshaException("Cannot find the sequenceID");
+
String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
- if (rmSpecVersion==null)
- rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion ();
-
- if (!SpecSpecificConstants.isSequenceClosingAllowed (rmSpecVersion))
- throw new SandeshaException ("This rm version does not allow sequence closing");
-
+ if (rmSpecVersion == null)
+ rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion();
+
+ if (!SpecSpecificConstants.isSequenceClosingAllowed(rmSpecVersion))
+ throw new SandeshaException("This rm version does not allow sequence closing");
+
SOAPEnvelope dummyEnvelope = null;
SOAPFactory factory = null;
String soapNamespaceURI = options.getSoapVersionURI();
if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(soapNamespaceURI)) {
- factory = new SOAP12Factory ();
+ factory = new SOAP12Factory();
dummyEnvelope = factory.getDefaultEnvelope();
- }else {
- factory = new SOAP11Factory ();
+ } else {
+ factory = new SOAP11Factory();
dummyEnvelope = factory.getDefaultEnvelope();
}
-
+
String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
-
- CloseSequence closeSequence = new CloseSequence (factory,rmNamespaceValue);
- Identifier identifier = new Identifier (factory,rmNamespaceValue);
+
+ CloseSequence closeSequence = new CloseSequence(factory, rmNamespaceValue);
+ Identifier identifier = new Identifier(factory, rmNamespaceValue);
identifier.setIndentifer(sequenceID);
closeSequence.setIdentifier(identifier);
-
+
closeSequence.toSOAPEnvelope(dummyEnvelope);
-
+
return dummyEnvelope;
}
-
- private static boolean isSequenceTerminated (String internalSequenceID, SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
- SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean ();
+
+ private static boolean isSequenceTerminated(String internalSequenceID, SequencePropertyBeanMgr seqPropMgr)
+ throws SandeshaException {
+ SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean();
internalSequenceFindBean.setValue(internalSequenceID);
internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
-
+
SequencePropertyBean internalSequenceBean = seqPropMgr.findUnique(internalSequenceFindBean);
- if (internalSequenceBean==null) {
+ if (internalSequenceBean == null) {
String message = "Internal sequence Bean is not available for the given sequence";
- log.debug (message);
-
+ log.debug(message);
+
return false;
}
-
+
String outSequenceID = internalSequenceBean.getSequenceID();
-
- SequencePropertyBean sequenceTerminatedBean = seqPropMgr.retrieve(outSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED);
- if (sequenceTerminatedBean!=null && Sandesha2Constants.VALUE_TRUE.equals(sequenceTerminatedBean.getValue())) {
+
+ SequencePropertyBean sequenceTerminatedBean = seqPropMgr.retrieve(outSequenceID,
+ Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED);
+ if (sequenceTerminatedBean != null && Sandesha2Constants.VALUE_TRUE.equals(sequenceTerminatedBean.getValue())) {
return true;
}
-
+
return false;
}
-
- private static boolean isSequenceTimedout (String internalSequenceID, SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
- SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean ();
+
+ private static boolean isSequenceTimedout(String internalSequenceID, SequencePropertyBeanMgr seqPropMgr)
+ throws SandeshaException {
+ SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean();
internalSequenceFindBean.setValue(internalSequenceID);
internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
-
+
SequencePropertyBean internalSequenceBean = seqPropMgr.findUnique(internalSequenceFindBean);
- if (internalSequenceBean==null) {
+ if (internalSequenceBean == null) {
String message = "Internal sequence Bean is not available for the given sequence";
- log.debug (message);
-
+ log.debug(message);
+
return false;
}
-
+
String outSequenceID = internalSequenceBean.getSequenceID();
- SequencePropertyBean sequenceTerminatedBean = seqPropMgr.retrieve(outSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT);
- if (sequenceTerminatedBean!=null && Sandesha2Constants.VALUE_TRUE.equals(sequenceTerminatedBean.getValue())) {
+ SequencePropertyBean sequenceTerminatedBean = seqPropMgr.retrieve(outSequenceID,
+ Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT);
+ if (sequenceTerminatedBean != null && Sandesha2Constants.VALUE_TRUE.equals(sequenceTerminatedBean.getValue())) {
return true;
}
-
+
return false;
}
-
- private static void fillTerminatedOutgoingSequenceInfo (SequenceReport report,String internalSequenceID,SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
- SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean ();
+
+ private static void fillTerminatedOutgoingSequenceInfo(SequenceReport report, String internalSequenceID,
+ SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
+ SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean();
internalSequenceFindBean.setValue(internalSequenceID);
internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
-
+
SequencePropertyBean internalSequenceBean = seqPropMgr.findUnique(internalSequenceFindBean);
- if (internalSequenceBean==null) {
+ if (internalSequenceBean == null) {
String message = "Not a valid terminated sequence. Internal sequence Bean is not available for the given sequence";
- log.debug (message);
-
- throw new SandeshaException (message);
+ log.debug(message);
+
+ throw new SandeshaException(message);
}
-
+
report.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_TERMINATED);
-
+
String outSequenceID = internalSequenceBean.getSequenceID();
- fillOutgoingSequenceInfo(report,outSequenceID,seqPropMgr);
+ fillOutgoingSequenceInfo(report, outSequenceID, seqPropMgr);
}
-
- private static void fillTimedoutOutgoingSequenceInfo (SequenceReport report,String internalSequenceID, SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
- SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean ();
+
+ private static void fillTimedoutOutgoingSequenceInfo(SequenceReport report, String internalSequenceID,
+ SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
+ SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean();
internalSequenceFindBean.setValue(internalSequenceID);
internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
-
+
SequencePropertyBean internalSequenceBean = seqPropMgr.findUnique(internalSequenceFindBean);
- if (internalSequenceBean==null) {
+ if (internalSequenceBean == null) {
String message = "Not a valid timedOut sequence. Internal sequence Bean is not available for the given sequence";
- log.debug (message);
-
- throw new SandeshaException (message);
+ log.debug(message);
+
+ throw new SandeshaException(message);
}
-
+
report.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_TIMED_OUT);
String outSequenceID = internalSequenceBean.getSequenceID();
- fillOutgoingSequenceInfo(report,outSequenceID,seqPropMgr);
+ fillOutgoingSequenceInfo(report, outSequenceID, seqPropMgr);
}
-
- private static void fillOutgoingSequenceInfo (SequenceReport report,String outSequenceID, SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
+
+ private static void fillOutgoingSequenceInfo(SequenceReport report, String outSequenceID,
+ SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
report.setSequenceID(outSequenceID);
-
- ArrayList completedMessageList = AcknowledgementManager.getClientCompletedMessagesList (outSequenceID,seqPropMgr);
-
+
+ ArrayList completedMessageList = AcknowledgementManager.getClientCompletedMessagesList(outSequenceID,
+ seqPropMgr);
+
Iterator iter = completedMessageList.iterator();
while (iter.hasNext()) {
- Long lng = new Long (Long.parseLong((String) iter.next()));
+ Long lng = new Long(Long.parseLong((String) iter.next()));
report.addCompletedMessage(lng);
}
}
-
- private static byte getServerSequenceStatus (String sequenceID,StorageManager storageManager) throws SandeshaException {
-
+
+ private static byte getServerSequenceStatus(String sequenceID, StorageManager storageManager)
+ throws SandeshaException {
+
SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
-
- SequencePropertyBean terminatedBean = seqPropMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED);
- if (terminatedBean!=null) {
+
+ SequencePropertyBean terminatedBean = seqPropMgr.retrieve(sequenceID,
+ Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED);
+ if (terminatedBean != null) {
return SequenceReport.SEQUENCE_STATUS_TERMINATED;
}
-
- SequencePropertyBean timedOutBean = seqPropMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT);
- if (timedOutBean!=null) {
+
+ SequencePropertyBean timedOutBean = seqPropMgr.retrieve(sequenceID,
+ Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT);
+ if (timedOutBean != null) {
return SequenceReport.SEQUENCE_STATUS_TIMED_OUT;
}
-
+
NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceID);
-
- if (nextMsgBean!=null) {
+
+ if (nextMsgBean != null) {
return SequenceReport.SEQUENCE_STATUS_ESTABLISHED;
}
-
- throw new SandeshaException ("Unrecorded sequenceID");
+
+ throw new SandeshaException("Unrecorded sequenceID");
}
-
+
private class DummyCallback extends Callback {
public void onComplete(AsyncResult result) {
@@ -789,92 +865,127 @@
public void onError(Exception e) {
// TODO Auto-generated method stub
System.out.println("Error: dummy callback received an error");
-
+
}
-
+
}
-
-
- private static String generateInternalSequenceIDForTheClientSide (String toEPR,String sequenceKey) {
- return SandeshaUtil.getInternalSequenceID(toEPR,sequenceKey);
- }
-
- private static SequenceReport getIncomingSequenceReport (String sequenceID,ConfigurationContext configCtx) throws SandeshaException {
-
+
+ private static String generateInternalSequenceIDForTheClientSide(String toEPR, String sequenceKey) {
+ return SandeshaUtil.getInternalSequenceID(toEPR, sequenceKey);
+ }
+
+ private static SequenceReport getIncomingSequenceReport(String sequenceID, ConfigurationContext configCtx)
+ throws SandeshaException {
+
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx);
SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
-
- SequenceReport sequenceReport = new SequenceReport ();
- ArrayList completedMessageList = AcknowledgementManager.getServerCompletedMessagesList (sequenceID,seqPropMgr);
+ String withinTransactionStr = (String) configCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ boolean withinTransaction = false;
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr))
+ withinTransaction = true;
+
+ Transaction reportTransaction = null;
+ if (!withinTransaction)
+ reportTransaction = storageManager.getTransaction();
- Iterator iter = completedMessageList.iterator();
- while (iter.hasNext()) {;
- sequenceReport.addCompletedMessage((Long) iter.next());
+ boolean rolebacked = false;
+
+ try {
+
+ SequenceReport sequenceReport = new SequenceReport();
+
+ ArrayList completedMessageList = AcknowledgementManager.getServerCompletedMessagesList(sequenceID,
+ seqPropMgr);
+
+ Iterator iter = completedMessageList.iterator();
+ while (iter.hasNext()) {
+ ;
+ sequenceReport.addCompletedMessage((Long) iter.next());
+ }
+
+ sequenceReport.setSequenceID(sequenceID);
+ sequenceReport.setInternalSequenceID(sequenceID); // for the
+ // incoming side
+ // internalSequenceID=sequenceID
+ sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_IN);
+
+ sequenceReport.setSequenceStatus(getServerSequenceStatus(sequenceID, storageManager));
+
+ return sequenceReport;
+
+ } catch (Exception e) {
+ if (!withinTransaction && reportTransaction!=null) {
+ reportTransaction.rollback();
+ configCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolebacked = true;
+ }
+ } finally {
+ if (!withinTransaction && !rolebacked && reportTransaction!=null) {
+ reportTransaction.commit();
+ configCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ }
}
-
- sequenceReport.setSequenceID(sequenceID);
- sequenceReport.setInternalSequenceID(sequenceID); //for the incoming side internalSequenceID=sequenceID
- sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_IN);
- sequenceReport.setSequenceStatus(getServerSequenceStatus (sequenceID,storageManager));
-
- return sequenceReport;
+ return null;
}
-
- private static SOAPEnvelope configureTerminateSequence (Options options,ConfigurationContext configurationContext) throws SandeshaException {
- if (options==null)
- throw new SandeshaException ("You must set the Options object before calling this method");
-
+ private static SOAPEnvelope configureTerminateSequence(Options options, ConfigurationContext configurationContext)
+ throws SandeshaException {
+
+ if (options == null)
+ throw new SandeshaException("You must set the Options object before calling this method");
+
EndpointReference epr = options.getTo();
- if (epr==null)
- throw new SandeshaException ("You must set the toEPR before calling this method");
-
+ if (epr == null)
+ throw new SandeshaException("You must set the toEPR before calling this method");
+
String to = epr.getAddress();
String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- String internalSequenceID = SandeshaUtil.getInternalSequenceID(to,sequenceKey);
- SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(internalSequenceID,configurationContext);
- if (sequenceReport==null)
- throw new SandeshaException ("Cannot generate the sequence report for the given internalSequenceID");
- if (sequenceReport.getSequenceStatus()!=SequenceReport.SEQUENCE_STATUS_ESTABLISHED)
- throw new SandeshaException ("Canot terminate the sequence since it is not active");
-
+ String internalSequenceID = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+ SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(internalSequenceID,
+ configurationContext);
+ if (sequenceReport == null)
+ throw new SandeshaException("Cannot generate the sequence report for the given internalSequenceID");
+ if (sequenceReport.getSequenceStatus() != SequenceReport.SEQUENCE_STATUS_ESTABLISHED)
+ throw new SandeshaException("Canot terminate the sequence since it is not active");
+
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
- SequencePropertyBean sequenceIDBean = seqPropMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
- if (sequenceIDBean==null)
- throw new SandeshaException ("SequenceIdBean is not set");
-
+ SequencePropertyBean sequenceIDBean = seqPropMgr.retrieve(internalSequenceID,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+ if (sequenceIDBean == null)
+ throw new SandeshaException("SequenceIdBean is not set");
+
String sequenceID = sequenceIDBean.getValue();
- if (sequenceID==null)
- throw new SandeshaException ("Cannot find the sequenceID");
-
+ if (sequenceID == null)
+ throw new SandeshaException("Cannot find the sequenceID");
+
String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
- if (rmSpecVersion==null)
- rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion ();
-
- options.setAction(SpecSpecificConstants.getTerminateSequenceAction(rmSpecVersion));
+ if (rmSpecVersion == null)
+ rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion();
+
+ options.setAction(SpecSpecificConstants.getTerminateSequenceAction(rmSpecVersion));
SOAPEnvelope dummyEnvelope = null;
SOAPFactory factory = null;
String soapNamespaceURI = options.getSoapVersionURI();
if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(soapNamespaceURI)) {
- factory = new SOAP12Factory ();
+ factory = new SOAP12Factory();
dummyEnvelope = factory.getDefaultEnvelope();
- }else {
- factory = new SOAP11Factory ();
+ } else {
+ factory = new SOAP11Factory();
dummyEnvelope = factory.getDefaultEnvelope();
}
-
+
String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
- TerminateSequence terminateSequence = new TerminateSequence (factory,rmNamespaceValue);
- Identifier identifier = new Identifier (factory,rmNamespaceValue);
+ TerminateSequence terminateSequence = new TerminateSequence(factory, rmNamespaceValue);
+ Identifier identifier = new Identifier(factory, rmNamespaceValue);
identifier.setIndentifer(sequenceID);
terminateSequence.setIdentifier(identifier);
terminateSequence.toSOAPEnvelope(dummyEnvelope);
-
+
return dummyEnvelope;
}
-
+
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Fri May 12 12:16:32 2006
@@ -42,6 +42,7 @@
import org.apache.sandesha2.client.SandeshaListener;
import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.MsgInitializer;
@@ -49,106 +50,139 @@
import org.apache.sandesha2.wsrm.Sequence;
/**
- * The Global handler of Sandesha2. This is used to perform things that should be done before
- * diapatching such as duplicate detection.
+ * The Global handler of Sandesha2. This is used to perform things that should
+ * be done before diapatching such as duplicate detection.
*
* @author Chamikara Jayalath <ch...@gmail.com>
*/
public class SandeshaGlobalInHandler extends AbstractHandler {
- private static final Log log = LogFactory.getLog(SandeshaGlobalInHandler.class
- .getName());
+ private static final Log log = LogFactory.getLog(SandeshaGlobalInHandler.class.getName());
public void invoke(MessageContext msgContext) throws AxisFault {
ConfigurationContext configContext = msgContext.getConfigurationContext();
- if ( configContext==null)
- throw new AxisFault ("Configuration context is not set");
-
+ if (configContext == null)
+ throw new AxisFault("Configuration context is not set");
+
SOAPEnvelope envelope = msgContext.getEnvelope();
- if (envelope==null)
- throw new SandeshaException ("SOAP envelope is not set");
-
- //processing faults.
- //Had to do this before dispatching. A fault message comes with the relatesTo part. So this will
- //fill the opContext of te req/res message. But RM keeps retransmitting. So RM has to report the
- //error and stop this fault being dispatched as the response message.
-
- SOAPFault faultPart = envelope.getBody().getFault();
-
- if (faultPart!=null) {
- RelatesTo relatesTo = msgContext.getRelatesTo();
- if (relatesTo!=null) {
- String relatesToValue = relatesTo.getValue();
- OperationContext operationContext = configContext.getOperationContext(relatesToValue);
- if (operationContext!=null) {
- MessageContext requestMessage = operationContext.getMessageContext(OperationContextFactory.MESSAGE_LABEL_OUT_VALUE);
- if (requestMessage!=null) {
- if(SandeshaUtil.isRetriableOnFaults(requestMessage)){
-
- SandeshaListener faultCallback = (SandeshaListener) operationContext.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
- if (faultCallback!=null) {
-
-
- //constructing the fault
- AxisFault axisFault = getAxisFaultFromFromSOAPFault (faultPart);
-
-
- //reporting the fault
- log.error(axisFault);
- if (faultCallback!=null) {
- faultCallback.onError(axisFault);
- }
-
+ if (envelope == null)
+ throw new SandeshaException("SOAP envelope is not set");
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+
+ boolean withinTransaction = false;
+ String withinTransactionStr = (String) msgContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+ withinTransaction = true;
+ }
+
+ Transaction transaction = null;
+ if (!withinTransaction) {
+ transaction = storageManager.getTransaction();
+ msgContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+ }
+ boolean rolebacked = false;
+
+ try {
+ // processing faults.
+ // Had to do this before dispatching. A fault message comes with the
+ // relatesTo part. So this will
+ // fill the opContext of te req/res message. But RM keeps
+ // retransmitting. So RM has to report the
+ // error and stop this fault being dispatched as the response
+ // message.
+
+ SOAPFault faultPart = envelope.getBody().getFault();
+
+ if (faultPart != null) {
+ RelatesTo relatesTo = msgContext.getRelatesTo();
+ if (relatesTo != null) {
+ String relatesToValue = relatesTo.getValue();
+ OperationContext operationContext = configContext.getOperationContext(relatesToValue);
+ if (operationContext != null) {
+ MessageContext requestMessage = operationContext
+ .getMessageContext(OperationContextFactory.MESSAGE_LABEL_OUT_VALUE);
+ if (requestMessage != null) {
+ if (SandeshaUtil.isRetriableOnFaults(requestMessage)) {
+
+ SandeshaListener faultCallback = (SandeshaListener) operationContext
+ .getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
+ if (faultCallback != null) {
+
+ // constructing the fault
+ AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart);
+
+ // reporting the fault
+ log.error(axisFault);
+ if (faultCallback != null) {
+ faultCallback.onError(axisFault);
+ }
+
+ }
+
+ // stopping the fault from going further and
+ // getting dispatched
+ msgContext.pause(); // TODO let this go in the
+ // last try
+
+ return;
}
-
- //stopping the fault from going further and getting dispatched
- msgContext.pause(); //TODO let this go in the last try
- return;
}
}
}
}
- }
-
- //Quitting the message with minimum processing if not intended for RM.
- boolean isRMGlobalMessage = SandeshaUtil.isRMGlobalMessage(msgContext);
- if (!isRMGlobalMessage) {
- return;
- }
- RMMsgContext rmMessageContext = MsgInitializer
- .initializeMessage(msgContext);
+ // Quitting the message with minimum processing if not intended for
+ // RM.
+ boolean isRMGlobalMessage = SandeshaUtil.isRMGlobalMessage(msgContext);
+ if (!isRMGlobalMessage) {
+ return;
+ }
- //Dropping duplicates
- boolean dropped = dropIfDuplicate(rmMessageContext);
- if (dropped) {
- processDroppedMessage(rmMessageContext);
- return;
- }
+ RMMsgContext rmMessageContext = MsgInitializer.initializeMessage(msgContext);
+
+ // Dropping duplicates
+ boolean dropped = dropIfDuplicate(rmMessageContext);
+ if (dropped) {
+ processDroppedMessage(rmMessageContext);
+ return;
+ }
- //Persisting the application messages
-// if (rmMessageContext.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION) {
-// SandeshaUtil.PersistMessageContext ()
-// }
-
- //Process if global processing possible. - Currently none
- if (SandeshaUtil.isGloballyProcessableMessageType(rmMessageContext
- .getMessageType())) {
- doGlobalProcessing(rmMessageContext);
+ // Persisting the application messages
+ // if
+ // (rmMessageContext.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION)
+ // {
+ // SandeshaUtil.PersistMessageContext ()
+ // }
+
+ // Process if global processing possible. - Currently none
+ if (SandeshaUtil.isGloballyProcessableMessageType(rmMessageContext.getMessageType())) {
+ doGlobalProcessing(rmMessageContext);
+ }
+
+ } catch (Exception e) {
+ if (!withinTransaction) {
+ transaction.rollback();
+ msgContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolebacked = true;
+ }
+ } finally {
+ if (!withinTransaction && !rolebacked) {
+ transaction.commit();
+ msgContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ }
}
}
- private boolean dropIfDuplicate(RMMsgContext rmMsgContext)
- throws SandeshaException {
+ private boolean dropIfDuplicate(RMMsgContext rmMsgContext) throws SandeshaException {
boolean drop = false;
if (rmMsgContext.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
- Sequence sequence = (Sequence) rmMsgContext
- .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
String sequenceId = null;
if (sequence != null) {
@@ -158,19 +192,14 @@
long msgNo = sequence.getMessageNumber().getMessageNumber();
if (sequenceId != null && msgNo > 0) {
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(rmMsgContext
- .getMessageContext().getConfigurationContext());
- SequencePropertyBeanMgr seqPropMgr = storageManager
- .getSequencePropretyBeanMgr();
- SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(
- sequenceId,
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(rmMsgContext.getMessageContext()
+ .getConfigurationContext());
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(sequenceId,
Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
if (receivedMsgsBean != null) {
- String receivedMsgStr = (String) receivedMsgsBean
- .getValue();
- ArrayList msgNoArrList = SandeshaUtil
- .getSplittedMsgNoArraylist(receivedMsgStr);
+ String receivedMsgStr = (String) receivedMsgsBean.getValue();
+ ArrayList msgNoArrList = SandeshaUtil.getSplittedMsgNoArraylist(receivedMsgStr);
Iterator iterator = msgNoArrList.iterator();
while (iterator.hasNext()) {
@@ -183,7 +212,7 @@
}
if (drop == false) {
- //Checking for RM specific EMPTY_BODY LASTMESSAGE.
+ // Checking for RM specific EMPTY_BODY LASTMESSAGE.
SOAPBody body = rmMsgContext.getSOAPEnvelope().getBody();
boolean emptyBody = false;
if (body.getChildElements().hasNext() == false) {
@@ -192,59 +221,56 @@
if (emptyBody) {
if (sequence.getLastMessage() != null) {
- log.info ("Empty Body LastMessage Received");
+ log.info("Empty Body LastMessage Received");
drop = true;
if (receivedMsgsBean == null) {
- receivedMsgsBean = new SequencePropertyBean(
- sequenceId,
- Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES,
- "");
+ receivedMsgsBean = new SequencePropertyBean(sequenceId,
+ Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES, "");
seqPropMgr.insert(receivedMsgsBean);
}
- String receivedMsgStr = (String) receivedMsgsBean
- .getValue();
+ String receivedMsgStr = (String) receivedMsgsBean.getValue();
if (receivedMsgStr != "" && receivedMsgStr != null)
- receivedMsgStr = receivedMsgStr + ","
- + Long.toString(msgNo);
+ receivedMsgStr = receivedMsgStr + "," + Long.toString(msgNo);
else
receivedMsgStr = Long.toString(msgNo);
receivedMsgsBean.setValue(receivedMsgStr);
-
- //TODO correct the syntac into '[received msgs]'
-
+
+ // TODO correct the syntac into '[received msgs]'
+
seqPropMgr.update(receivedMsgsBean);
ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor();
- ackProcessor.sendAckIfNeeded(rmMsgContext,
- receivedMsgStr);
+ ackProcessor.sendAckIfNeeded(rmMsgContext, receivedMsgStr);
}
}
}
}
- } else if (rmMsgContext.getMessageType()!=Sandesha2Constants.MessageTypes.UNKNOWN) {
- //droping other known message types if, an suitable operation context is not available,
- //and if a relates to value is present.
+ } else if (rmMsgContext.getMessageType() != Sandesha2Constants.MessageTypes.UNKNOWN) {
+ // droping other known message types if, an suitable operation
+ // context is not available,
+ // and if a relates to value is present.
RelatesTo relatesTo = rmMsgContext.getRelatesTo();
- if (relatesTo!=null) {
+ if (relatesTo != null) {
String value = relatesTo.getValue();
-
- //TODO do not drop, relationshipTypes other than reply
-
+
+ // TODO do not drop, relationshipTypes other than reply
+
ConfigurationContext configurationContext = rmMsgContext.getMessageContext().getConfigurationContext();
OperationContext operationContextFromMap = configurationContext.getOperationContext(value);
OperationContext operationContext = rmMsgContext.getMessageContext().getOperationContext();
-
-
- //reply messages should be dropped if it cannot be instance dispatched.
- //I.e. both not having a op. ctx not and not having a op. ctx in the global list.
- if (operationContext==null && operationContextFromMap==null) {
+
+ // reply messages should be dropped if it cannot be instance
+ // dispatched.
+ // I.e. both not having a op. ctx not and not having a op. ctx
+ // in the global list.
+ if (operationContext == null && operationContextFromMap == null) {
String message = "Dropping duplicate RM message";
log.debug(message);
- drop=true;
+ drop = true;
}
}
}
@@ -257,58 +283,55 @@
return false;
}
- private void processDroppedMessage(RMMsgContext rmMsgContext)
- throws SandeshaException {
+ private void processDroppedMessage(RMMsgContext rmMsgContext) throws SandeshaException {
if (rmMsgContext.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
- Sequence sequence = (Sequence) rmMsgContext
- .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
String sequenceId = null;
if (sequence != null) {
sequenceId = sequence.getIdentifier().getIdentifier();
}
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(rmMsgContext.getMessageContext()
- .getConfigurationContext());
- SequencePropertyBeanMgr seqPropMgr = storageManager
- .getSequencePropretyBeanMgr();
- SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(
- sequenceId, Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(rmMsgContext.getMessageContext()
+ .getConfigurationContext());
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(sequenceId,
+ Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
String receivedMsgStr = (String) receivedMsgsBean.getValue();
ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor();
- //Even though the duplicate message is dropped, hv to send the ack if needed.
+ // Even though the duplicate message is dropped, hv to send the ack
+ // if needed.
ackProcessor.sendAckIfNeeded(rmMsgContext, receivedMsgStr);
}
}
- private void doGlobalProcessing(RMMsgContext rmMsgCtx)
- throws SandeshaException {
+ private void doGlobalProcessing(RMMsgContext rmMsgCtx) throws SandeshaException {
switch (rmMsgCtx.getMessageType()) {
case Sandesha2Constants.MessageTypes.ACK:
-
-// //rmMsgCtx.addRelatesTo(null);
-// rmMsgCtx.getMessageContext().getre
-// //Removing the relatesTo part from ackMessageIf present. Some Frameworks tend to send this.
+
+ // //rmMsgCtx.addRelatesTo(null);
+ // rmMsgCtx.getMessageContext().getre
+ // //Removing the relatesTo part from ackMessageIf present. Some
+ // Frameworks tend to send this.
}
}
public QName getName() {
return new QName(Sandesha2Constants.GLOBAL_IN_HANDLER_NAME);
}
-
- private AxisFault getAxisFaultFromFromSOAPFault (SOAPFault faultPart) {
+
+ private AxisFault getAxisFaultFromFromSOAPFault(SOAPFault faultPart) {
SOAPFaultReason reason = faultPart.getReason();
-
+
AxisFault axisFault = null;
- if (reason!=null)
- axisFault = new AxisFault (reason.getText());
+ if (reason != null)
+ axisFault = new AxisFault(reason.getText());
else
- axisFault = new AxisFault ("");
-
+ axisFault = new AxisFault("");
+
return axisFault;
}
-
+
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Fri May 12 12:16:32 2006
@@ -31,11 +31,14 @@
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.msgprocessors.MsgProcessor;
import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
/**
- * This is invoked in the inFlow of an RM endpoint. This is responsible for selecting an suitable
- * message processor and letting it process the message.
+ * This is invoked in the inFlow of an RM endpoint. This is responsible for
+ * selecting an suitable message processor and letting it process the message.
*
* @author Chamikara Jayalath <ch...@gmail.com>
*/
@@ -49,9 +52,7 @@
}
public void invoke(MessageContext msgCtx) throws AxisFault {
-
-
-
+
ConfigurationContext context = msgCtx.getConfigurationContext();
if (context == null) {
String message = "ConfigurationContext is null";
@@ -59,36 +60,65 @@
throw new AxisFault(message);
}
- String DONE = (String) msgCtx
- .getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
+ String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
if (null != DONE && "true".equals(DONE))
return;
- AxisService axisService = msgCtx.getAxisService();
- if (axisService == null) {
- String message = "AxisService is null";
- log.debug(message);
- throw new AxisFault(message);
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
+
+ boolean withinTransaction = false;
+ String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+ withinTransaction = true;
}
- RMMsgContext rmMsgCtx = null;
- try {
- rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
- } catch (SandeshaException ex) {
- String message = "Cant initialize the message";
- log.debug(message);
- throw new AxisFault(message);
+ Transaction transaction = null;
+ if (!withinTransaction) {
+ transaction = storageManager.getTransaction();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
}
-
- MsgProcessor msgProcessor = MsgProcessorFactory.getMessageProcessor (rmMsgCtx);
+ boolean rolebacked = false;
try {
- if (msgProcessor!=null)
- msgProcessor.processInMessage(rmMsgCtx);
- } catch (SandeshaException se) {
- String message = "Error in processing the message";
- log.debug(message);
- throw new AxisFault(message,se);
+
+ AxisService axisService = msgCtx.getAxisService();
+ if (axisService == null) {
+ String message = "AxisService is null";
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+ RMMsgContext rmMsgCtx = null;
+ try {
+ rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+ } catch (SandeshaException ex) {
+ String message = "Cant initialize the message";
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+ MsgProcessor msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+
+ try {
+ if (msgProcessor != null)
+ msgProcessor.processInMessage(rmMsgCtx);
+ } catch (SandeshaException se) {
+ String message = "Error in processing the message";
+ log.debug(message);
+ throw new AxisFault(message, se);
+ }
+
+ } catch (Exception e) {
+ if (!withinTransaction) {
+ transaction.rollback();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolebacked = true;
+ }
+ } finally {
+ if (!withinTransaction && !rolebacked) {
+ transaction.commit();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org