You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2006/06/15 07:51:24 UTC
svn commit: r414476 [10/15] - in /webservices/sandesha/trunk: ./ c/ config/
interop/ java/ java/config/ java/interop/ java/interop/conf/
java/interop/src/ java/interop/src/org/ java/interop/src/org/apache/
java/interop/src/org/apache/sandesha2/ java/in...
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,495 @@
+/*
+ * Created on Sep 5, 2005
+ *
+ * TODO To change the template for this generated file go to
+ * Window - Preferences - Java - Code Style - Code Templates
+ */
+package org.apache.sandesha2.util;
+
+import java.util.Collection;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.engine.ListenerManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.wsrm.CreateSequence;
+
+/**
+ * This is used to set up a new sequence, both at the sending side and the receiving side.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class SequenceManager {
+
+ private static Log log = LogFactory.getLog(SequenceManager.class);
+
+ public static String setupNewSequence(RMMsgContext createSequenceMsg,StorageManager storageManager)
+ throws AxisFault {
+
+ String sequenceId = SandeshaUtil.getUUID();
+
+ EndpointReference to = createSequenceMsg.getTo();
+ if (to == null) {
+ String message = "To is null";
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+ EndpointReference replyTo = createSequenceMsg.getReplyTo();
+
+ CreateSequence createSequence = (CreateSequence) createSequenceMsg
+ .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
+ if (createSequence == null) {
+ String message = "Create Sequence Part is null";
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+ EndpointReference acksTo = createSequence.getAcksTo().getAddress()
+ .getEpr();
+
+ if (acksTo == null) {
+ String message = "AcksTo is null";
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+ ConfigurationContext configurationContext = createSequenceMsg.getMessageContext()
+ .getConfigurationContext();
+
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager
+ .getSequencePropretyBeanMgr();
+
+ SequencePropertyBean receivedMsgBean = new SequencePropertyBean(
+ sequenceId, Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES, "");
+
+
+ //setting the addressing version
+ String addressingNamespaceValue = createSequenceMsg.getAddressingNamespaceValue();
+ SequencePropertyBean addressingNamespaceBean = new SequencePropertyBean (
+ sequenceId,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,addressingNamespaceValue);
+ seqPropMgr.insert(addressingNamespaceBean);
+
+ String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceValue);
+
+ //If no replyTo value. Send responses as sync.
+ SequencePropertyBean toBean = null;
+ if (replyTo!=null) {
+ toBean = new SequencePropertyBean(sequenceId,
+ Sandesha2Constants.SequenceProperties.TO_EPR, replyTo.getAddress());
+ }else {
+ toBean = new SequencePropertyBean(sequenceId,
+ Sandesha2Constants.SequenceProperties.TO_EPR, anonymousURI);
+ }
+
+ SequencePropertyBean replyToBean = new SequencePropertyBean(sequenceId,
+ Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, to.getAddress());
+ SequencePropertyBean acksToBean = new SequencePropertyBean(sequenceId,
+ Sandesha2Constants.SequenceProperties.ACKS_TO_EPR, acksTo.getAddress());
+
+
+ seqPropMgr.insert(receivedMsgBean);
+ seqPropMgr.insert(replyToBean);
+ seqPropMgr.insert(acksToBean);
+
+ if (toBean!=null)
+ seqPropMgr.insert(toBean);
+
+ NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
+ nextMsgMgr.insert(new NextMsgBean(sequenceId, 1)); // 1 will be the next
+
+ // message to invoke. This will apply for only in-order invocations.
+
+ SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceId);
+
+ //stting the RM SPEC version for this sequence.
+ String createSequenceMsgAction = createSequenceMsg.getWSAAction();
+ if (createSequenceMsgAction==null)
+ throw new SandeshaException ("Create sequence message does not have the WSA:Action value");
+
+ String messageRMNamespace = createSequence.getNamespaceValue();
+
+ String specVersion = null;
+ if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(messageRMNamespace)) {
+ specVersion = Sandesha2Constants.SPEC_VERSIONS.v1_0;
+ }else if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(messageRMNamespace)) {
+ specVersion = Sandesha2Constants.SPEC_VERSIONS.v1_1;
+ } else {
+ throw new SandeshaException ("Create sequence message does not has a valid RM namespace value. Cant decide the RM version");
+ }
+
+ SequencePropertyBean specVerionBean = new SequencePropertyBean ();
+ specVerionBean.setSequenceID(sequenceId);
+ specVerionBean.setName(Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
+ specVerionBean.setValue(specVersion);
+
+ seqPropMgr.insert(specVerionBean);
+
+ //TODO get the SOAP version from the create seq message.
+
+ return sequenceId;
+ }
+
+ public void removeSequence(String sequence) {
+
+ }
+
+ public static void setupNewClientSequence(
+ MessageContext firstAplicationMsgCtx, String internalSequenceId, String specVersion,StorageManager storageManager)
+ throws SandeshaException {
+
+ ConfigurationContext configurationContext = firstAplicationMsgCtx
+ .getConfigurationContext();
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager
+ .getSequencePropretyBeanMgr();
+
+ //setting the addressing version
+ String addressingNamespace = (String) firstAplicationMsgCtx.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
+
+ if (addressingNamespace==null) {
+ OperationContext opCtx = firstAplicationMsgCtx.getOperationContext();
+ if (opCtx!=null) {
+ try {
+ MessageContext requestMsg = opCtx.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ if (requestMsg!=null)
+ addressingNamespace = (String) requestMsg.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e);
+ }
+ }
+ }
+
+ if (addressingNamespace==null)
+ addressingNamespace = AddressingConstants.Final.WSA_NAMESPACE; //defaults to Final. Make sure this is synchronized with addressing.
+
+ SequencePropertyBean addressingNamespaceBean = new SequencePropertyBean (
+ internalSequenceId,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,addressingNamespace);
+ seqPropMgr.insert(addressingNamespaceBean);
+
+ String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace);
+
+ EndpointReference toEPR = firstAplicationMsgCtx.getTo();
+ String acksTo = (String) firstAplicationMsgCtx
+ .getProperty(SandeshaClientConstants.AcksTo);
+
+ if (toEPR == null) {
+ String message = "WS-Addressing To is null";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ SequencePropertyBean toBean = new SequencePropertyBean(internalSequenceId,
+ Sandesha2Constants.SequenceProperties.TO_EPR, toEPR.getAddress());
+ SequencePropertyBean replyToBean = null;
+ SequencePropertyBean acksToBean = null;
+
+ if (firstAplicationMsgCtx.isServerSide()) {
+ //setting replyTo value, if this is the server side.
+ OperationContext opContext = firstAplicationMsgCtx.getOperationContext();
+ try {
+ MessageContext requestMessage = opContext.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ if (requestMessage==null) {
+ String message = "Cannot find the request message from the operation context";
+ log.error(message);
+ throw new SandeshaException (message);
+ }
+
+ EndpointReference replyToEPR = requestMessage.getTo(); //'replyTo' of the response msg is the 'to' value of the req msg.
+ if (replyToEPR!=null) {
+ replyToBean = new SequencePropertyBean (internalSequenceId,Sandesha2Constants.SequenceProperties.REPLY_TO_EPR,replyToEPR.getAddress());
+ acksToBean = new SequencePropertyBean (internalSequenceId,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,replyToEPR.getAddress());
+ } else {
+ String message = "To EPR is not present in the request message. Need this information to set acksTo & replyTo value of reply messages";
+ log.error(message);
+ throw new SandeshaException (message);
+ }
+ } catch (AxisFault e) {
+ String message = "Cannot get request message from the operation context";
+ log.error(message);
+ log.error(e.getStackTrace());
+ throw new SandeshaException (message);
+ }
+ }
+ //Default value for acksTo is anonymous (this happens only for the client side)
+ if (acksTo==null) {
+ acksTo = anonymousURI;
+ }
+
+ acksToBean = new SequencePropertyBean(
+ internalSequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,
+ acksTo);
+
+ //start the in listner for the client side, if acksTo is not anonymous.
+ if (!firstAplicationMsgCtx.isServerSide() && !anonymousURI.equals(acksTo)) {
+
+ String transportInProtocol = firstAplicationMsgCtx.getOptions().getTransportInProtocol();
+ if (transportInProtocol==null) {
+ throw new SandeshaException ("You must mention the transport in protocol for getting async acknowledgement messages");
+ }
+
+ try {
+ ListenerManager listenerManager =
+ firstAplicationMsgCtx.getConfigurationContext().getListenerManager();
+ TransportInDescription transportIn = firstAplicationMsgCtx.getConfigurationContext().getAxisConfiguration().getTransportIn(new QName(transportInProtocol));
+ //if acksTo is not anonymous start the in-transport
+ if (!listenerManager.isListenerRunning(transportIn.getName().getLocalPart())) {
+ listenerManager.addListener(transportIn, false);
+ }
+ } catch (AxisFault e) {
+ throw new SandeshaException ("Could not stast the transport listner",e);
+ }
+
+
+ }
+
+ SequencePropertyBean msgsBean = new SequencePropertyBean();
+ msgsBean.setSequenceID(internalSequenceId);
+ msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+ msgsBean.setValue("");
+
+ seqPropMgr.insert(msgsBean);
+
+ seqPropMgr.insert(toBean);
+ if (acksToBean!=null)
+ seqPropMgr.insert(acksToBean);
+ if (replyToBean!=null)
+ seqPropMgr.insert(replyToBean);
+
+ //saving transportTo value;
+ String transportTo = (String) firstAplicationMsgCtx.getProperty(MessageContextConstants.TRANSPORT_URL);
+ if (transportTo!=null) {
+ SequencePropertyBean transportToBean = new SequencePropertyBean ();
+ transportToBean.setSequenceID(internalSequenceId);
+ transportToBean.setName(Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+ transportToBean.setValue(transportTo);
+
+ seqPropMgr.insert(transportToBean);
+ }
+
+
+ //setting the spec version for the client side.
+ SequencePropertyBean specVerionBean = new SequencePropertyBean ();
+ specVerionBean.setSequenceID(internalSequenceId);
+ specVerionBean.setName(Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
+ specVerionBean.setValue(specVersion);
+ seqPropMgr.insert(specVerionBean);
+
+ //updating the last activated time.
+ updateLastActivatedTime(internalSequenceId,storageManager);
+
+ SandeshaUtil.startSenderForTheSequence(configurationContext,internalSequenceId);
+
+
+
+ updateClientSideListnerIfNeeded (firstAplicationMsgCtx,anonymousURI);
+
+ }
+
+ private static void updateClientSideListnerIfNeeded (MessageContext messageContext, String addressingAnonymousURI) throws SandeshaException {
+ if (messageContext.isServerSide())
+ return; //listners are updated only for the client side.
+
+ String transportInProtocol = messageContext.getOptions().getTransportInProtocol();
+
+ String acksTo = (String) messageContext.getProperty(SandeshaClientConstants.AcksTo);
+ String mep = messageContext.getAxisOperation().getMessageExchangePattern();
+
+ boolean startListnerForAsyncAcks = false;
+ boolean startListnerForAsyncControlMsgs = false; //For async createSerRes & terminateSeq.
+
+ if (acksTo!=null && !addressingAnonymousURI.equals(acksTo)) {
+ //starting listner for async acks.
+ startListnerForAsyncAcks = true;
+ }
+
+ if (mep!=null && !AxisOperation.MEP_URI_OUT_ONLY.equals(mep)) {
+ //starting listner for the async createSeqResponse & terminateSer messages.
+ startListnerForAsyncControlMsgs = true;
+ }
+
+ try {
+ if ((startListnerForAsyncAcks || startListnerForAsyncControlMsgs) && transportInProtocol==null)
+ throw new SandeshaException ("Cant start the listner since the TransportInProtocol is not set.");
+
+ } catch (AxisFault e) {
+ String message = "Cant start the listner for incoming messages";
+ log.error(e.getStackTrace());
+ throw new SandeshaException (message,e);
+ }
+
+ }
+
+ /**
+ * Takes the internalSeqID as the param. Not the sequenceID.
+ * @param internalSequenceID
+ * @param configContext
+ * @throws SandeshaException
+ */
+ public static void updateLastActivatedTime (String propertyKey, StorageManager storageManager) throws SandeshaException {
+ //Transaction lastActivatedTransaction = storageManager.getTransaction();
+ SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean lastActivatedBean = sequencePropertyBeanMgr.retrieve(propertyKey, Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+
+ boolean added = false;
+
+ if (lastActivatedBean==null) {
+ added = true;
+ lastActivatedBean = new SequencePropertyBean ();
+ lastActivatedBean.setSequenceID(propertyKey);
+ lastActivatedBean.setName(Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+ }
+
+ long currentTime = System.currentTimeMillis();
+ lastActivatedBean.setValue(Long.toString(currentTime));
+
+ if (added)
+ sequencePropertyBeanMgr.insert(lastActivatedBean);
+ else
+ sequencePropertyBeanMgr.update(lastActivatedBean);
+
+ }
+
+
+ public static long getLastActivatedTime (String propertyKey, StorageManager storageManager) throws SandeshaException {
+
+ SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean lastActivatedBean = seqPropBeanMgr.retrieve(propertyKey,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+
+ long lastActivatedTime = -1;
+
+ if (lastActivatedBean!=null) {
+ lastActivatedTime = Long.parseLong(lastActivatedBean.getValue());
+ }
+
+ return lastActivatedTime;
+ }
+
+ public static boolean hasSequenceTimedOut (String propertyKey, RMMsgContext rmMsgCtx,StorageManager storageManager) throws SandeshaException {
+
+ //operation is the lowest level, Sandesha2 could be engaged.
+ SandeshaPropertyBean propertyBean = SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext().getAxisOperation());
+
+ if (propertyBean.getInactiveTimeoutInterval()<=0)
+ return false;
+
+ boolean sequenceTimedOut = false;
+
+ long lastActivatedTime = getLastActivatedTime(propertyKey,storageManager);
+ long timeNow = System.currentTimeMillis();
+ if (lastActivatedTime>0 && (lastActivatedTime+propertyBean.getInactiveTimeoutInterval()<timeNow))
+ sequenceTimedOut = true;
+
+ return sequenceTimedOut;
+ }
+
+ public static long getOutGoingSequenceAckedMessageCount (String internalSequenceID,StorageManager storageManager) throws SandeshaException {
+/// Transaction transaction = storageManager.getTransaction();
+ SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
+ findSeqIDBean.setValue(internalSequenceID);
+ findSeqIDBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+ Collection seqIDBeans = seqPropBeanMgr.find(findSeqIDBean);
+
+ if (seqIDBeans.size()==0) {
+ String message = "A sequence with give data has not been created";
+ log.debug(message);
+ throw new SandeshaException (message);
+ }
+
+ if (seqIDBeans.size()>1) {
+ String message = "Sequence data is not unique. Cant generate report";
+ log.debug(message);
+ throw new SandeshaException (message);
+ }
+
+ SequencePropertyBean seqIDBean = (SequencePropertyBean) seqIDBeans.iterator().next();
+ String sequenceID = seqIDBean.getSequenceID();
+
+ SequencePropertyBean ackedMsgBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED);
+ if (ackedMsgBean==null)
+ return 0; //No acknowledgement has been received yet.
+
+ long noOfMessagesAcked = Long.parseLong(ackedMsgBean.getValue());
+/// transaction.commit();
+
+ return noOfMessagesAcked;
+ }
+
+ public static boolean isOutGoingSequenceCompleted (String internalSequenceID,StorageManager storageManager) throws SandeshaException {
+/// Transaction transaction = storageManager.getTransaction();
+ SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
+ findSeqIDBean.setValue(internalSequenceID);
+ findSeqIDBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+ Collection seqIDBeans = seqPropBeanMgr.find(findSeqIDBean);
+
+ if (seqIDBeans.size()==0) {
+ String message = "A sequence with give data has not been created";
+ log.debug(message);
+ throw new SandeshaException (message);
+ }
+
+ if (seqIDBeans.size()>1) {
+ String message = "Sequence data is not unique. Cant generate report";
+ log.debug(message);
+ throw new SandeshaException (message);
+ }
+
+ SequencePropertyBean seqIDBean = (SequencePropertyBean) seqIDBeans.iterator().next();
+ String sequenceID = seqIDBean.getSequenceID();
+
+ SequencePropertyBean terminateAddedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+ if (terminateAddedBean==null)
+ return false;
+
+ if ("true".equals(terminateAddedBean.getValue()))
+ return true;
+
+/// transaction.commit();
+ return false;
+ }
+
+ public static boolean isIncomingSequenceCompleted (String sequenceID, StorageManager storageManager) throws SandeshaException {
+
+/// Transaction transaction = storageManager.getTransaction();
+ SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean terminateReceivedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
+ boolean complete = false;
+
+ if (terminateReceivedBean!=null && "true".equals(terminateReceivedBean.getValue()))
+ complete = true;
+
+/// transaction.commit();
+ return complete;
+ }
+
+
+}
\ No newline at end of file
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,218 @@
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+
+public class SpecSpecificConstants {
+
+ private static String unknownSpecErrorMessage = "Unknown specification version";
+
+ public static String getSpecVersionString (String namespaceValue) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceValue))
+ return Sandesha2Constants.SPEC_VERSIONS.v1_0;
+ else if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(namespaceValue))
+ return Sandesha2Constants.SPEC_VERSIONS.v1_1;
+ else
+ throw new SandeshaException ("Unknows rm namespace value");
+ }
+
+ public static String getRMNamespaceValue (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_02.NS_URI;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.NS_URI;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getCreateSequenceAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CREATE_SEQUENCE;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getCreateSequenceResponseAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE_RESPONSE;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CREATE_SEQUENCE_RESPONSE;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getTerminateSequenceAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_02.Actions.ACTION_TERMINATE_SEQUENCE;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_TERMINATE_SEQUENCE;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getTerminateSequenceResponseAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_TERMINATE_SEQUENCE_RESPONSE;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getCloseSequenceAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ throw new SandeshaException ("This rm spec version does not define a sequenceClose action");
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CLOSE_SEQUENCE;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getCloseSequenceResponseAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ throw new SandeshaException ("This rm spec version does not define a sequenceClose action");
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CLOSE_SEQUENCE_RESPONSE;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getAckRequestAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ throw new SandeshaException ("this spec version does not define a ackRequest action");
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_ACK_REQUEST;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getSequenceAcknowledgementAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_02.Actions.ACTION_SEQUENCE_ACKNOWLEDGEMENT;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_SEQUENCE_ACKNOWLEDGEMENT;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getCreateSequenceSOAPAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_02.Actions.SOAP_ACTION_CREATE_SEQUENCE;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_CREATE_SEQUENCE;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getCreateSequenceResponseSOAPAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_02.Actions.SOAP_ACTION_CREATE_SEQUENCE_RESPONSE;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_CREATE_SEQUENCE_RESPONSE;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getTerminateSequenceSOAPAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_02.Actions.SOAP_ACTION_TERMINATE_SEQUENCE;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_TERMINATE_SEQUENCE;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getTerminateSequenceResponseSOAPAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_TERMINATE_SEQUENCE_RESPONSE;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getAckRequestSOAPAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ throw new SandeshaException ("this spec version does not define a ackRequest SOAP action");
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_ACK_REQUEST;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getSequenceAcknowledgementSOAPAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_02.Actions.SOAP_ACTION_SEQUENCE_ACKNOWLEDGEMENT;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_SEQUENCE_ACKNOWLEDGEMENT;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static boolean isTerminateSequenceResponseRequired (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return false;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return true;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static boolean isLastMessageIndicatorRequired (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return true;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return false;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static boolean isAckFinalAllowed (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return false;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return true;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static boolean isAckNoneAllowed (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return false;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return true;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static boolean isSequenceClosingAllowed (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
+ return false;
+ else if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
+ return true;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getDefaultSpecVersion () {
+ return Sandesha2Constants.SPEC_VERSIONS.v1_0;
+ }
+
+ public static String getAddressingAnonymousURI (String addressingNSURI) throws SandeshaException {
+ if (AddressingConstants.Submission.WSA_NAMESPACE.equals(addressingNSURI))
+ return AddressingConstants.Submission.WSA_ANONYMOUS_URL;
+ else if (AddressingConstants.Final.WSA_NAMESPACE.equals(addressingNSURI))
+ return AddressingConstants.Final.WSA_ANONYMOUS_URL;
+ else
+ throw new SandeshaException ("Unknown addressing version");
+ }
+
+ public static String getAddressingFaultAction (String addressingNSURI) throws SandeshaException {
+ if (AddressingConstants.Submission.WSA_NAMESPACE.equals(addressingNSURI))
+ return "http://schemas.xmlsoap.org/ws/2004/08/addressing/fault"; //this is not available in addressing constants )-:
+ else if (AddressingConstants.Final.WSA_NAMESPACE.equals(addressingNSURI))
+ return AddressingConstants.Final.WSA_FAULT_ACTION;
+ else
+ throw new SandeshaException ("Unknown addressing version");
+ }
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,441 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
+
+/**
+ * Contains logic to remove all the storad data of a sequence.
+ * Methods of this are called by sending side and the receiving side when appropriate
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class TerminateManager {
+
+ private static Log log = LogFactory.getLog(TerminateManager.class);
+
+ private static String CLEANED_ON_TERMINATE_MSG = "CleanedOnTerminateMsg";
+ private static String CLEANED_AFTER_INVOCATION = "CleanedAfterInvocation";
+
+ public static HashMap receivingSideCleanMap = new HashMap ();
+ /**
+ * Called by the receiving side to remove data related to a sequence.
+ * e.g. After sending the TerminateSequence message. Calling this methods will complete all
+ * the data if InOrder invocation is not sequired.
+ *
+ * @param configContext
+ * @param sequenceID
+ * @throws SandeshaException
+ */
+ public static void cleanReceivingSideOnTerminateMessage (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException {
+ //clean senderMap
+
+ //Currently in-order invocation is done for default values.
+ boolean inOrderInvocation = SandeshaUtil.getDefaultPropertyBean(configContext.getAxisConfiguration()).isInOrder();
+
+ if(!inOrderInvocation) {
+ //there is no invoking by Sandesha2. So clean invocations storages.
+ cleanReceivingSideAfterInvocation(configContext,sequenceID,storageManager);
+ }
+
+ String cleanStatus = (String) receivingSideCleanMap.get(sequenceID);
+ if (cleanStatus!=null && CLEANED_AFTER_INVOCATION.equals(cleanStatus))
+ completeTerminationOfReceivingSide(configContext,sequenceID,storageManager);
+ else {
+ receivingSideCleanMap.put(sequenceID,CLEANED_ON_TERMINATE_MSG);
+ }
+ }
+
+ /**
+ * When InOrder invocation is anabled this had to be called to clean the data left by the
+ * above method. This had to be called after the Invocation of the Last Message.
+ *
+ * @param configContext
+ * @param sequenceID
+ * @throws SandeshaException
+ */
+ public static void cleanReceivingSideAfterInvocation (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException {
+ InvokerBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr();
+
+ //removing storageMap entries
+ InvokerBean findStorageMapBean = new InvokerBean ();
+ findStorageMapBean.setSequenceID(sequenceID);
+ findStorageMapBean.setInvoked(true);
+ Collection collection = storageMapBeanMgr.find(findStorageMapBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ InvokerBean storageMapBean = (InvokerBean) iterator.next();
+ storageMapBeanMgr.delete(storageMapBean.getMessageContextRefKey());
+
+ //removing the respective message context from the message store. If this is an in-only message.
+ //In-out message will be deleted when a ack is retrieved for the out message.
+ String messageStoreKey = storageMapBean.getMessageContextRefKey();
+ storageManager.removeMessageContext(messageStoreKey);
+
+ }
+
+ String cleanStatus = (String) receivingSideCleanMap.get(sequenceID);
+ if (cleanStatus!=null && CLEANED_ON_TERMINATE_MSG.equals(cleanStatus))
+ completeTerminationOfReceivingSide(configContext,sequenceID,storageManager);
+ else {
+ receivingSideCleanMap.put(sequenceID,CLEANED_AFTER_INVOCATION);
+ }
+ }
+
+ /**
+ * This has to be called by the lastly invocated one of the above two methods.
+ *
+ */
+ private static void completeTerminationOfReceivingSide (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException {
+ InvokerBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr();
+ NextMsgBeanMgr nextMsgBeanMgr = storageManager.getNextMsgBeanMgr();
+
+ //removing nextMsgMgr entries
+ NextMsgBean findNextMsgBean = new NextMsgBean ();
+ findNextMsgBean.setSequenceID(sequenceID);
+ Collection collection = nextMsgBeanMgr.find(findNextMsgBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ NextMsgBean nextMsgBean = (NextMsgBean) iterator.next();
+// nextMsgBeanMgr.delete(nextMsgBean.getSequenceID());
+ }
+
+ //removing the HighestInMessage entry.
+ String highestInMessageKey = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,storageManager);
+ if (highestInMessageKey!=null) {
+ storageManager.removeMessageContext(highestInMessageKey);
+ }
+
+ removeReceivingSideProperties(configContext,sequenceID,storageManager);
+ }
+
+ private static void removeReceivingSideProperties (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException {
+ SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean allSequenceBean = sequencePropertyBeanMgr.retrieve(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+ if (allSequenceBean!=null) {
+ log.debug("AllSequence bean is null");
+
+ ArrayList allSequenceList = SandeshaUtil.getArrayListFromString(allSequenceBean.getValue());
+ allSequenceList.remove(sequenceID);
+
+ //updating
+ allSequenceBean.setValue(allSequenceList.toString());
+ sequencePropertyBeanMgr.update(allSequenceBean);
+ }
+ }
+
+ private static boolean isRequiredForResponseSide (String name) {
+ if (name==null && name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO))
+ return false;
+
+ if (name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO))
+ return false;
+
+ return false;
+ }
+
+
+ /**
+ * This is called by the sending side to clean data related to a sequence.
+ * e.g. after sending the TerminateSequence message.
+ *
+ * @param configContext
+ * @param sequenceID
+ * @throws SandeshaException
+ */
+ public static void terminateSendingSide (ConfigurationContext configContext, String internalSequenceID,boolean serverSide,StorageManager storageManager) throws SandeshaException {
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean seqTerminatedBean = new SequencePropertyBean (internalSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,Sandesha2Constants.VALUE_TRUE);
+ seqPropMgr.insert(seqTerminatedBean);
+
+ cleanSendingSideData(configContext,internalSequenceID,serverSide,storageManager);
+ }
+
+ private static void doUpdatesIfNeeded (String sequenceID, SequencePropertyBean propertyBean, SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
+
+ boolean addEntryWithSequenceID = false;
+
+ if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES)) {
+ addEntryWithSequenceID = true;
+ }
+
+ if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED)) {
+ addEntryWithSequenceID = true;
+ }
+
+ if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED)) {
+ addEntryWithSequenceID = true;
+ }
+
+ if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT)) {
+ addEntryWithSequenceID = true;
+ }
+
+ if (addEntryWithSequenceID && sequenceID!=null) {
+ //this value cannot be completely deleted since this data will be needed by SequenceReports
+ //so saving it with the sequenceID value being the out sequenceID.
+
+ SequencePropertyBean newBean = new SequencePropertyBean ();
+ newBean.setSequenceID(sequenceID);
+ newBean.setName(propertyBean.getName());
+ newBean.setValue(propertyBean.getValue());
+
+ seqPropMgr.insert(newBean);
+ //TODO amazingly this property does not seem to get deleted without following - in the hibernate impl
+ //(even though the lines efter current methodcall do this).
+ seqPropMgr.delete (propertyBean.getSequenceID(),propertyBean.getName());
+ }
+ }
+
+ private static boolean isProportyDeletable (String name) {
+ boolean deleatable = true;
+
+ if (Sandesha2Constants.SequenceProperties.TERMINATE_ADDED.equals(name))
+ deleatable = false;
+
+ if (Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED.equals(name))
+ deleatable = false;
+
+ if (Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID.equals(name))
+ deleatable = false;
+
+// if (Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION.equals(name))
+// deleatable = false;
+
+ if (Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED.equals(name))
+ deleatable = false;
+
+ if (Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED.equals(name))
+ deleatable = false;
+
+ if (Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT.equals(name))
+ deleatable = false;
+
+ return deleatable;
+ }
+
+ public static void timeOutSendingSideSequence (ConfigurationContext context,String internalSequenceID, boolean serverside,StorageManager storageManager) throws SandeshaException {
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean seqTerminatedBean = new SequencePropertyBean (internalSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT,Sandesha2Constants.VALUE_TRUE);
+ seqPropMgr.insert(seqTerminatedBean);
+
+ cleanSendingSideData(context,internalSequenceID,serverside,storageManager);
+ }
+
+ private static void cleanSendingSideData (ConfigurationContext configContext,String internalSequenceID, boolean serverSide,StorageManager storageManager) throws SandeshaException {
+
+ SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+ SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
+ CreateSeqBeanMgr createSeqBeanMgr = storageManager.getCreateSeqBeanMgr();
+
+ String outSequenceID = SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID,storageManager);
+
+ if (!serverSide) {
+ boolean stopListnerForAsyncAcks = false;
+ SequencePropertyBean acksToBean = sequencePropertyBeanMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+
+ String addressingNamespace = SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,storageManager);
+ String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace);
+
+ if (acksToBean!=null) {
+ String acksTo = acksToBean.getValue();
+ if (acksTo!=null && !anonymousURI.equals(acksTo)) {
+ stopListnerForAsyncAcks = true;
+ }
+ }
+ }
+
+ //removing retransmitterMgr entries and corresponding message contexts.
+ Collection collection = retransmitterBeanMgr.find(internalSequenceID);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ SenderBean retransmitterBean = (SenderBean) iterator.next();
+ retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+
+ String messageStoreKey = retransmitterBean.getMessageContextRefKey();
+ storageManager.removeMessageContext(messageStoreKey);
+ }
+
+ //removing the createSeqMgrEntry
+ CreateSeqBean findCreateSequenceBean = new CreateSeqBean ();
+ findCreateSequenceBean.setInternalSequenceID(internalSequenceID);
+ collection = createSeqBeanMgr.find(findCreateSequenceBean);
+ iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ CreateSeqBean createSeqBean = (CreateSeqBean) iterator.next();
+ createSeqBeanMgr.delete(createSeqBean.getCreateSeqMsgID());
+ }
+
+ //removing sequence properties
+ SequencePropertyBean findSequencePropertyBean1 = new SequencePropertyBean ();
+ findSequencePropertyBean1.setSequenceID(internalSequenceID);
+ collection = sequencePropertyBeanMgr.find(findSequencePropertyBean1);
+ iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
+ doUpdatesIfNeeded (outSequenceID,sequencePropertyBean,sequencePropertyBeanMgr);
+
+ //TODO all properties which hv the temm:Seq:id as the key should be deletable.
+ if (isProportyDeletable(sequencePropertyBean.getName())) {
+ sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
+ }
+ }
+ }
+
+ public static void addTerminateSequenceMessage(RMMsgContext referenceMessage,
+ String outSequenceId, String internalSequenceId,StorageManager storageManager)
+ throws SandeshaException {
+
+ ConfigurationContext configurationContext = referenceMessage.getMessageContext().getConfigurationContext();
+
+/// Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager
+ .getSequencePropretyBeanMgr();
+
+ SequencePropertyBean terminated = seqPropMgr.retrieve(outSequenceId,
+ Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+
+ if (terminated != null && terminated.getValue() != null
+ && "true".equals(terminated.getValue())) {
+ String message = "Terminate was added previously.";
+ log.debug (message);
+ }
+
+ RMMsgContext terminateRMMessage = RMMsgCreator
+ .createTerminateSequenceMessage(referenceMessage, outSequenceId,internalSequenceId,storageManager);
+ terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
+ terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+
+ SequencePropertyBean toBean = seqPropMgr.retrieve(internalSequenceId,
+ Sandesha2Constants.SequenceProperties.TO_EPR);
+
+ EndpointReference toEPR = new EndpointReference ( toBean.getValue());
+ if (toEPR == null) {
+ String message = "To EPR has an invalid value";
+ throw new SandeshaException(message);
+ }
+
+ terminateRMMessage.setTo(new EndpointReference(toEPR.getAddress()));
+
+ String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,storageManager);
+ String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
+
+ String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,storageManager);
+ if (rmVersion==null)
+ throw new SandeshaException ("Cant find the rmVersion of the given message");
+ terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+ terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+
+ SequencePropertyBean transportToBean = seqPropMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+ if (transportToBean!=null) {
+ terminateRMMessage.setProperty(MessageContextConstants.TRANSPORT_URL,transportToBean.getValue());
+ }
+
+ try {
+ terminateRMMessage.addSOAPEnvelope();
+ } catch (AxisFault e) {
+ throw new SandeshaException(e.getMessage());
+ }
+
+ String key = SandeshaUtil.getUUID();
+
+ SenderBean terminateBean = new SenderBean();
+ terminateBean.setMessageContextRefKey(key);
+
+
+ storageManager.storeMessageContext(key,terminateRMMessage.getMessageContext());
+
+ //Set a retransmitter lastSentTime so that terminate will be send with
+ // some delay.
+ //Otherwise this get send before return of the current request (ack).
+ //TODO: refine the terminate delay.
+ terminateBean.setTimeToSend(System.currentTimeMillis()
+ + Sandesha2Constants.TERMINATE_DELAY);
+
+ terminateBean.setMessageID(terminateRMMessage.getMessageId());
+
+ //this will be set to true at the sender.
+ terminateBean.setSend(true);
+
+ terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+ Sandesha2Constants.VALUE_FALSE);
+
+ terminateBean.setReSend(false);
+
+ SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
+
+ retramsmitterMgr.insert(terminateBean);
+
+ SequencePropertyBean terminateAdded = new SequencePropertyBean();
+ terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+ terminateAdded.setSequenceID(outSequenceId);
+ terminateAdded.setValue("true");
+
+ seqPropMgr.insert(terminateAdded);
+
+ //This should be dumped to the storage by the sender
+ TransportOutDescription transportOut = terminateRMMessage.getMessageContext().getTransportOut();
+ terminateRMMessage.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
+ terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+ terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+ terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
+/// addTerminateSeqTransaction.commit();
+
+ AxisEngine engine = new AxisEngine (configurationContext);
+ try {
+ engine.send(terminateRMMessage.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException (e.getMessage());
+ }
+ }
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InOrderInvoker.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InOrderInvoker.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,264 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * This is used when InOrder invocation is required. This is a seperated Thread that keep running
+ * all the time. At each iteration it checks the InvokerTable to find weather there are any messages to
+ * me invoked.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class InOrderInvoker extends Thread {
+
+ private boolean runInvoker = false;
+ private ArrayList workingSequences = new ArrayList();
+ private ConfigurationContext context = null;
+ private static final Log log = LogFactory.getLog(InOrderInvoker.class);
+
+ public synchronized void stopInvokerForTheSequence(String sequenceID) {
+ workingSequences.remove(sequenceID);
+ if (workingSequences.size()==0) {
+ runInvoker = false;
+ }
+ }
+
+ public synchronized void stopInvoking () {
+ runInvoker = false;
+ }
+
+ public synchronized boolean isInvokerStarted() {
+ return runInvoker;
+ }
+
+ public void setConfugurationContext(ConfigurationContext context) {
+ this.context = context;
+ }
+
+ public synchronized void runInvokerForTheSequence(ConfigurationContext context, String sequenceID) {
+
+ if (!workingSequences.contains(sequenceID))
+ workingSequences.add(sequenceID);
+
+ if (!isInvokerStarted()) {
+ this.context = context;
+ runInvoker = true; //so that isSenderStarted()=true.
+ super.start();
+ }
+ }
+
+ public void run() {
+
+ while (isInvokerStarted()) {
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ log.debug("Invoker was Inturrepted....");
+ log.debug(ex.getMessage());
+ }
+
+ Transaction transaction = null;
+ boolean rolebacked = false;
+
+ try {
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
+ NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
+
+ InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
+
+ SequencePropertyBeanMgr sequencePropMgr = storageManager
+ .getSequencePropretyBeanMgr();
+
+ transaction = storageManager.getTransaction();
+
+ //Getting the incomingSequenceIdList
+ SequencePropertyBean allSequencesBean = (SequencePropertyBean) sequencePropMgr
+ .retrieve(
+ Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+ Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+ if (allSequencesBean == null) {
+ continue;
+ }
+ ArrayList allSequencesList = SandeshaUtil.getArrayListFromString (allSequencesBean.getValue());
+
+ Iterator allSequencesItr = allSequencesList.iterator();
+
+ currentIteration: while (allSequencesItr.hasNext()) {
+ String sequenceId = (String) allSequencesItr.next();
+
+ //commiting the old transaction
+ transaction.commit();
+
+ //starting a new transaction for the new iteration.
+ transaction = storageManager.getTransaction();
+
+ NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
+ if (nextMsgBean == null) {
+ String message = "Next message not set correctly. Removing invalid entry.";
+ log.debug(message);
+ allSequencesItr.remove();
+
+ //cleaning the invalid data of the all sequences.
+ allSequencesBean.setValue(allSequencesList.toString());
+ sequencePropMgr.update(allSequencesBean);
+ continue;
+ }
+
+ long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
+ if (nextMsgno <= 0) {
+ String message = "Invalid message number as the Next Message Number.";
+ throw new SandeshaException(message);
+ }
+
+ Iterator stMapIt = storageMapMgr.find(
+ new InvokerBean(null, nextMsgno, sequenceId))
+ .iterator();
+
+ boolean invoked = false;
+
+ while (stMapIt.hasNext()) {
+
+ InvokerBean stMapBean = (InvokerBean) stMapIt.next();
+ String key = stMapBean.getMessageContextRefKey();
+
+ MessageContext msgToInvoke = storageManager.retrieveMessageContext(key,context);
+ RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke);
+
+ //have to commit the transaction before invoking. This may get changed when WS-AT is available.
+ transaction.commit();
+
+ try {
+ //Invoking the message.
+ msgToInvoke.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
+
+ boolean postFailureInvocation = false;
+
+ //StorageManagers should st following property to true, to indicate that the message received comes after a failure.
+ String postFaulureProperty = (String) msgToInvoke.getProperty(Sandesha2Constants.POST_FAILURE_MESSAGE);
+ if (postFaulureProperty!=null && Sandesha2Constants.VALUE_TRUE.equals(postFaulureProperty))
+ postFailureInvocation = true;
+
+ AxisEngine engine = new AxisEngine (context);
+ if (postFailureInvocation) {
+ makeMessageReadyForReinjection (msgToInvoke);
+ engine.receive(msgToInvoke);
+ } else {
+ engine.resume(msgToInvoke);
+ }
+
+ invoked = true;
+
+ } catch (Exception e) {
+ throw new SandeshaException(e);
+ } finally {
+ transaction = storageManager.getTransaction();
+ }
+
+ //Service will be invoked only once. I.e. even if an exception get thrown in invocation
+ //the service will not be invoked again.
+ storageMapMgr.delete(key);
+
+ //removing the corresponding message context as well.
+ MessageContext msgCtx = storageManager.retrieveMessageContext(key,context);
+ if (msgCtx!=null) {
+ storageManager.removeMessageContext(key);
+ }
+
+
+ //undating the next msg to invoke
+
+ if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+ Sequence sequence = (Sequence) rmMsg
+ .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if (sequence.getLastMessage() != null) {
+ TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId,storageManager);
+ //exit from current iteration. (since an entry was removed)
+ break currentIteration;
+ }
+ }
+ }
+
+ if (invoked) {
+ nextMsgno++;
+ nextMsgBean.setNextMsgNoToProcess(nextMsgno);
+ nextMsgMgr.update(nextMsgBean);
+ }
+ }
+
+ } catch (Exception e) {
+ if (transaction!=null) {
+ try {
+ transaction.rollback();
+ rolebacked = true;
+ } catch (Exception e1) {
+ String message = "Exception thrown when trying to roleback the transaction.";
+ log.debug(message,e1);
+ }
+ }
+ String message = "Sandesha2 got an exception when trying to invoke the message";
+ log.debug(message,e);
+ } finally {
+ if (!rolebacked && transaction!=null) {
+ try {
+ transaction.commit();
+ } catch (Exception e) {
+ String message = "Exception thrown when trying to commit the transaction.";
+ log.debug(message,e);
+ }
+ }
+ }
+ }
+ }
+
+ private void makeMessageReadyForReinjection (MessageContext messageContext) {
+ messageContext.setProperty(AddressingConstants.WS_ADDRESSING_VERSION,null);
+ messageContext.getOptions().setMessageId(null);
+ messageContext.getOptions().setTo(null);
+ messageContext.getOptions().setAction(null);
+ messageContext.setProperty(Sandesha2Constants.REINJECTED_MESSAGE,Sandesha2Constants.VALUE_TRUE);
+ }
+}
\ No newline at end of file
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,387 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFault;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+
+/**
+ * This is responsible for sending and re-sending messages of Sandesha2. This
+ * represent a thread that keep running all the time. This keep looking at the
+ * Sender table to find out any entries that should be sent.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class Sender extends Thread {
+
+ private boolean runSender = false;
+ private ArrayList workingSequences = new ArrayList();
+ private ConfigurationContext context = null;
+ private static final Log log = LogFactory.getLog(Sender.class);
+
+ public synchronized void stopSenderForTheSequence(String sequenceID) {
+ workingSequences.remove(sequenceID);
+ if (workingSequences.size() == 0) {
+ runSender = false;
+ }
+ }
+
+ public synchronized void stopSending () {
+ runSender = false;
+ }
+
+ public synchronized boolean isSenderStarted() {
+ return runSender;
+ }
+
+ public void run() {
+
+ StorageManager storageManager = null;
+
+ try {
+ storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
+ } catch (SandeshaException e2) {
+ // TODO Auto-generated catch block
+ log.debug("ERROR: Could not start sender");
+ e2.printStackTrace();
+ return;
+ }
+
+ while (isSenderStarted()) {
+
+ try {
+ Thread.sleep(Sandesha2Constants.SENDER_SLEEP_TIME);
+ } catch (InterruptedException e1) {
+ // e1.printStackTrace();
+ log.debug("Sender was interupted...");
+ log.debug(e1.getMessage());
+ log.debug("End printing Interrupt...");
+ }
+
+ Transaction transaction = null;
+ boolean rolebacked = false;
+
+ try {
+ if (context == null) {
+ String message = "Can't continue the Sender. Context is null";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ transaction = storageManager.getTransaction();
+
+ SenderBeanMgr mgr = storageManager.getRetransmitterBeanMgr();
+ SenderBean senderBean = mgr.getNextMsgToSend();
+ if (senderBean==null) {
+ continue;
+ }
+
+ String key = (String) senderBean.getMessageContextRefKey();
+ MessageContext msgCtx = storageManager.retrieveMessageContext(key, context);
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
+
+ MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
+ boolean continueSending = retransmitterAdjuster.adjustRetransmittion(senderBean, context,storageManager);
+ if (!continueSending) {
+ continue;
+ }
+
+ if (msgCtx == null) {
+ String message = "Message context is not present in the storage";
+ }
+
+ // sender will not send the message if following property is
+ // set and not true.
+ // But it will set if it is not set (null)
+
+ // This is used to make sure that the mesage get passed the Sandesha2TransportSender.
+
+ String qualifiedForSending = (String) msgCtx.getProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING);
+ if (qualifiedForSending != null && !qualifiedForSending.equals(Sandesha2Constants.VALUE_TRUE)) {
+ continue;
+ }
+
+ if (msgCtx == null) {
+ log.debug("ERROR: Sender has an Unavailable Message entry");
+ break;
+ }
+
+ RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+
+ //operation is the lowest level Sandesha2 should be attached
+ ArrayList msgsNotToSend = SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation()).getMsgTypesToDrop();
+
+ if (msgsNotToSend != null && msgsNotToSend.contains(new Integer(rmMsgCtx.getMessageType()))) {
+ continue;
+ }
+
+ updateMessage(msgCtx);
+
+ int messageType = rmMsgCtx.getMessageType();
+ if (messageType == Sandesha2Constants.MessageTypes.APPLICATION) {
+ Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String sequenceID = sequence.getIdentifier().getIdentifier();
+ }
+
+ //checking weather this message can carry piggybacked acks
+ if (isAckPiggybackableMsgType(messageType) && !isAckAlreadyPiggybacked(rmMsgCtx)) {
+ // piggybacking if an ack if available for the same sequence.
+ //TODO do piggybacking based on wsa:To
+ AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx,storageManager);
+ }
+
+ //sending the message
+ TransportOutDescription transportOutDescription = msgCtx.getTransportOut();
+ TransportSender transportSender = transportOutDescription.getSender();
+
+ boolean successfullySent = false;
+ if (transportSender != null) {
+
+ //have to commit the transaction before sending. This may get changed when WS-AT is available.
+ transaction.commit();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_FALSE);
+ try {
+
+ //had to fully build the SOAP envelope to support retransmissions.
+ //Otherwise a 'parserAlreadyAccessed' exception could get thrown in retransmissions.
+ //But this has a performance reduction.
+ msgCtx.getEnvelope().build();
+
+ //TODO change this to cater for security.
+ transportSender.invoke(msgCtx);
+ successfullySent = true;
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ String message = "Sandesha2 got an exception when trying to send the message";
+ log.debug(message,e);
+ } finally {
+ transaction = storageManager.getTransaction();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
+ }
+ }
+
+ // update or delete only if the object is still present.
+ SenderBean bean1 = mgr.retrieve(senderBean.getMessageID());
+ if (bean1 != null) {
+ if (senderBean.isReSend()) {
+ bean1.setSentCount(senderBean.getSentCount());
+ bean1.setTimeToSend(senderBean.getTimeToSend());
+ mgr.update(bean1);
+ } else {
+ mgr.delete(bean1.getMessageID());
+
+ //removing the message from the storage.
+ String messageStoredKey = bean1.getMessageContextRefKey();
+ storageManager.removeMessageContext(messageStoredKey);
+ }
+ }
+
+ if (successfullySent) {
+ if (!msgCtx.isServerSide())
+ checkForSyncResponses(msgCtx);
+ }
+
+ if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+ // terminate sending side.
+ TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ String sequenceID = terminateSequence.getIdentifier().getIdentifier();
+ ConfigurationContext configContext = msgCtx.getConfigurationContext();
+
+ String internalSequenceID = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,storageManager);
+ TerminateManager.terminateSendingSide(configContext,internalSequenceID, msgCtx.isServerSide(),storageManager);
+ }
+
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_FALSE);
+
+ } catch (Exception e) {
+
+ // TODO : when this is the client side throw the exception to
+ // the client when necessary.
+
+ if (transaction!=null) {
+ try {
+ transaction.rollback();
+ rolebacked = true;
+ } catch (Exception e1) {
+ String message = "Exception thrown when trying to roleback the transaction.";
+ log.debug(message,e1);
+ }
+ }
+
+ String message = "An Exception was throws in sending";
+ log.debug(message,e);
+ } finally {
+ if (transaction!=null && !rolebacked) {
+ try {
+ transaction.commit();
+ } catch (Exception e) {
+ String message = "Exception thrown when trying to commit the transaction.";
+ log.debug(message,e);
+ }
+ }
+ }
+ }
+ }
+
+ public synchronized void runSenderForTheSequence(
+ ConfigurationContext context, String sequenceID) {
+
+ if (sequenceID != null && !workingSequences.contains(sequenceID))
+ workingSequences.add(sequenceID);
+
+ if (!isSenderStarted()) {
+ this.context = context;
+ runSender = true; // so that isSenderStarted()=true.
+ super.start();
+ }
+ }
+
+ private void updateMessage(MessageContext msgCtx1) throws SandeshaException {
+ //do updates if required.
+ }
+
+ private void checkForSyncResponses(MessageContext msgCtx)
+ throws SandeshaException {
+
+ try {
+
+ boolean responsePresent = (msgCtx
+ .getProperty(MessageContext.TRANSPORT_IN) != null);
+ if (!responsePresent)
+ return;
+
+ // create the responseMessageContext
+
+ MessageContext responseMessageContext = new MessageContext();
+ responseMessageContext.setServerSide(false);
+ responseMessageContext.setConfigurationContext(msgCtx
+ .getConfigurationContext());
+ responseMessageContext.setTransportIn(msgCtx.getTransportIn());
+ responseMessageContext.setTransportOut(msgCtx.getTransportOut());
+
+ responseMessageContext.setProperty(MessageContext.TRANSPORT_IN,
+ msgCtx.getProperty(MessageContext.TRANSPORT_IN));
+ responseMessageContext.setServiceContext(msgCtx.getServiceContext());
+ responseMessageContext.setServiceGroupContext(msgCtx.getServiceGroupContext());
+
+ //copying required properties from op. context to the response msg ctx.
+ OperationContext requestMsgOpCtx = msgCtx.getOperationContext();
+ if (requestMsgOpCtx!=null) {
+ if (responseMessageContext.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE)==null) {
+ responseMessageContext.setProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE,
+ requestMsgOpCtx.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE));
+ }
+
+ if (responseMessageContext.getProperty(HTTPConstants.CHAR_SET_ENCODING)==null) {
+ responseMessageContext.setProperty(HTTPConstants.CHAR_SET_ENCODING,
+ requestMsgOpCtx.getProperty(HTTPConstants.CHAR_SET_ENCODING));
+ }
+ }
+
+ // If request is REST we assume the responseMessageContext is REST,
+ // so set the variable
+
+ responseMessageContext.setDoingREST(msgCtx.isDoingREST());
+
+ SOAPEnvelope resenvelope = null;
+ try {
+ resenvelope = TransportUtils.createSOAPMessage(msgCtx,
+ msgCtx.getEnvelope().getNamespace().getName());
+
+ } catch (AxisFault e) {
+ // TODO Auto-generated catch block
+ log.debug("Valid SOAP envelope not found");
+ log.debug(e.getStackTrace().toString());
+ }
+
+ //if the request msg ctx is withina a transaction, processing if the response should also happen
+ //withing the same transaction
+ responseMessageContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION
+ ,msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
+
+ if (resenvelope != null) {
+ responseMessageContext.setEnvelope(resenvelope);
+ AxisEngine engine = new AxisEngine(msgCtx
+ .getConfigurationContext());
+
+ if (isFaultEnvelope(resenvelope)) {
+ engine.receiveFault(responseMessageContext);
+ }else {
+ engine.receive(responseMessageContext);
+ }
+ }
+
+ } catch (Exception e) {
+ String message = "No valid Sync response...";
+ log.debug(message);
+ throw new SandeshaException(message, e);
+ }
+ }
+
+ private boolean isAckPiggybackableMsgType(int messageType) {
+ boolean piggybackable = true;
+
+ if (messageType==Sandesha2Constants.MessageTypes.ACK)
+ piggybackable = false;
+
+ return piggybackable;
+ }
+
+ private boolean isAckAlreadyPiggybacked (RMMsgContext rmMessageContext) {
+ if (rmMessageContext.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT)!=null)
+ return true;
+
+ return false;
+ }
+
+ private boolean isFaultEnvelope (SOAPEnvelope envelope) throws SandeshaException {
+ SOAPFault fault = envelope.getBody().getFault();
+ if (fault!=null)
+ return true;
+ else
+ return false;
+ }
+
+}
\ No newline at end of file
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/Accept.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/Accept.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/Accept.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/Accept.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,110 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.sandesha2.wsrm;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ * @author Sanka Samaranayaka <ss...@gmail.com>
+ * @author Saminda Abeyruwan <sa...@opensource.lk>
+ */
+
+public class Accept implements IOMRMElement {
+
+ private AcksTo acksTo;
+
+ private OMFactory defaultFactory;
+
+ private String rmNamespaceValue;
+
+ private String addressingNamespaceValue;
+
+
+ public Accept(OMFactory factory, String rmNamespaceValue, String addressingNamespaceValue) throws SandeshaException {
+ if (!isNamespaceSupported(rmNamespaceValue))
+ throw new SandeshaException ("Unsupported namespace");
+
+ this.defaultFactory = factory;
+ this.addressingNamespaceValue = addressingNamespaceValue;
+ this.rmNamespaceValue = rmNamespaceValue;
+ }
+
+ public String getNamespaceValue(){
+ return rmNamespaceValue;
+ }
+
+ public Object fromOMElement(OMElement element) throws OMException,SandeshaException {
+
+ OMFactory factory = element.getOMFactory();
+ if (factory==null)
+ factory = defaultFactory;
+
+ OMElement acceptPart = element.getFirstChildWithName(new QName(
+ rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.ACCEPT));
+ if (acceptPart == null)
+ throw new OMException("Passed element does not contain an Accept part");
+
+ acksTo = new AcksTo(defaultFactory,rmNamespaceValue,addressingNamespaceValue);
+ acksTo.fromOMElement(acceptPart);
+
+ return this;
+ }
+
+ public OMElement toOMElement(OMElement element) throws OMException {
+
+ OMFactory factory = element.getOMFactory();
+ if (factory==null)
+ factory = defaultFactory;
+
+ if (acksTo == null)
+ throw new OMException("Cant add Accept part since AcksTo object is null");
+
+ OMNamespace rmNamespace = factory.createOMNamespace(rmNamespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+ OMElement acceptElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.ACCEPT, rmNamespace);
+
+ acksTo.toOMElement(acceptElement);
+ element.addChild(acceptElement);
+
+ return element;
+ }
+
+ public void setAcksTo(AcksTo acksTo) {
+ this.acksTo = acksTo;
+ }
+
+ public AcksTo getAcksTo() {
+ return acksTo;
+ }
+
+ public boolean isNamespaceSupported (String namespaceName) {
+ if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
+ return true;
+
+ if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(namespaceName))
+ return true;
+
+ return false;
+ }
+}
\ No newline at end of file
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckFinal.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckFinal.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckFinal.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckFinal.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.wsrm;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ *
+ * This represent the wsrm:final element that may be present withing a sequence acknowledgement.
+ */
+public class AckFinal implements IOMRMElement {
+
+ private OMFactory defaultfactory;
+
+ private String namespaceValue = null;
+
+ public AckFinal(OMFactory factory,String namespaceValue) throws SandeshaException {
+ if (!isNamespaceSupported(namespaceValue))
+ throw new SandeshaException ("Unsupported namespace");
+
+ this.defaultfactory = factory;
+ this.namespaceValue = namespaceValue;
+ }
+
+ public String getNamespaceValue(){
+ return namespaceValue;
+ }
+
+ public Object fromOMElement(OMElement element) throws OMException {
+
+ OMFactory factory = element.getOMFactory();
+ if (factory==null)
+ factory = defaultfactory;
+
+ OMElement finalPart = element.getFirstChildWithName(new QName(
+ namespaceValue, Sandesha2Constants.WSRM_COMMON.FINAL));
+ if (finalPart == null)
+ throw new OMException("The passed element does not contain a 'Final' part");
+
+ return this;
+ }
+
+ public OMElement toOMElement(OMElement sequenceAckElement) throws OMException {
+ //soapheaderblock element will be given
+
+ OMFactory factory = sequenceAckElement.getOMFactory();
+ if (factory==null)
+ factory = defaultfactory;
+
+ OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+ OMElement finalElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.FINAL, rmNamespace);
+ sequenceAckElement.addChild(finalElement);
+
+ return sequenceAckElement;
+ }
+
+
+ //this element is only supported in 2005_05_10 spec.
+ public boolean isNamespaceSupported (String namespaceName) {
+ if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
+ return false;
+
+ if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(namespaceName))
+ return true;
+
+ return false;
+ }
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckNone.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckNone.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckNone.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckNone.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,88 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.wsrm;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ *
+ * This represent the wsrm:none element that may be present withing a sequence acknowledgement.
+ */
+public class AckNone implements IOMRMElement {
+
+ private OMNamespace rmNamespace = null;
+
+ private OMFactory defaultFactory;
+
+ private String namespaceValue = null;
+
+ public AckNone(OMFactory factory,String namespaceValue) throws SandeshaException {
+ if (!isNamespaceSupported(namespaceValue))
+ throw new SandeshaException ("Unsupported namespace");
+
+ this.defaultFactory = factory;
+ this.namespaceValue = namespaceValue;
+ }
+
+ public String getNamespaceValue(){
+ return namespaceValue;
+ }
+
+ public Object fromOMElement(OMElement element) throws OMException {
+
+ OMElement nonePart = element.getFirstChildWithName(new QName(
+ namespaceValue, Sandesha2Constants.WSRM_COMMON.NONE));
+ if (nonePart == null)
+ throw new OMException("The passed element does not contain a 'None' part");
+
+ return this;
+ }
+
+ public OMElement toOMElement(OMElement sequenceAckElement) throws OMException {
+
+ OMFactory factory = sequenceAckElement.getOMFactory();
+ if (factory==null)
+ factory = defaultFactory;
+
+ OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+ OMElement noneElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.NONE, rmNamespace);
+
+ sequenceAckElement.addChild(noneElement);
+ return sequenceAckElement;
+ }
+
+ //this element is only supported in 2005_05_10 spec.
+ public boolean isNamespaceSupported (String namespaceName) {
+ if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
+ return false;
+
+ if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(namespaceName))
+ return true;
+
+ return false;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org