You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/06/15 07:51:24 UTC
svn commit: r414476 [4/15] - in /webservices/sandesha/trunk: ./ c/ config/
interop/ java/ java/config/ java/interop/ java/interop/conf/
java/interop/src/ java/interop/src/org/ java/interop/src/org/apache/
java/interop/src/org/apache/sandesha2/ java/int...
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,1007 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.sandesha2.client;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
+import org.apache.axiom.soap.impl.llom.soap12.SOAP12Factory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.client.async.AsyncResult;
+import org.apache.axis2.client.async.Callback;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ServiceContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.wsrm.AckRequested;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+
+/**
+ * Contains all the Sandesha2Constants of Sandesha2. Please see sub-interfaces
+ * to see grouped data.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class SandeshaClient {
+
+ private static final Log log = LogFactory.getLog(SandeshaClient.class);
+
+ /**
+ * Users can get a SequenceReport of the sequence defined by the information
+ * given from the passed serviceClient object.
+ *
+ * @param serviceClient
+ * @return
+ * @throws SandeshaException
+ */
+ public static SequenceReport getOutgoingSequenceReport(ServiceClient serviceClient) throws SandeshaException {
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ EndpointReference toEPR = options.getTo();
+ if (toEPR == null)
+ throw new SandeshaException("'To' address is not set");
+
+ String to = toEPR.getAddress();
+ String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+
+ ServiceContext serviceContext = serviceClient.getServiceContext();
+ if (serviceContext == null)
+ throw new SandeshaException("Service Context is null");
+
+ ConfigurationContext configurationContext = serviceContext.getConfigurationContext();
+
+ String internalSequenceID = getInternalSequenceID(to, sequenceKey);
+
+ return getOutgoingSequenceReport(internalSequenceID, configurationContext);
+ }
+
+ public static SequenceReport getOutgoingSequenceReport(String to, String sequenceKey,
+ ConfigurationContext configurationContext) throws SandeshaException {
+
+ String internalSequenceID = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+ return getOutgoingSequenceReport(internalSequenceID, configurationContext);
+ }
+
+ public static SequenceReport getOutgoingSequenceReport(String internalSequenceID,
+ ConfigurationContext configurationContext) throws SandeshaException {
+
+ SequenceReport sequenceReport = new SequenceReport();
+ sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_OUT);
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
+
+ String withinTransactionStr = (String) configurationContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ boolean withinTransaction = false;
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr))
+ withinTransaction = true;
+
+ Transaction reportTransaction = null;
+ if (!withinTransaction)
+ reportTransaction = storageManager.getTransaction();
+
+ boolean rolebacked = false;
+
+ try {
+
+ sequenceReport.setInternalSequenceID(internalSequenceID);
+
+ CreateSeqBean createSeqFindBean = new CreateSeqBean();
+ createSeqFindBean.setInternalSequenceID(internalSequenceID);
+
+ CreateSeqBean createSeqBean = createSeqMgr.findUnique(createSeqFindBean);
+
+ // if data not is available sequence has to be terminated or
+ // timedOut.
+ if (createSeqBean == null) {
+
+ // check weather this is an terminated sequence.
+ if (isSequenceTerminated(internalSequenceID, seqPropMgr)) {
+ fillTerminatedOutgoingSequenceInfo(sequenceReport, internalSequenceID, seqPropMgr);
+
+ return sequenceReport;
+ }
+
+ if (isSequenceTimedout(internalSequenceID, seqPropMgr)) {
+ fillTimedoutOutgoingSequenceInfo(sequenceReport, internalSequenceID, seqPropMgr);
+
+ return sequenceReport;
+ }
+
+ // sequence must hv been timed out before establiching. No other
+ // posibility I can think of.
+ // this does not get recorded since there is no key (which is
+ // normally the sequenceID) to store it.
+ // (properties with key as the internalSequenceID get deleted in
+ // timing out)
+
+ // so, setting the sequence status to INITIAL
+ sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_INITIAL);
+
+ // returning the current sequence report.
+ return sequenceReport;
+ }
+
+ String outSequenceID = createSeqBean.getSequenceID();
+ if (outSequenceID == null) {
+ sequenceReport.setInternalSequenceID(internalSequenceID);
+ sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_INITIAL);
+ sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_OUT);
+
+ return sequenceReport;
+ }
+
+ sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_ESTABLISHED);
+ fillOutgoingSequenceInfo(sequenceReport, outSequenceID, seqPropMgr);
+
+ } catch (Exception e) {
+ if (!withinTransaction && reportTransaction!=null) {
+ reportTransaction.rollback();
+ configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolebacked = true;
+ }
+ } finally {
+ if (!withinTransaction && !rolebacked && reportTransaction!=null) {
+ reportTransaction.commit();
+ configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ }
+ }
+
+ return sequenceReport;
+ }
+
+ /**
+ * Users can get a list of sequenceReports each describing a incoming
+ * sequence, which are the sequences the client work as a RMD.
+ *
+ * @param configCtx
+ * @return
+ * @throws SandeshaException
+ */
+ public static ArrayList getIncomingSequenceReports(ConfigurationContext configCtx) throws SandeshaException {
+
+ SandeshaReport report = getSandeshaReport(configCtx);
+ ArrayList incomingSequenceIDs = report.getIncomingSequenceList();
+ Iterator incomingSequenceIDIter = incomingSequenceIDs.iterator();
+
+ ArrayList incomingSequenceReports = new ArrayList();
+
+ while (incomingSequenceIDIter.hasNext()) {
+ String sequnceID = (String) incomingSequenceIDIter.next();
+ SequenceReport incomingSequenceReport = getIncomingSequenceReport(sequnceID, configCtx);
+ if (incomingSequenceReport == null) {
+ throw new SandeshaException("An incoming sequence report is not present for the given sequenceID");
+ }
+ incomingSequenceReports.add(incomingSequenceReport);
+ }
+
+ return incomingSequenceReports;
+ }
+
+ /**
+ * SandeshaReport gives the details of all incoming and outgoing sequences.
+ * The outgoing sequence have to pass the initial state (CS/CSR exchange) to
+ * be included in a SandeshaReport
+ *
+ * @param configurationContext
+ * @return
+ * @throws SandeshaException
+ */
+ public static SandeshaReport getSandeshaReport(ConfigurationContext configurationContext) throws SandeshaException {
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ SandeshaReport sandeshaReport = new SandeshaReport();
+ SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean();
+
+ String withinTransactionStr = (String) configurationContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ boolean withinTransaction = false;
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr))
+ withinTransaction = true;
+
+ Transaction reportTransaction = null;
+ if (!withinTransaction)
+ reportTransaction = storageManager.getTransaction();
+
+ boolean rolebacked = false;
+
+ try {
+
+ internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+ Collection collection = seqPropMgr.find(internalSequenceFindBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ SequencePropertyBean bean = (SequencePropertyBean) iterator.next();
+ String sequenceID = bean.getSequenceID();
+ sandeshaReport.addToOutgoingSequenceList(sequenceID);
+ sandeshaReport.addToOutgoingInternalSequenceMap(sequenceID, bean.getValue());
+
+ SequenceReport report = getOutgoingSequenceReport(bean.getValue(), configurationContext);
+
+ sandeshaReport.addToNoOfCompletedMessagesMap(sequenceID, report.getCompletedMessages().size());
+ sandeshaReport.addToSequenceStatusMap(sequenceID, report.getSequenceStatus());
+ }
+
+ // incoming sequences
+ SequencePropertyBean serverCompletedMsgsFindBean = new SequencePropertyBean();
+ serverCompletedMsgsFindBean.setName(Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+
+ Collection serverCompletedMsgsBeans = seqPropMgr.find(serverCompletedMsgsFindBean);
+ Iterator iter = serverCompletedMsgsBeans.iterator();
+ while (iter.hasNext()) {
+ SequencePropertyBean serverCompletedMsgsBean = (SequencePropertyBean) iter.next();
+ String sequenceID = serverCompletedMsgsBean.getSequenceID();
+ sandeshaReport.addToIncomingSequenceList(sequenceID);
+
+ SequenceReport sequenceReport = getIncomingSequenceReport(sequenceID, configurationContext);
+
+ sandeshaReport.addToNoOfCompletedMessagesMap(sequenceID, sequenceReport.getCompletedMessages().size());
+ sandeshaReport.addToSequenceStatusMap(sequenceID, sequenceReport.getSequenceStatus());
+ }
+
+ } catch (Exception e) {
+ if (!withinTransaction && reportTransaction!=null) {
+ reportTransaction.rollback();
+ rolebacked = true;
+ }
+ } finally {
+ if (!withinTransaction && !rolebacked && reportTransaction!=null) {
+ reportTransaction.commit();
+ }
+ }
+
+ return sandeshaReport;
+ }
+
+ public static void createSequence(ServiceClient serviceClient, boolean offer) throws SandeshaException {
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ EndpointReference toEPR = serviceClient.getOptions().getTo();
+ if (toEPR == null)
+ throw new SandeshaException("ToEPR is not set");
+
+ String to = toEPR.getAddress();
+ if (to == null)
+ throw new SandeshaException("To EPR is not set");
+
+ if (offer) {
+ String offeredSequenceID = SandeshaUtil.getUUID();
+ options.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID, offeredSequenceID);
+ }
+
+ // setting a new squenceKey if not already set.
+ String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ if (sequenceKey == null) {
+ sequenceKey = SandeshaUtil.getUUID();
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
+ }
+
+ options.setProperty(SandeshaClientConstants.DUMMY_MESSAGE, Sandesha2Constants.VALUE_TRUE);
+
+ try {
+ serviceClient.fireAndForget(null);
+ } catch (AxisFault e) {
+ throw new SandeshaException(e);
+ }
+
+ options.setProperty(SandeshaClientConstants.DUMMY_MESSAGE, Sandesha2Constants.VALUE_FALSE);
+
+ }
+
+ public static void createSequence(ServiceClient serviceClient, boolean offer, String sequenceKey)
+ throws SandeshaException {
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
+
+ createSequence(serviceClient, offer);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
+ }
+
+ /**
+ * User can terminate the sequence defined by the passed serviceClient.
+ *
+ * @deprecated
+ */
+ public static void createSequnce(ServiceClient serviceClient, boolean offer, String sequenceKey)
+ throws SandeshaException {
+ createSequence(serviceClient,offer,sequenceKey);
+ }
+
+ /**
+ * User can terminate the sequence defined by the passed serviceClient.
+ *
+ * @param serviceClient
+ * @throws SandeshaException
+ */
+ public static void terminateSequence(ServiceClient serviceClient) throws SandeshaException {
+ ServiceContext serviceContext = serviceClient.getServiceContext();
+ if (serviceContext == null)
+ throw new SandeshaException("ServiceContext is null");
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
+
+ if (rmSpecVersion == null)
+ rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion();
+
+ String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
+
+ SOAPEnvelope terminateEnvelope = configureTerminateSequence(options, serviceContext.getConfigurationContext());
+ OMElement terminateBody = terminateEnvelope.getBody().getFirstChildWithName(
+ new QName(rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.TERMINATE_SEQUENCE));
+
+ String oldAction = options.getAction();
+ options.setAction(SpecSpecificConstants.getTerminateSequenceAction(rmSpecVersion));
+
+ try {
+ serviceClient.fireAndForget(terminateBody);
+ } catch (AxisFault e) {
+ String message = "Could not send the terminate message";
+ throw new SandeshaException(message, e);
+ } finally {
+ options.setAction(oldAction);
+ }
+ }
+
+ public static void terminateSequence(ServiceClient serviceClient, String sequenceKey) throws SandeshaException {
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
+ terminateSequence(serviceClient);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
+ }
+
+ /**
+ * User can close the sequence defined by the passed serviceClient.
+ *
+ * @param serviceClient
+ * @throws SandeshaException
+ */
+ public static void closeSequence(ServiceClient serviceClient) throws SandeshaException {
+ ServiceContext serviceContext = serviceClient.getServiceContext();
+ if (serviceContext == null)
+ throw new SandeshaException("ServiceContext is null");
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
+
+ if (rmSpecVersion == null)
+ rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion();
+
+ String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
+
+ SOAPEnvelope closeSequnceEnvelope = configureCloseSequence(options, serviceContext.getConfigurationContext());
+ OMElement closeSequenceBody = closeSequnceEnvelope.getBody().getFirstChildWithName(
+ new QName(rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.CLOSE_SEQUENCE));
+
+ String oldAction = options.getAction();
+ options.setAction(SpecSpecificConstants.getCloseSequenceAction(rmSpecVersion));
+ try {
+ serviceClient.fireAndForget(closeSequenceBody);
+ } catch (AxisFault e) {
+ String message = "Could not send the close sequence message";
+ throw new SandeshaException(message, e);
+ } finally {
+ options.setAction(oldAction);
+ }
+ }
+
+ public static void closeSequence(ServiceClient serviceClient, String sequenceKey) throws SandeshaException {
+ // TODO test
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ String specVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
+ if (!Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) {
+ String message = "Close Sequence feature is only available for WSRM 1.1";
+ throw new SandeshaException (message);
+ }
+
+ String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
+ closeSequence(serviceClient);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
+ }
+
+ /**
+ * This blocks the system until the messages u have sent hv been completed.
+ *
+ * @param serviceClient
+ */
+ public static void waitUntilSequenceCompleted(ServiceClient serviceClient) throws SandeshaException {
+ waitUntilSequenceCompleted(serviceClient, -1);
+ }
+
+ public static void waitUntilSequenceCompleted(ServiceClient serviceClient, String sequenceKey)
+ throws SandeshaException {
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
+ waitUntilSequenceCompleted(serviceClient);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
+ }
+
+ /**
+ * This blocks the system until the messages u have sent hv been completed
+ * or until the given time interval exceeds. (the time is taken in seconds)
+ *
+ * @param serviceClient
+ * @param maxWaitingTime
+ */
+ public static void waitUntilSequenceCompleted(ServiceClient serviceClient, long maxWaitingTime)
+ throws SandeshaException {
+
+ long startTime = System.currentTimeMillis();
+
+ SequenceReport sequenceReport = getOutgoingSequenceReport(serviceClient);
+ if (sequenceReport == null) {
+ throw new SandeshaException("Cannnot find a sequence report for the given data");
+ }
+
+ boolean done = false;
+ while (!done) {
+ sequenceReport = getOutgoingSequenceReport(serviceClient);
+ int status = sequenceReport.getSequenceStatus();
+ if (status == SequenceReport.SEQUENCE_STATUS_TERMINATED)
+ done = true;
+ if (status == SequenceReport.SEQUENCE_STATUS_TIMED_OUT)
+ done = true;
+
+ if (maxWaitingTime >= 0) {
+ long timeNow = System.currentTimeMillis();
+ if (timeNow > (startTime + maxWaitingTime))
+ done = true;
+ }
+ }
+ }
+
+ public static void waitUntilSequenceCompleted(ServiceClient serviceClient, long maxWaitingTime, String sequenceKey)
+ throws SandeshaException {
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
+ waitUntilSequenceCompleted(serviceClient, maxWaitingTime);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
+ }
+
+ // gives the out sequenceID if CS/CSR exchange is done. Otherwise a
+ // SandeshaException
+ public static String getSequenceID(ServiceClient serviceClient) throws SandeshaException {
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ EndpointReference toEPR = options.getTo();
+ if (toEPR == null)
+ throw new SandeshaException("To EPR is not set");
+
+ String to = toEPR.getAddress();
+ String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+
+ ServiceContext serviceContext = serviceClient.getServiceContext();
+ if (serviceContext == null)
+ throw new SandeshaException("Service context is not set");
+
+ ConfigurationContext configurationContext = serviceContext.getConfigurationContext();
+
+ String internalSequenceID = generateInternalSequenceIDForTheClientSide(to, sequenceKey);
+
+ SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(serviceClient);
+ if (sequenceReport == null)
+ throw new SandeshaException("Cannot get a sequence report from the given data");
+
+ if (sequenceReport.getSequenceStatus() != SequenceReport.SEQUENCE_STATUS_ESTABLISHED) {
+ throw new SandeshaException(
+ "Sequence is not in a active state. Either create sequence response has not being received or sequence has been terminated,"
+ + " cannot get sequenceID");
+ }
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean sequenceIDBean = seqPropMgr.retrieve(internalSequenceID,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+ if (sequenceIDBean == null)
+ throw new SandeshaException("SequenceIdBean is not set");
+
+ String sequenceID = sequenceIDBean.getValue();
+ return sequenceID;
+ }
+
+ public static void sendAckRequest(ServiceClient serviceClient) throws SandeshaException {
+
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ ServiceContext serviceContext = serviceClient.getServiceContext();
+ if (serviceContext == null)
+ throw new SandeshaException("ServiceContext is null");
+
+ ConfigurationContext configContext = serviceContext.getConfigurationContext();
+
+ EndpointReference toEPR = options.getTo();
+ if (toEPR == null)
+ throw new SandeshaException("'To' address is not set is not set");
+
+ String to = toEPR.getAddress();
+
+ String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+
+ String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
+ if (rmSpecVersion == null)
+ rmSpecVersion = Sandesha2Constants.SPEC_VERSIONS.v1_0;
+
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmSpecVersion)) {
+ throw new SandeshaException("Empty AckRequest messages can only be sent with the v1_1 spec");
+ }
+
+ String internalSequenceID = getInternalSequenceID(to, sequenceKey);
+
+ SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(internalSequenceID, configContext);
+ if (sequenceReport == null)
+ throw new SandeshaException("Cannot generate the sequence report for the given internalSequenceID");
+ if (sequenceReport.getSequenceStatus() != SequenceReport.SEQUENCE_STATUS_ESTABLISHED)
+ throw new SandeshaException("Canot send the ackRequest message since it is not active");
+
+ String outSequenceID = getSequenceID(serviceClient);
+
+ String soapNamespaceURI = options.getSoapVersionURI();
+ SOAPFactory factory = null;
+ SOAPEnvelope dummyEnvelope = null;
+ if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(soapNamespaceURI)) {
+ factory = new SOAP11Factory();
+ dummyEnvelope = factory.getDefaultEnvelope();
+ } else {
+ factory = new SOAP12Factory();
+ dummyEnvelope = factory.getDefaultEnvelope();
+ }
+
+ String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
+
+ AckRequested ackRequested = new AckRequested(factory, rmNamespaceValue);
+ Identifier identifier = new Identifier(factory, rmNamespaceValue);
+ identifier.setIndentifer(outSequenceID);
+ ackRequested.setIdentifier(identifier);
+
+ ackRequested.toSOAPEnvelope(dummyEnvelope);
+
+ OMElement ackRequestedHeaderBlock = dummyEnvelope.getHeader().getFirstChildWithName(
+ new QName(rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.ACK_REQUESTED));
+
+ String oldAction = options.getAction();
+
+ options.setAction(SpecSpecificConstants.getAckRequestAction(rmSpecVersion));
+
+ serviceClient.addHeader(ackRequestedHeaderBlock);
+
+ try {
+ serviceClient.fireAndForget(null);
+ } catch (AxisFault e) {
+ String message = "Could not send the ack request";
+ throw new SandeshaException(message, e);
+ }
+
+ serviceClient.removeHeaders();
+ options.setAction(oldAction);
+ }
+
+ public static void sendAckRequest(ServiceClient serviceClient, String sequenceKey) throws SandeshaException {
+ Options options = serviceClient.getOptions();
+ if (options == null)
+ throw new SandeshaException("Options object is not set");
+
+ String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
+ sendAckRequest(serviceClient);
+
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
+ }
+
+ private static String getInternalSequenceID(String to, String sequenceKey) {
+ return SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+ }
+
+ private static SOAPEnvelope configureCloseSequence(Options options, ConfigurationContext configurationContext)
+ throws SandeshaException {
+
+ if (options == null)
+ throw new SandeshaException("You must set the Options object before calling this method");
+
+ EndpointReference epr = options.getTo();
+ if (epr == null)
+ throw new SandeshaException("You must set the toEPR before calling this method");
+
+ String to = epr.getAddress();
+ String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+
+ String internalSequenceID = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+
+ SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(internalSequenceID,
+ configurationContext);
+ if (sequenceReport == null)
+ throw new SandeshaException("Cannot generate the sequence report for the given internalSequenceID");
+ if (sequenceReport.getSequenceStatus() != SequenceReport.SEQUENCE_STATUS_ESTABLISHED)
+ throw new SandeshaException("Canot close the sequence since it is not active");
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean sequenceIDBean = seqPropMgr.retrieve(internalSequenceID,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+ if (sequenceIDBean == null)
+ throw new SandeshaException("SequenceIdBean is not set");
+
+ String sequenceID = sequenceIDBean.getValue();
+
+ if (sequenceID == null)
+ throw new SandeshaException("Cannot find the sequenceID");
+
+ String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
+
+ if (rmSpecVersion == null)
+ rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion();
+
+ if (!SpecSpecificConstants.isSequenceClosingAllowed(rmSpecVersion))
+ throw new SandeshaException("This rm version does not allow sequence closing");
+
+ SOAPEnvelope dummyEnvelope = null;
+ SOAPFactory factory = null;
+ String soapNamespaceURI = options.getSoapVersionURI();
+ if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(soapNamespaceURI)) {
+ factory = new SOAP12Factory();
+ dummyEnvelope = factory.getDefaultEnvelope();
+ } else {
+ factory = new SOAP11Factory();
+ dummyEnvelope = factory.getDefaultEnvelope();
+ }
+
+ String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
+
+ CloseSequence closeSequence = new CloseSequence(factory, rmNamespaceValue);
+ Identifier identifier = new Identifier(factory, rmNamespaceValue);
+ identifier.setIndentifer(sequenceID);
+ closeSequence.setIdentifier(identifier);
+
+ closeSequence.toSOAPEnvelope(dummyEnvelope);
+
+ return dummyEnvelope;
+ }
+
+ private static boolean isSequenceTerminated(String internalSequenceID, SequencePropertyBeanMgr seqPropMgr)
+ throws SandeshaException {
+ SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean();
+ internalSequenceFindBean.setValue(internalSequenceID);
+ internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+
+ SequencePropertyBean internalSequenceBean = seqPropMgr.findUnique(internalSequenceFindBean);
+ if (internalSequenceBean == null) {
+ String message = "Internal sequence Bean is not available for the given sequence";
+ log.debug(message);
+
+ return false;
+ }
+
+ String outSequenceID = internalSequenceBean.getSequenceID();
+
+ SequencePropertyBean sequenceTerminatedBean = seqPropMgr.retrieve(outSequenceID,
+ Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED);
+ if (sequenceTerminatedBean != null && Sandesha2Constants.VALUE_TRUE.equals(sequenceTerminatedBean.getValue())) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private static boolean isSequenceTimedout(String internalSequenceID, SequencePropertyBeanMgr seqPropMgr)
+ throws SandeshaException {
+ SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean();
+ internalSequenceFindBean.setValue(internalSequenceID);
+ internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+
+ SequencePropertyBean internalSequenceBean = seqPropMgr.findUnique(internalSequenceFindBean);
+ if (internalSequenceBean == null) {
+ String message = "Internal sequence Bean is not available for the given sequence";
+ log.debug(message);
+
+ return false;
+ }
+
+ String outSequenceID = internalSequenceBean.getSequenceID();
+ SequencePropertyBean sequenceTerminatedBean = seqPropMgr.retrieve(outSequenceID,
+ Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT);
+ if (sequenceTerminatedBean != null && Sandesha2Constants.VALUE_TRUE.equals(sequenceTerminatedBean.getValue())) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private static void fillTerminatedOutgoingSequenceInfo(SequenceReport report, String internalSequenceID,
+ SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
+ SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean();
+ internalSequenceFindBean.setValue(internalSequenceID);
+ internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+
+ SequencePropertyBean internalSequenceBean = seqPropMgr.findUnique(internalSequenceFindBean);
+ if (internalSequenceBean == null) {
+ String message = "Not a valid terminated sequence. Internal sequence Bean is not available for the given sequence";
+ log.debug(message);
+
+ throw new SandeshaException(message);
+ }
+
+ report.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_TERMINATED);
+
+ String outSequenceID = internalSequenceBean.getSequenceID();
+ fillOutgoingSequenceInfo(report, outSequenceID, seqPropMgr);
+ }
+
+ private static void fillTimedoutOutgoingSequenceInfo(SequenceReport report, String internalSequenceID,
+ SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
+ SequencePropertyBean internalSequenceFindBean = new SequencePropertyBean();
+ internalSequenceFindBean.setValue(internalSequenceID);
+ internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+
+ SequencePropertyBean internalSequenceBean = seqPropMgr.findUnique(internalSequenceFindBean);
+ if (internalSequenceBean == null) {
+ String message = "Not a valid timedOut sequence. Internal sequence Bean is not available for the given sequence";
+ log.debug(message);
+
+ throw new SandeshaException(message);
+ }
+
+ report.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_TIMED_OUT);
+ String outSequenceID = internalSequenceBean.getSequenceID();
+ fillOutgoingSequenceInfo(report, outSequenceID, seqPropMgr);
+ }
+
+ private static void fillOutgoingSequenceInfo(SequenceReport report, String outSequenceID,
+ SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
+ report.setSequenceID(outSequenceID);
+
+ ArrayList completedMessageList = AcknowledgementManager.getClientCompletedMessagesList(outSequenceID,
+ seqPropMgr);
+
+ Iterator iter = completedMessageList.iterator();
+ while (iter.hasNext()) {
+ Long lng = new Long(Long.parseLong((String) iter.next()));
+ report.addCompletedMessage(lng);
+ }
+ }
+
+ private static byte getServerSequenceStatus(String sequenceID, StorageManager storageManager)
+ throws SandeshaException {
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean terminatedBean = seqPropMgr.retrieve(sequenceID,
+ Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED);
+ if (terminatedBean != null) {
+ return SequenceReport.SEQUENCE_STATUS_TERMINATED;
+ }
+
+ SequencePropertyBean timedOutBean = seqPropMgr.retrieve(sequenceID,
+ Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT);
+ if (timedOutBean != null) {
+ return SequenceReport.SEQUENCE_STATUS_TIMED_OUT;
+ }
+
+ NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
+ NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceID);
+
+ if (nextMsgBean != null) {
+ return SequenceReport.SEQUENCE_STATUS_ESTABLISHED;
+ }
+
+ throw new SandeshaException("Unrecorded sequenceID");
+ }
+
+ private class DummyCallback extends Callback {
+
+ public void onComplete(AsyncResult result) {
+ // TODO Auto-generated method stub
+ System.out.println("Error: dummy callback was called");
+ }
+
+ public void onError(Exception e) {
+ // TODO Auto-generated method stub
+ System.out.println("Error: dummy callback received an error");
+
+ }
+
+ }
+
+ private static String generateInternalSequenceIDForTheClientSide(String toEPR, String sequenceKey) {
+ return SandeshaUtil.getInternalSequenceID(toEPR, sequenceKey);
+ }
+
+ private static SequenceReport getIncomingSequenceReport(String sequenceID, ConfigurationContext configCtx)
+ throws SandeshaException {
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx,configCtx.getAxisConfiguration());
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ String withinTransactionStr = (String) configCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ boolean withinTransaction = false;
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr))
+ withinTransaction = true;
+
+ Transaction reportTransaction = null;
+ if (!withinTransaction)
+ reportTransaction = storageManager.getTransaction();
+
+ boolean rolebacked = false;
+
+ try {
+
+ SequenceReport sequenceReport = new SequenceReport();
+
+ ArrayList completedMessageList = AcknowledgementManager.getServerCompletedMessagesList(sequenceID,
+ seqPropMgr);
+
+ Iterator iter = completedMessageList.iterator();
+ while (iter.hasNext()) {
+ ;
+ sequenceReport.addCompletedMessage((Long) iter.next());
+ }
+
+ sequenceReport.setSequenceID(sequenceID);
+ sequenceReport.setInternalSequenceID(sequenceID); // for the
+ // incoming side
+ // internalSequenceID=sequenceID
+ sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_IN);
+
+ sequenceReport.setSequenceStatus(getServerSequenceStatus(sequenceID, storageManager));
+
+ return sequenceReport;
+
+ } catch (Exception e) {
+ if (!withinTransaction && reportTransaction!=null) {
+ reportTransaction.rollback();
+ configCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolebacked = true;
+ }
+ } finally {
+ if (!withinTransaction && !rolebacked && reportTransaction!=null) {
+ reportTransaction.commit();
+ configCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ }
+ }
+
+ return null;
+ }
+
+ private static SOAPEnvelope configureTerminateSequence(Options options, ConfigurationContext configurationContext)
+ throws SandeshaException {
+
+ if (options == null)
+ throw new SandeshaException("You must set the Options object before calling this method");
+
+ EndpointReference epr = options.getTo();
+ if (epr == null)
+ throw new SandeshaException("You must set the toEPR before calling this method");
+
+ String to = epr.getAddress();
+ String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ String internalSequenceID = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+ SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(internalSequenceID,
+ configurationContext);
+ if (sequenceReport == null)
+ throw new SandeshaException("Cannot generate the sequence report for the given internalSequenceID");
+ if (sequenceReport.getSequenceStatus() != SequenceReport.SEQUENCE_STATUS_ESTABLISHED)
+ throw new SandeshaException("Canot terminate the sequence since it is not active");
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean sequenceIDBean = seqPropMgr.retrieve(internalSequenceID,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+ if (sequenceIDBean == null)
+ throw new SandeshaException("SequenceIdBean is not set");
+
+ String sequenceID = sequenceIDBean.getValue();
+
+ if (sequenceID == null)
+ throw new SandeshaException("Cannot find the sequenceID");
+
+ String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
+ if (rmSpecVersion == null)
+ rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion();
+
+ options.setAction(SpecSpecificConstants.getTerminateSequenceAction(rmSpecVersion));
+ SOAPEnvelope dummyEnvelope = null;
+ SOAPFactory factory = null;
+ String soapNamespaceURI = options.getSoapVersionURI();
+ if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(soapNamespaceURI)) {
+ factory = new SOAP12Factory();
+ dummyEnvelope = factory.getDefaultEnvelope();
+ } else {
+ factory = new SOAP11Factory();
+ dummyEnvelope = factory.getDefaultEnvelope();
+ }
+
+ String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
+ TerminateSequence terminateSequence = new TerminateSequence(factory, rmNamespaceValue);
+ Identifier identifier = new Identifier(factory, rmNamespaceValue);
+ identifier.setIndentifer(sequenceID);
+ terminateSequence.setIdentifier(identifier);
+ terminateSequence.toSOAPEnvelope(dummyEnvelope);
+
+ return dummyEnvelope;
+ }
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClientConstants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClientConstants.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClientConstants.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClientConstants.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.sandesha2.client;
+
+public class SandeshaClientConstants {
+ public static String AcksTo = "Sandesha2AcksTo";
+ public static String LAST_MESSAGE = "Sandesha2LastMessage";
+ public static String OFFERED_SEQUENCE_ID = "Sandesha2OfferedSequenceId";
+ public static String SANDESHA_DEBUG_MODE = "Sandesha2DebugMode";
+ public static String SEQUENCE_KEY = "Sandesha2SequenceKey";
+ public static String MESSAGE_NUMBER = "Sandesha2MessageNumber";
+ public static String RM_SPEC_VERSION = "Sandesha2RMSpecVersion";
+ public static String DUMMY_MESSAGE = "Sandesha2DummyMessage"; //If this property is set, even though this message will invoke the RM handlers, this will not be sent as an actual application message
+ public static String SANDESHA_LISTENER = "Sandesha2Listener";
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaListener.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaListener.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaListener.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaListener.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,43 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.client;
+
+import org.apache.axis2.AxisFault;
+
+/**
+ * By implementing this interface and registering an object with
+ * Sandesha2, users will be invoked in some events.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public interface SandeshaListener {
+
+ /**
+ * This sill be invoked when Sandesha2 receive a fault message
+ * in response to a RM control message that was sent by it.
+ */
+ public void onError(AxisFault fault);
+
+ /**
+ * This will be invoked when a specific sequence time out.
+ * The timing out method depends on policies.
+ */
+ public void onTimeOut(SequenceReport report);
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaReport.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaReport.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaReport.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaReport.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sandesha2.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+public class SandeshaReport {
+
+ private ArrayList incomingSequenceList = null;
+ private ArrayList outgoingSequenceList = null;
+ private HashMap sequenceStatusMap = null;
+ private HashMap noOfCompletedMessagesMap = null;
+ private HashMap outgoingInternalSequenceIDMap = null;
+
+ public SandeshaReport () {
+ incomingSequenceList = new ArrayList ();
+ outgoingSequenceList = new ArrayList ();
+ sequenceStatusMap = new HashMap ();
+ noOfCompletedMessagesMap = new HashMap ();
+ outgoingInternalSequenceIDMap = new HashMap ();
+ }
+
+ public long getCompletedMessagesCount(String sequenceID) {
+ Long lng = (Long) noOfCompletedMessagesMap.get(sequenceID);
+ if (lng==null)
+ return -1;
+
+ return lng.longValue();
+ }
+
+ public ArrayList getIncomingSequenceList() {
+ return incomingSequenceList;
+ }
+
+ public ArrayList getOutgoingSequenceList() {
+ return outgoingSequenceList;
+ }
+
+ public byte getSequenceStatusMap(String sequenceID) {
+ Byte status = (Byte) sequenceStatusMap.get(sequenceID);
+ if (status==null)
+ return SequenceReport.SEQUENCE_STATUS_UNKNOWN;
+
+ return status.byteValue();
+ }
+
+ public void addToIncomingSequenceList (String incomingSequenceID) {
+ incomingSequenceList.add(incomingSequenceID);
+ }
+
+ public void addToOutgoingSequenceList (String outSequenceID) {
+ outgoingSequenceList.add(outSequenceID);
+ }
+
+ public void addToNoOfCompletedMessagesMap (String id, long noOfMsgs) {
+ noOfCompletedMessagesMap.put(id, new Long (noOfMsgs));
+ }
+
+ public void addToSequenceStatusMap (String id, byte status) {
+ sequenceStatusMap.put(id, new Byte (status));
+ }
+
+ public String getInternalSequenceIdOfOutSequence (String outSequenceID) {
+ return (String) outgoingInternalSequenceIDMap.get(outSequenceID);
+ }
+
+ public void addToOutgoingInternalSequenceMap (String outSequenceID, String internalSequenceID) {
+ outgoingInternalSequenceIDMap.put(outSequenceID,internalSequenceID);
+ }
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SequenceReport.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SequenceReport.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SequenceReport.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SequenceReport.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.sandesha2.client;
+
+import java.util.ArrayList;
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class SequenceReport {
+
+ public static final byte SEQUENCE_STATUS_UNKNOWN = 0;
+ public static final byte SEQUENCE_STATUS_INITIAL = 1;
+ public static final byte SEQUENCE_STATUS_ESTABLISHED = 2;
+ public static final byte SEQUENCE_STATUS_TERMINATED = 3;
+ public static final byte SEQUENCE_STATUS_TIMED_OUT = 4;
+ private static final byte MAX_SEQUENCE_STATUS = 4;
+
+ public static final byte SEQUENCE_DIRECTION_UNKNOWN=0;
+ public static final byte SEQUENCE_DIRECTION_IN=1;
+ public static final byte SEQUENCE_DIRECTION_OUT=2;
+ private static final byte MAX_SEQUENCE_DIRECTION = 2;
+
+ private byte sequenceStatus = SEQUENCE_STATUS_UNKNOWN;
+ private byte sequenceDirection = SEQUENCE_DIRECTION_UNKNOWN;
+ private String sequenceID = null;
+ private String internalSequenceID = null; //only for outgoing sequences
+ private ArrayList completedMessages = null; //no of messages acked (both for incoming and outgoing)
+
+ public SequenceReport () {
+ completedMessages = new ArrayList ();
+ }
+
+ public void setSequenceStatus (byte sequenceStatus) {
+ if (sequenceStatus>=SEQUENCE_STATUS_UNKNOWN && sequenceStatus<=MAX_SEQUENCE_STATUS) {
+ this.sequenceStatus = sequenceStatus;
+ }
+ }
+
+ public void setSequenceDirection (byte sequenceDirection) {
+ if (sequenceDirection>=SEQUENCE_DIRECTION_UNKNOWN && sequenceDirection<=MAX_SEQUENCE_DIRECTION) {
+ this.sequenceDirection = sequenceDirection;
+ }
+ }
+
+ public byte getSequenceStatus () {
+ return sequenceStatus;
+ }
+
+ public byte getSequenceDirection () {
+ return sequenceDirection;
+ }
+
+ public String getSequenceID() {
+ return sequenceID;
+ }
+
+ public void setSequenceID(String sequenceID) {
+ this.sequenceID = sequenceID;
+ }
+
+ public ArrayList getCompletedMessages () {
+ return completedMessages;
+ }
+
+ public void addCompletedMessage (Long messageNo) {
+ completedMessages.add(messageNo);
+ }
+
+ public void setCompletedMessages (ArrayList completedMessages) {
+ this.completedMessages = completedMessages;
+ }
+
+ public String getInternalSequenceID() {
+ return internalSequenceID;
+ }
+
+ public void setInternalSequenceID(String internalSequenceID) {
+ this.internalSequenceID = internalSequenceID;
+ }
+
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,385 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.handlers;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.soap.SOAPBody;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFault;
+import org.apache.axiom.soap.SOAPFaultReason;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SandeshaListener;
+import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * The Global handler of Sandesha2. This is used to perform things that should
+ * be done before diapatching such as duplicate detection.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class SandeshaGlobalInHandler extends AbstractHandler {
+
+ private static final long serialVersionUID = -7187928423123306156L;
+
+ private static final Log log = LogFactory.getLog(SandeshaGlobalInHandler.class.getName());
+
+ public void invoke(MessageContext msgContext) throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaGlobalInHandler::msgContext, " + msgContext.getEnvelope().getHeader());
+
+ ConfigurationContext configContext = msgContext.getConfigurationContext();
+ if (configContext == null)
+ throw new AxisFault("Configuration context is not set");
+
+ SOAPEnvelope envelope = msgContext.getEnvelope();
+ if (envelope == null)
+ throw new SandeshaException("SOAP envelope is not set");
+
+ String reinjectedMessage = (String) msgContext.getProperty(Sandesha2Constants.REINJECTED_MESSAGE);
+ if (reinjectedMessage!=null && Sandesha2Constants.VALUE_TRUE.equals(reinjectedMessage))
+ return; //Reinjected messages are not processed by Sandesha2 inflow handlers
+
+ StorageManager storageManager = null;
+ try {
+ storageManager = SandeshaUtil.getSandeshaStorageManager(configContext,configContext.getAxisConfiguration());
+ if (storageManager==null) {
+ log.debug ("Sandesha2 cannot proceed. The StorageManager is not available");
+ return;
+ }
+ } catch (SandeshaException e1) {
+ //TODO make this a log
+ log.debug ("Sandesha2 cannot proceed. Exception thrown when looking for the StorageManager");
+ return;
+ }
+
+ boolean withinTransaction = false;
+ String withinTransactionStr = (String) msgContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+ withinTransaction = true;
+ }
+
+ Transaction transaction = null;
+ if (!withinTransaction) {
+ transaction = storageManager.getTransaction();
+ msgContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+ }
+ boolean rolebacked = false;
+
+ try {
+ // processing faults.
+ // Had to do this before dispatching. A fault message comes with the
+ // relatesTo part. So this will
+ // fill the opContext of te req/res message. But RM keeps
+ // retransmitting. So RM has to report the
+ // error and stop this fault being dispatched as the response
+ // message.
+
+ SOAPFault faultPart = envelope.getBody().getFault();
+
+ if (faultPart != null) {
+ RelatesTo relatesTo = msgContext.getRelatesTo();
+ if (relatesTo != null) {
+ String relatesToValue = relatesTo.getValue();
+ OperationContext operationContext = configContext.getOperationContext(relatesToValue);
+ if (operationContext != null) {
+ MessageContext requestMessage = operationContext
+ .getMessageContext(OperationContextFactory.MESSAGE_LABEL_OUT_VALUE);
+ if (requestMessage != null) {
+ if (SandeshaUtil.isRetriableOnFaults(requestMessage)) {
+
+ SandeshaListener faultCallback = (SandeshaListener) operationContext
+ .getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
+ if (faultCallback != null) {
+
+ // constructing the fault
+ AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart);
+
+ // reporting the fault
+// log.error(axisFault);
+ if (faultCallback != null) {
+ faultCallback.onError(axisFault);
+ }
+
+ }
+
+ // stopping the fault from going further and
+ // getting dispatched
+ msgContext.pause(); // TODO let this go in the
+ // last try
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaGlobalInHandler::msgContext");
+
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ // Quitting the message with minimum processing if not intended for
+ // RM.
+ boolean isRMGlobalMessage = SandeshaUtil.isRMGlobalMessage(msgContext);
+ if (!isRMGlobalMessage) {
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaGlobalInHandler::msgContext, !isRMGlobalMessage");
+ return;
+ }
+
+ RMMsgContext rmMessageContext = MsgInitializer.initializeMessage(msgContext);
+
+ // Dropping duplicates
+ boolean dropped = dropIfDuplicate(rmMessageContext,storageManager);
+ if (dropped) {
+ processDroppedMessage(rmMessageContext,storageManager);
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaGlobalInHandler::msgContext, dropped");
+ return;
+ }
+
+ // Persisting the application messages
+ // if
+ // (rmMessageContext.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION)
+ // {
+ // SandeshaUtil.PersistMessageContext ()
+ // }
+
+ // Process if global processing possible. - Currently none
+ if (SandeshaUtil.isGloballyProcessableMessageType(rmMessageContext.getMessageType())) {
+ doGlobalProcessing(rmMessageContext);
+ }
+
+ } catch (Exception e) {
+ //message should not be sent in a exception situation.
+ msgContext.pause();
+
+ if (!withinTransaction) {
+ try {
+ transaction.rollback();
+ msgContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolebacked = true;
+ } catch (Exception e1) {
+ String message = "Exception thrown when trying to roleback the transaction.";
+ log.debug(message,e);
+ }
+ }
+
+ String message = "Sandesha2 got an exception when processing the in message";
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaGlobalInHandler::msgContext " ,e);
+ throw new AxisFault (message,e);
+ } finally {
+ if (!withinTransaction && !rolebacked) {
+ try {
+ transaction.commit();
+ msgContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ } catch (Exception e) {
+ String message = "Exception thrown when trying to commit the transaction.";
+ log.debug(message,e);
+ }
+ }
+ }
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaGlobalInHandler::msgContext");
+ }
+
+ private boolean dropIfDuplicate(RMMsgContext rmMsgContext,StorageManager storageManager) throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaGlobalInHandler::dropIfDuplicate");
+
+ boolean drop = false;
+
+ if (rmMsgContext.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+
+ Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String sequenceId = null;
+
+ if (sequence != null) {
+ sequenceId = sequence.getIdentifier().getIdentifier();
+ }
+
+ long msgNo = sequence.getMessageNumber().getMessageNumber();
+
+ if (sequenceId != null && msgNo > 0) {
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(sequenceId,
+ Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+ if (receivedMsgsBean != null) {
+ String receivedMsgStr = receivedMsgsBean.getValue();
+ ArrayList msgNoArrList = SandeshaUtil.getSplittedMsgNoArraylist(receivedMsgStr);
+
+ Iterator iterator = msgNoArrList.iterator();
+ while (iterator.hasNext()) {
+ String temp = (String) iterator.next();
+ String msgNoStr = new Long(msgNo).toString();
+ if (msgNoStr.equals(temp)) {
+ drop = true;
+ }
+ }
+ }
+
+ if (drop == false) {
+ // Checking for RM specific EMPTY_BODY LASTMESSAGE.
+ SOAPBody body = rmMsgContext.getSOAPEnvelope().getBody();
+ boolean emptyBody = false;
+ if (body.getChildElements().hasNext() == false) {
+ emptyBody = true;
+ }
+
+ if (emptyBody) {
+ if (sequence.getLastMessage() != null) {
+ log.debug ("Empty Body LastMessage Received");
+ drop = true;
+
+ if (receivedMsgsBean == null) {
+ receivedMsgsBean = new SequencePropertyBean(sequenceId,
+ Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES, "");
+ seqPropMgr.insert(receivedMsgsBean);
+ }
+
+ String receivedMsgStr = (String) receivedMsgsBean.getValue();
+ if (receivedMsgStr != "" && receivedMsgStr != null)
+ receivedMsgStr = receivedMsgStr + "," + Long.toString(msgNo);
+ else
+ receivedMsgStr = Long.toString(msgNo);
+
+ receivedMsgsBean.setValue(receivedMsgStr);
+
+ // TODO correct the syntac into '[received msgs]'
+
+ seqPropMgr.update(receivedMsgsBean);
+
+ ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor();
+ ackProcessor.sendAckIfNeeded(rmMsgContext, receivedMsgStr,storageManager);
+
+ }
+ }
+ }
+ }
+ } else if (rmMsgContext.getMessageType() != Sandesha2Constants.MessageTypes.UNKNOWN) {
+ // droping other known message types if, an suitable operation
+ // context is not available,
+ // and if a relates to value is present.
+ RelatesTo relatesTo = rmMsgContext.getRelatesTo();
+ if (relatesTo != null) {
+ String value = relatesTo.getValue();
+
+ // TODO do not drop, relationshipTypes other than reply
+
+ ConfigurationContext configurationContext = rmMsgContext.getMessageContext().getConfigurationContext();
+ OperationContext operationContextFromMap = configurationContext.getOperationContext(value);
+ OperationContext operationContext = rmMsgContext.getMessageContext().getOperationContext();
+
+ // reply messages should be dropped if it cannot be instance
+ // dispatched.
+ // I.e. both not having a op. ctx not and not having a op. ctx
+ // in the global list.
+ if (operationContext == null && operationContextFromMap == null) {
+ String message = "Dropping duplicate RM message";
+ log.debug(message);
+ drop = true;
+ }
+ }
+ }
+
+ if (drop) {
+ rmMsgContext.getMessageContext().pause();
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaGlobalInHandler::dropIfDuplicate, true");
+ return true;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaGlobalInHandler::dropIfDuplicate, false");
+ return false;
+ }
+
+ private void processDroppedMessage(RMMsgContext rmMsgContext, StorageManager storageManager) throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaGlobalInHandler::processDroppedMessage");
+
+ if (rmMsgContext.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+ Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String sequenceId = null;
+
+ if (sequence != null) {
+ sequenceId = sequence.getIdentifier().getIdentifier();
+ }
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(sequenceId,
+ Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+ String receivedMsgStr = receivedMsgsBean.getValue();
+
+ ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor();
+ // Even though the duplicate message is dropped, hv to send the ack
+ // if needed.
+ ackProcessor.sendAckIfNeeded(rmMsgContext, receivedMsgStr,storageManager);
+
+ }
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaGlobalInHandler::processDroppedMessage");
+ }
+
+ private void doGlobalProcessing(RMMsgContext rmMsgCtx) throws SandeshaException {
+ switch (rmMsgCtx.getMessageType()) {
+ case Sandesha2Constants.MessageTypes.ACK:
+
+ // //rmMsgCtx.addRelatesTo(null);
+ // rmMsgCtx.getMessageContext().getre
+ // //Removing the relatesTo part from ackMessageIf present. Some
+ // Frameworks tend to send this.
+ }
+ }
+
+ public QName getName() {
+ return new QName(Sandesha2Constants.GLOBAL_IN_HANDLER_NAME);
+ }
+
+ private AxisFault getAxisFaultFromFromSOAPFault(SOAPFault faultPart) {
+ AxisFault axisFault = new AxisFault(faultPart.getCode(), faultPart.getReason(),
+ faultPart.getNode(), faultPart.getRole(), faultPart.getDetail());
+
+ return axisFault;
+ }
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,161 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.handlers;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.MessageValidator;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * This is invoked in the inFlow of an RM endpoint. This is responsible for
+ * selecting an suitable message processor and letting it process the message.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class SandeshaInHandler extends AbstractHandler {
+
+ private static final long serialVersionUID = 733210926016820857L;
+
+ private static final Log log = LogFactory.getLog(SandeshaInHandler.class.getName());
+
+ public QName getName() {
+ return new QName(Sandesha2Constants.IN_HANDLER_NAME);
+ }
+
+ public void invoke(MessageContext msgCtx) throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaInHandler::invoke, " + msgCtx.getEnvelope().getHeader());
+
+ ConfigurationContext context = msgCtx.getConfigurationContext();
+ if (context == null) {
+ String message = "ConfigurationContext is null";
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+ String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
+ if (null != DONE && "true".equals(DONE))
+ return;
+
+ String reinjectedMessage = (String) msgCtx.getProperty(Sandesha2Constants.REINJECTED_MESSAGE);
+ if (reinjectedMessage!=null && Sandesha2Constants.VALUE_TRUE.equals(reinjectedMessage))
+ {
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaInHandler::invoke, reinjectedMessage" );
+ return; //Reinjected messages are not processed by Sandesha2 inflow handlers
+ }
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
+
+ boolean withinTransaction = false;
+ String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+ withinTransaction = true;
+ }
+
+ Transaction transaction = null;
+ if (!withinTransaction) {
+ transaction = storageManager.getTransaction();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+ }
+ boolean rolebacked = false;
+
+ try {
+
+ AxisService axisService = msgCtx.getAxisService();
+ if (axisService == null) {
+ String message = "AxisService is null";
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+ RMMsgContext rmMsgCtx = null;
+ try {
+ rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+ } catch (SandeshaException ex) {
+ String message = "Cant initialize the message";
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+
+ //validating the message
+ MessageValidator.validateMessage(rmMsgCtx,storageManager);
+
+ MsgProcessor msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+
+ try {
+ if (msgProcessor != null)
+ msgProcessor.processInMessage(rmMsgCtx);
+ } catch (SandeshaException se) {
+ String message = "Error in processing the message";
+ log.debug(message, se);
+ throw new AxisFault(message, se);
+ }
+
+ } catch (Exception e) {
+ //message should not be sent in a exception situation.
+ msgCtx.pause();
+
+ if (!withinTransaction) {
+ try {
+ transaction.rollback();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolebacked = true;
+ } catch (Exception e1) {
+ String message = "Exception thrown when trying to roleback the transaction.";
+ log.debug(message,e);
+ }
+ }
+
+ String message = "Sandesha2 got an exception when processing the in message";
+ throw new AxisFault (message,e);
+ } finally {
+ if (!withinTransaction && !rolebacked) {
+ try {
+ transaction.commit();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ } catch (Exception e) {
+ String message = "Exception thrown when trying to commit the transaction.";
+ log.debug(message,e);
+ }
+ }
+ }
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaInHandler::invoke" );
+ }
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,163 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.handlers;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * This is invoked in the outFlow of an RM endpoint
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class SandeshaOutHandler extends AbstractHandler {
+
+ private static final long serialVersionUID = 8261092322051924103L;
+
+ private static final Log log = LogFactory.getLog(SandeshaOutHandler.class.getName());
+
+ public void invoke(MessageContext msgCtx) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaOutHandler::invoke, " + msgCtx.getEnvelope().getHeader() );
+
+ ConfigurationContext context = msgCtx.getConfigurationContext();
+ if (context == null) {
+ String message = "ConfigurationContext is null";
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+ AxisService axisService = msgCtx.getAxisService();
+ if (axisService == null) {
+ String message = "AxisService is null";
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+ String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
+ if (null != DONE && "true".equals(DONE))
+ {
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaOutHandler::invoke, Application processing done");
+ return;
+ }
+
+ msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
+
+ boolean withinTransaction = false;
+ String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+ withinTransaction = true;
+ }
+
+ Transaction transaction = null;
+ if (!withinTransaction) {
+ transaction = storageManager.getTransaction();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+ }
+ boolean rolebacked = false;
+
+ try {
+ // getting rm message
+ RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+
+ String dummyMessageString = (String) msgCtx.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
+ boolean dummyMessage = false;
+ if (dummyMessageString != null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
+ dummyMessage = true;
+
+ MsgProcessor msgProcessor = null;
+ int messageType = rmMsgCtx.getMessageType();
+ if (messageType == Sandesha2Constants.MessageTypes.UNKNOWN) {
+ MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(
+ OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ if (requestMsgCtx != null) { // for the server side
+ RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx);
+ Sequence sequencePart = (Sequence) reqRMMsgCtx
+ .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if (sequencePart != null)
+ msgProcessor = new ApplicationMsgProcessor();// a rm
+ // intended
+ // message.
+ } else if (!msgCtx.isServerSide()) // if client side.
+ msgProcessor = new ApplicationMsgProcessor();
+ } else {
+ msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+ }
+
+ if (msgProcessor != null)
+ msgProcessor.processOutMessage(rmMsgCtx);
+
+ } catch (Exception e) {
+ //message should not be sent in a exception situation.
+ msgCtx.pause();
+
+ //rolling back the transaction
+ if (!withinTransaction) {
+ try {
+ transaction.rollback();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolebacked = true;
+ } catch (Exception e1) {
+ String message = "Exception thrown when trying to roleback the transaction.";
+ log.debug(message,e);
+ }
+ }
+
+ String message = "Sandesha2 got an exception when processing the out message";
+ throw new AxisFault (message,e);
+ } finally {
+ if (!withinTransaction && !rolebacked) {
+ try {
+ transaction.commit();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ } catch (Exception e) {
+ String message = "Exception thrown when trying to commit the transaction.";
+ log.debug(message,e);
+ }
+ }
+ }
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaOutHandler::invoke");
+ }
+
+ public QName getName() {
+ return new QName(Sandesha2Constants.OUT_HANDLER_NAME);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org