You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/03/17 06:02:01 UTC
svn commit: r386537 [2/2] - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: ./ client/
client/reports/ handlers/ msgprocessors/ storage/beanmanagers/
storage/inmemory/ util/ workers/
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Thu Mar 16 21:01:58 2006
@@ -24,13 +24,18 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.AbstractContext;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.AxisOperationFactory;
import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportSender;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.AcknowledgementManager;
@@ -38,12 +43,15 @@
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.SpecSpecificConstants;
+import org.apache.sandesha2.client.Sandesha2ClientAPI;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
import org.apache.sandesha2.storage.beans.InvokerBean;
import org.apache.sandesha2.storage.beans.NextMsgBean;
import org.apache.sandesha2.storage.beans.SenderBean;
@@ -58,11 +66,17 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.AckRequested;
+import org.apache.sandesha2.wsrm.CreateSequence;
+import org.apache.sandesha2.wsrm.Identifier;
import org.apache.sandesha2.wsrm.LastMessage;
+import org.apache.sandesha2.wsrm.MessageNumber;
import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.SequenceOffer;
+import org.apache.ws.commons.soap.SOAPBody;
import org.apache.ws.commons.soap.SOAPEnvelope;
import org.apache.ws.commons.soap.SOAPFactory;
+import org.apache.wsdl.WSDLConstants;
/**
* Responsible for processing an incoming Application message.
@@ -76,14 +90,14 @@
private Log log = LogFactory.getLog(getClass());
- public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+ public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
//Processing for ack if any
SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) rmMsgCtx
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
if (sequenceAck != null) {
AcknowledgementProcessor ackProcessor = new AcknowledgementProcessor();
- ackProcessor.processMessage(rmMsgCtx);
+ ackProcessor.processInMessage(rmMsgCtx);
}
//TODO process embedded ack requests
@@ -204,14 +218,49 @@
throw new SandeshaException(message);
}
- String messagesStr = (String) msgsBean.getValue();
+ String key = SandeshaUtil.getUUID(); //key to store the message.
+
+ //updating the Highest_In_Msg_No property which gives the highest message number retrieved from this sequence.
+ String highetsInMsgNoStr = SandeshaUtil.getSequenceProperty(sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER,configCtx);
+ String highetsInMsgKey = SandeshaUtil.getSequenceProperty(sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,configCtx);
+
+ long highestInMsgNo=0;
+ if (highetsInMsgNoStr!=null) {
+ highestInMsgNo = Long.parseLong(highetsInMsgNoStr);
+ }
+
+ if (msgNo>highestInMsgNo) {
+ highestInMsgNo = msgNo;
+
+ String str = new Long(msgNo).toString();
+ SequencePropertyBean highestMsgNoBean = new SequencePropertyBean (sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER,str);
+ SequencePropertyBean highestMsgKeyBean = new SequencePropertyBean (sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,key);
+
+ if (highetsInMsgNoStr!=null) {
+ seqPropMgr.update(highestMsgNoBean);
+ seqPropMgr.update(highestMsgKeyBean);
+ }else{
+ seqPropMgr.insert(highestMsgNoBean);
+ seqPropMgr.insert(highestMsgKeyBean);
+ }
+ }
+
+ String messagesStr = "";
+ if (msgsBean!=null)
+ messagesStr = (String) msgsBean.getValue();
+ else {
+ msgsBean = new SequencePropertyBean ();
+ msgsBean.setSequenceID(sequenceId);
+ msgsBean.setName(Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+ msgsBean.setValue(messagesStr);
+ }
+
if (msgNoPresentInList(messagesStr, msgNo)
&& (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
//this is a duplicate message and the invocation type is
// EXACTLY_ONCE.
rmMsgCtx.pause();
-
}
if (messagesStr != "" && messagesStr != null)
@@ -286,7 +335,6 @@
//saving the message.
try {
- String key = SandeshaUtil.getUUID();
storageManager.storeMessageContext(key,rmMsgCtx
.getMessageContext());
storageMapMgr.insert(new InvokerBean(key, msgNo, sequenceId));
@@ -380,4 +428,700 @@
throw new SandeshaException (message,e);
}
}
+
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
+ MessageContext msgContext = rmMsgCtx.getMessageContext();
+ ConfigurationContext configContext = msgContext .getConfigurationContext();
+
+ //retrieving the the storage manager
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ Parameter policyParam = msgContext.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
+ if (policyParam == null) {
+ SandeshaPropertyBean propertyBean = PropertyManager.getInstance().getPropertyBean();
+ Parameter parameter = new Parameter();
+ parameter.setName(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
+ parameter.setValue(propertyBean);
+
+ // TODO this should be addede to the AxisMessage
+ try {
+ if (msgContext.getAxisOperation() != null)
+ msgContext.getAxisOperation().addParameter(parameter);
+ else if (msgContext.getAxisService() != null)
+ msgContext.getAxisService().addParameter(parameter);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e);
+ }
+ }
+
+ Transaction outHandlerTransaction = storageManager.getTransaction();
+ boolean serverSide = msgContext.isServerSide();
+
+ // setting message Id if null
+ if (msgContext.getMessageID() == null)
+ msgContext.setMessageID(SandeshaUtil.getUUID());
+
+ // find internal sequence id
+ String internalSequenceId = null;
+
+ String storageKey = SandeshaUtil.getUUID(); //the key which will be used to store this message.
+
+ /* Internal sequence id is the one used to refer to the sequence (since
+ actual sequence id is not available when first msg arrives)
+ server side - a derivation of the sequenceId of the incoming sequence
+ client side - a derivation of wsaTo & SeequenceKey */
+
+ boolean lastMessage = false;
+ if (serverSide) {
+ // getting the request message and rmMessage.
+ MessageContext reqMsgCtx;
+ try {
+ reqMsgCtx = msgContext.getOperationContext().getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e);
+ }
+
+ RMMsgContext requestRMMsgCtx = MsgInitializer.initializeMessage(reqMsgCtx);
+
+ Sequence reqSequence = (Sequence) requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if (reqSequence == null) {
+ String message = "Sequence part is null";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String incomingSeqId = reqSequence.getIdentifier().getIdentifier();
+ if (incomingSeqId == null || incomingSeqId == "") {
+ String message = "Invalid seqence Id";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ long requestMsgNo = reqSequence.getMessageNumber().getMessageNumber();
+
+ internalSequenceId = SandeshaUtil.getInternalSequenceID(incomingSeqId);
+
+ //deciding weather the last message.
+ String requestLastMsgNoStr = SandeshaUtil.getSequenceProperty(incomingSeqId,Sandesha2Constants.SequenceProperties.LAST_IN_MESSAGE_NO,configContext);
+ if (requestLastMsgNoStr!=null) {
+ long requestLastMsgNo = Long.parseLong(requestLastMsgNoStr);
+ if (requestLastMsgNo==requestMsgNo)
+ lastMessage = true;
+ }
+
+ } else {
+ // set the internal sequence id for the client side.
+ EndpointReference toEPR = msgContext.getTo();
+ if (toEPR == null || toEPR.getAddress() == null || "".equals(toEPR.getAddress())) {
+ String message = "TO End Point Reference is not set correctly. This is a must for the sandesha client side.";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String to = toEPR.getAddress();
+ String sequenceKey = (String) msgContext.getProperty(Sandesha2ClientAPI.SEQUENCE_KEY);
+ internalSequenceId = SandeshaUtil.getInternalSequenceID(to,sequenceKey);
+ }
+
+ /* checking weather the user has given the messageNumber (most of the cases this will not be the case where
+ the system will generate the message numbers */
+
+ //User should set it as a long object.
+ Long messageNumberLng = (Long) msgContext.getProperty(Sandesha2ClientAPI.MESSAGE_NUMBER);
+
+ long givenMessageNumber = -1;
+ if (messageNumberLng!=null) {
+ givenMessageNumber = messageNumberLng.longValue();
+ if (givenMessageNumber<=0) {
+ throw new SandeshaException ("The givem message number value is invalid (has to be larger than zero)");
+ }
+ }
+
+ //the message number that was last used.
+ long systemMessageNumber = getPreviousMsgNo(configContext, internalSequenceId);
+
+ //The number given by the user has to be larger than the last stored number.
+ if (givenMessageNumber>0 && givenMessageNumber<=systemMessageNumber) {
+ String message = "The given message number is not larger than value of the last sent message.";
+ throw new SandeshaException (message);
+ }
+
+ //Finding the correct message number.
+ long messageNumber = -1;
+ if (givenMessageNumber>0) // if given message number is valid use it. (this is larger than the last stored due to the last check)
+ messageNumber = givenMessageNumber;
+ else if (systemMessageNumber>0) { //if system message number is valid use it.
+ messageNumber = systemMessageNumber+1;
+ } else { //This is the fist message (systemMessageNumber = -1)
+ messageNumber = 1;
+ }
+
+ //A dummy message is a one which will not be processed as a actual application message.
+ //The RM handlers will simply let these go.
+ String dummyMessageString = (String) msgContext.getOptions().getProperty(Sandesha2ClientAPI.DUMMY_MESSAGE);
+ boolean dummyMessage = false;
+ if (dummyMessageString!=null && Sandesha2ClientAPI.VALUE_TRUE.equals(dummyMessageString))
+ dummyMessage = true;
+
+ //saving the used message number
+ if (!dummyMessage)
+ setNextMsgNo(configContext,internalSequenceId,messageNumber);
+
+
+ //set this as the response highest message.
+ SequencePropertyBean responseHighestMsgBean = new SequencePropertyBean (
+ internalSequenceId,
+ Sandesha2Constants.SequenceProperties.HIGHEST_OUT_MSG_NUMBER,
+ new Long (messageNumber).toString()
+ );
+ seqPropMgr.insert(responseHighestMsgBean);
+
+ if (lastMessage) {
+
+ SequencePropertyBean responseHighestMsgKeyBean = new SequencePropertyBean (
+ internalSequenceId,
+ Sandesha2Constants.SequenceProperties.HIGHEST_OUT_MSG_KEY,
+ storageKey
+ );
+
+ SequencePropertyBean responseLastMsgKeyBean = new SequencePropertyBean (
+ internalSequenceId,
+ Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO,
+ new Long (messageNumber).toString()
+ );
+
+ seqPropMgr.insert(responseHighestMsgKeyBean);
+ seqPropMgr.insert(responseLastMsgKeyBean);
+ }
+
+ boolean sendCreateSequence = false;
+
+ SequencePropertyBean outSeqBean = seqPropMgr.retrieve(
+ internalSequenceId,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+
+ // setting async ack endpoint for the server side. (if present)
+ if (serverSide) {
+ String incomingSequenceID = SandeshaUtil
+ .getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
+ SequencePropertyBean incomingToBean = seqPropMgr.retrieve(
+ incomingSequenceID,
+ Sandesha2Constants.SequenceProperties.TO_EPR);
+ if (incomingToBean != null) {
+ String incomingTo = incomingToBean.getValue();
+ msgContext.setProperty(Sandesha2ClientAPI.AcksTo, incomingTo);
+ }
+ }
+
+
+ //FINDING THE SPEC VERSION
+ String specVersion = null;
+ if (msgContext.isServerSide()) {
+ //in the server side, get the RM version from the request sequence.
+ MessageContext requestMessageContext;
+ try {
+ requestMessageContext = msgContext.getOperationContext().getMessageContext(AxisOperationFactory.MESSAGE_LABEL_IN_VALUE);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e);
+ }
+
+ if (requestMessageContext==null)
+ throw new SandeshaException ("Request message context is null, cant find out the request side sequenceID");
+
+ RMMsgContext requestRMMsgCtx = MsgInitializer.initializeMessage(requestMessageContext);
+ Sequence sequence = (Sequence) requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+
+ String requestSequenceID = sequence.getIdentifier().getIdentifier();
+ SequencePropertyBean specVersionBean = seqPropMgr.retrieve(requestSequenceID,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
+ if (specVersionBean==null)
+ throw new SandeshaException ("SpecVersion sequence property bean is not available for the incoming sequence. Cant find the RM version for outgoing side");
+
+ specVersion = specVersionBean.getValue();
+ } else {
+ //in the client side, user will set the RM version.
+ specVersion = (String) msgContext.getProperty(Sandesha2ClientAPI.RM_SPEC_VERSION);
+ }
+
+ if (specVersion==null)
+ specVersion = SpecSpecificConstants.getDefaultSpecVersion(); //TODO change the default to WSRX.
+
+ if (messageNumber == 1) {
+ if (outSeqBean == null) { // out sequence will be set for the server side, in the case of an offer.
+ sendCreateSequence = true; // message number being one and not having an out sequence, implies that a create sequence has to be send.
+ }
+
+ // if fist message - setup the sending side sequence - both for the server and the client sides
+ SequenceManager.setupNewClientSequence(msgContext, internalSequenceId,specVersion);
+ }
+
+ ServiceContext serviceContext = msgContext.getServiceContext();
+ OperationContext operationContext = msgContext.getOperationContext();
+
+ //SENDING THE CREATE SEQUENCE.
+ if (sendCreateSequence) {
+ SequencePropertyBean responseCreateSeqAdded = seqPropMgr
+ .retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.OUT_CREATE_SEQUENCE_SENT);
+
+ if (responseCreateSeqAdded == null) {
+ responseCreateSeqAdded = new SequencePropertyBean(
+ internalSequenceId,Sandesha2Constants.SequenceProperties.OUT_CREATE_SEQUENCE_SENT,"true");
+ seqPropMgr.insert(responseCreateSeqAdded);
+
+ String acksTo = null;
+ if (serviceContext != null)
+ acksTo = (String) msgContext.getProperty(Sandesha2ClientAPI.AcksTo);
+
+ if (msgContext.isServerSide()) {
+ // we do not set acksTo value to anonymous when the create
+ // sequence is send from the server.
+ MessageContext requestMessage;
+ try {
+ requestMessage = operationContext.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e);
+ }
+
+ if (requestMessage == null) {
+ String message = "Request message is not present";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+ acksTo = requestMessage.getTo().getAddress();
+
+ } else {
+ if (acksTo == null)
+ acksTo = Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
+ }
+
+ if (!Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo) && !serverSide) {
+ String transportIn = (String) configContext //TODO verify
+ .getProperty(MessageContext.TRANSPORT_IN);
+ if (transportIn == null)
+ transportIn = org.apache.axis2.Constants.TRANSPORT_HTTP;
+ } else if (acksTo == null && serverSide) {
+ String incomingSequencId = SandeshaUtil.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
+ SequencePropertyBean bean = seqPropMgr.retrieve(
+ incomingSequencId,Sandesha2Constants.SequenceProperties.REPLY_TO_EPR);
+ if (bean != null) {
+ EndpointReference acksToEPR = new EndpointReference(bean.getValue());
+ if (acksToEPR != null)
+ acksTo = (String) acksToEPR.getAddress();
+ }
+ } else if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS
+ .equals(acksTo)) {
+ // set transport in.
+ Object trIn = msgContext.getProperty(MessageContext.TRANSPORT_IN);
+ if (trIn == null) {
+ //TODO
+ }
+ }
+ addCreateSequenceMessage(rmMsgCtx, internalSequenceId, acksTo);
+ }
+ }
+
+ SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
+ if (env == null) {
+ SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(
+ SandeshaUtil.getSOAPVersion(env)).getDefaultEnvelope();
+ rmMsgCtx.setSOAPEnvelop(envelope);
+ }
+
+ SOAPBody soapBody = rmMsgCtx.getSOAPEnvelope().getBody();
+ if (soapBody == null) {
+ String message = "Invalid SOAP message. Body is not present";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String messageId1 = SandeshaUtil.getUUID();
+ if (rmMsgCtx.getMessageId() == null) {
+ rmMsgCtx.setMessageId(messageId1);
+ }
+
+
+ if (serverSide) {
+ // let the request end with 202 if a ack has not been
+ // written in the incoming thread.
+
+ MessageContext reqMsgCtx = null;
+ try {
+ reqMsgCtx = msgContext.getOperationContext().getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e);
+ }
+
+ if (reqMsgCtx.getProperty(Sandesha2Constants.ACK_WRITTEN) == null
+ || !"true".equals(reqMsgCtx.getProperty(Sandesha2Constants.ACK_WRITTEN)))
+ reqMsgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+ }
+
+ EndpointReference toEPR = msgContext.getTo();
+ if (toEPR == null) {
+ String message = "To EPR is not found";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ //setting default actions.
+ String to = toEPR.getAddress();
+ String operationName = msgContext.getOperationContext().getAxisOperation().getName().getLocalPart();
+ if (msgContext.getWSAAction() == null) {
+ msgContext.setWSAAction(to + "/" + operationName);
+ }
+ if (msgContext.getSoapAction() == null) {
+ msgContext.setSoapAction("\"" + to + "/" + operationName + "\"");
+ }
+
+ // processing the response if not an dummy.
+ if (!dummyMessage)
+ processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber,storageKey);
+
+ msgContext.pause(); // the execution will be stopped.
+ outHandlerTransaction.commit();
+
+ }
+
+ private void addCreateSequenceMessage(RMMsgContext applicationRMMsg,
+ String internalSequenceId, String acksTo) throws SandeshaException {
+
+ MessageContext applicationMsg = applicationRMMsg.getMessageContext();
+ ConfigurationContext configCtx = applicationMsg.getConfigurationContext();
+
+ //generating a new create sequeuce message.
+ RMMsgContext createSeqRMMessage = RMMsgCreator.createCreateSeqMsg(applicationRMMsg, internalSequenceId, acksTo);
+
+ createSeqRMMessage.setFlow(MessageContext.OUT_FLOW);
+ CreateSequence createSequencePart = (CreateSequence) createSeqRMMessage.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
+
+ //retrieving the storage manager.
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx);
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
+ SenderBeanMgr retransmitterMgr = storageManager.getRetransmitterBeanMgr();
+
+ SequenceOffer offer = createSequencePart.getSequenceOffer();
+ if (offer != null) {
+ String offeredSequenceId = offer.getIdentifer().getIdentifier();
+
+ SequencePropertyBean offeredSequenceBean = new SequencePropertyBean();
+ offeredSequenceBean.setName(Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE);
+ offeredSequenceBean.setSequenceID(internalSequenceId);
+ offeredSequenceBean.setValue(offeredSequenceId);
+
+ seqPropMgr.insert(offeredSequenceBean);
+ }
+
+ MessageContext createSeqMsg = createSeqRMMessage.getMessageContext();
+ createSeqMsg.setRelatesTo(null); // create seq msg does not relateTo anything
+
+ CreateSeqBean createSeqBean = new CreateSeqBean(internalSequenceId,createSeqMsg.getMessageID(), null);
+ createSeqMgr.insert(createSeqBean);
+
+ if (createSeqMsg.getReplyTo() == null)
+ createSeqMsg.setReplyTo(new EndpointReference(Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
+
+ String key = SandeshaUtil.getUUID(); //the key used to store the create sequence message.
+
+ SenderBean createSeqEntry = new SenderBean();
+ createSeqEntry.setMessageContextRefKey(key);
+ createSeqEntry.setTimeToSend(System.currentTimeMillis());
+ createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
+
+ // this will be set to true in the sender
+ createSeqEntry.setSend(true);
+
+ createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,Sandesha2Constants.VALUE_FALSE);
+ createSeqEntry.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
+ retransmitterMgr.insert(createSeqEntry);
+
+ storageManager.storeMessageContext(key,createSeqMsg); //storing the message.
+
+
+ // message will be stored in the Sandesha2TransportSender
+ createSeqMsg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, key);
+
+ TransportOutDescription transportOut = createSeqMsg.getTransportOut();
+
+ createSeqMsg.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
+ createSeqMsg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+ createSeqMsg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, key);
+
+ Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc ();
+ createSeqMsg.setTransportOut(sandesha2TransportOutDesc);
+
+ // sending the message once through Sandesha2TransportSender.
+ AxisEngine engine = new AxisEngine(createSeqMsg.getConfigurationContext());
+ try {
+ log.info ("Sending create seq msg...");
+ engine.send(createSeqMsg);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e.getMessage());
+ }
+ }
+
+ private void processResponseMessage(RMMsgContext rmMsg,
+ String internalSequenceId, long messageNumber, String storageKey) throws SandeshaException {
+
+ MessageContext msg = rmMsg.getMessageContext();
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(rmMsg.getSOAPEnvelope()));
+ ConfigurationContext configurationContext = rmMsg.getMessageContext().getConfigurationContext();
+
+ //retrieving storage manager
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(msg.getConfigurationContext());
+ SequencePropertyBeanMgr sequencePropertyMgr = storageManager.getSequencePropretyBeanMgr();
+ SenderBeanMgr retransmitterMgr = storageManager.getRetransmitterBeanMgr();
+
+ SequencePropertyBean toBean = sequencePropertyMgr.retrieve(
+ internalSequenceId,Sandesha2Constants.SequenceProperties.TO_EPR);
+ SequencePropertyBean replyToBean = sequencePropertyMgr.retrieve(
+ internalSequenceId,Sandesha2Constants.SequenceProperties.REPLY_TO_EPR);
+
+ // again - looks weird in the client side - but consistent
+ SequencePropertyBean outSequenceBean = sequencePropertyMgr.retrieve(
+ internalSequenceId,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+
+ if (toBean == null) {
+ String message = "To is null";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ EndpointReference toEPR = new EndpointReference(toBean.getValue());
+
+ EndpointReference replyToEPR = null;
+ if (replyToBean != null) {
+ replyToEPR = new EndpointReference(replyToBean.getValue());
+ }
+
+ if (toEPR == null || toEPR.getAddress() == null || toEPR.getAddress() == "") {
+ String message = "To Property has an invalid value";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String newToStr = null;
+ if (msg.isServerSide()) {
+ try {
+ MessageContext requestMsg = msg.getOperationContext().getMessageContext(
+ OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ if (requestMsg != null) {
+ newToStr = requestMsg.getReplyTo().getAddress();
+ }
+ } catch (AxisFault e) {
+ throw new SandeshaException(e.getMessage());
+ }
+ }
+
+ if (newToStr != null)
+ rmMsg.setTo(new EndpointReference(newToStr));
+ else
+ rmMsg.setTo(toEPR);
+
+ if (replyToEPR != null)
+ rmMsg.setReplyTo(replyToEPR);
+
+ String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,configurationContext);
+ if (rmVersion==null)
+ throw new SandeshaException ("Cant find the rmVersion of the given message");
+
+ String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
+
+ Sequence sequence = new Sequence(factory,rmNamespaceValue);
+
+ MessageNumber msgNumber = new MessageNumber(factory,rmNamespaceValue);
+ msgNumber.setMessageNumber(messageNumber);
+ sequence.setMessageNumber(msgNumber);
+
+ boolean lastMessage = false;
+ // setting last message
+ if (msg.isServerSide()) {
+ MessageContext requestMsg = null;
+
+ try {
+ requestMsg = msg.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ } catch (AxisFault e) {
+ throw new SandeshaException(e.getMessage());
+ }
+
+ RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsg);
+ Sequence requestSequence = (Sequence) reqRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if (requestSequence == null) {
+ String message = "Request Sequence is null";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ if (requestSequence.getLastMessage() != null) {
+ lastMessage = true;
+ sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
+
+// // saving the last message no.
+// SequencePropertyBean lastOutMsgBean = new SequencePropertyBean(
+// internalSequenceId,
+// Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE,
+// new Long(messageNumber).toString());
+// sequencePropertyMgr.insert(lastOutMsgBean);
+ }
+
+ } else {
+ // client side
+
+ OperationContext operationContext = msg.getOperationContext();
+ if (operationContext != null) {
+ Object obj = msg.getProperty(Sandesha2ClientAPI.LAST_MESSAGE);
+ if (obj != null && "true".equals(obj)) {
+ lastMessage = true;
+
+ SequencePropertyBean specVersionBean = sequencePropertyMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
+ if (specVersionBean==null)
+ throw new SandeshaException ("Spec version bean is not set");
+
+ String specVersion = specVersionBean.getValue();
+ if (SpecSpecificConstants.isLastMessageIndicatorRequired(specVersion))
+ sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
+
+// // saving the last message no.
+// SequencePropertyBean lastOutMsgBean = new SequencePropertyBean(
+// internalSequenceId,
+// Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE,
+// new Long(messageNumber).toString());
+// sequencePropertyMgr.insert(lastOutMsgBean);
+ }
+ }
+ }
+
+ AckRequested ackRequested = null;
+
+ boolean addAckRequested = false;
+ //if (!lastMessage)
+ addAckRequested = true; //TODO decide the policy to add the ackRequested tag
+
+ // setting the Sequnece id.
+ // Set send = true/false depending on the availability of the out
+ // sequence id.
+ String identifierStr = null;
+ if (outSequenceBean == null || outSequenceBean.getValue() == null) {
+ identifierStr = Sandesha2Constants.TEMP_SEQUENCE_ID;
+
+ } else {
+ identifierStr = (String) outSequenceBean.getValue();
+ }
+
+ Identifier id1 = new Identifier(factory,rmNamespaceValue);
+ id1.setIndentifer(identifierStr);
+ sequence.setIdentifier(id1);
+ rmMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE,sequence);
+
+ if (addAckRequested) {
+ ackRequested = new AckRequested(factory,rmNamespaceValue);
+ Identifier id2 = new Identifier(factory,rmNamespaceValue);
+ id2.setIndentifer(identifierStr);
+ ackRequested.setIdentifier(id2);
+ rmMsg.setMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST,
+ ackRequested);
+ }
+
+ try {
+ rmMsg.addSOAPEnvelope();
+ } catch (AxisFault e1) {
+ throw new SandeshaException(e1.getMessage());
+ }
+
+ //Retransmitter bean entry for the application message
+ SenderBean appMsgEntry = new SenderBean();
+
+ appMsgEntry.setMessageContextRefKey(storageKey);
+
+ appMsgEntry.setTimeToSend(System.currentTimeMillis());
+ appMsgEntry.setMessageID(rmMsg.getMessageId());
+ appMsgEntry.setMessageNumber(messageNumber);
+ appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+ if (outSequenceBean == null || outSequenceBean.getValue() == null) {
+ appMsgEntry.setSend(false);
+ } else {
+ appMsgEntry.setSend(true);
+ // Send will be set to true at the sender.
+ msg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,
+ Sandesha2Constants.VALUE_TRUE);
+ }
+
+ appMsgEntry.setInternalSequenceID(internalSequenceId);
+ storageManager.storeMessageContext(storageKey,msg);
+ retransmitterMgr.insert(appMsgEntry);
+ msg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,Sandesha2Constants.VALUE_FALSE);
+
+ // changing the sender. This will set send to true.
+ TransportSender sender = msg.getTransportOut().getSender();
+
+ if (sender != null) {
+ Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc ();
+ msg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, storageKey);
+ msg.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,
+ msg.getTransportOut());
+ msg.setTransportOut(sandesha2TransportOutDesc);
+
+ }
+
+ //increasing the current handler index, so that the message will not be going throught the SandeshaOutHandler again.
+ msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex()+1);
+
+ //sending the message through, other handlers and the Sandesha2TransportSender so that it get dumped to the storage.
+ AxisEngine engine = new AxisEngine (msg.getConfigurationContext());
+ try {
+ engine.resumeSend(msg);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e);
+ }
+ }
+
+ private long getPreviousMsgNo(ConfigurationContext context,
+ String internalSequenceId) throws SandeshaException {
+
+ //retrieving the storage managers
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean nextMsgNoBean = seqPropMgr.retrieve(
+ internalSequenceId,Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
+
+ long nextMsgNo = -1;
+ if (nextMsgNoBean != null) {
+ Long nextMsgNoLng = new Long(nextMsgNoBean.getValue());
+ nextMsgNo = nextMsgNoLng.longValue();
+ }
+
+ return nextMsgNo;
+ }
+
+ private void setNextMsgNo(ConfigurationContext context,
+ String internalSequenceId, long msgNo) throws SandeshaException {
+
+ if (msgNo<=0) {
+ String message = "Message number '" + msgNo + "' is invalid. Has to be larger than zero.";
+ throw new SandeshaException (message);
+ }
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean nextMsgNoBean = seqPropMgr.retrieve(
+ internalSequenceId,Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
+
+ boolean update = true;
+ if (nextMsgNoBean == null) {
+ update = false;
+ nextMsgNoBean = new SequencePropertyBean();
+ nextMsgNoBean.setSequenceID(internalSequenceId);
+ nextMsgNoBean.setName(Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
+ }
+
+ nextMsgNoBean.setValue(new Long(msgNo).toString());
+ if (update)
+ seqPropMgr.update(nextMsgNoBean);
+ else
+ seqPropMgr.insert(nextMsgNoBean);
+
+ }
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Thu Mar 16 21:01:58 2006
@@ -26,7 +26,7 @@
public class CloseSequenceProcessor implements MsgProcessor {
- public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+ public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
CloseSequence closeSequence = (CloseSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
@@ -114,7 +114,9 @@
closeSequenceTransaction.commit();
}
-
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
+ }
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Thu Mar 16 21:01:58 2006
@@ -17,6 +17,9 @@
package org.apache.sandesha2.msgprocessors;
+import java.util.ArrayList;
+import java.util.Collection;
+
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
@@ -53,12 +56,11 @@
private Log log = LogFactory.getLog(getClass());
- public void processMessage(RMMsgContext createSeqRMMsg)
+ public void processInMessage(RMMsgContext createSeqRMMsg)
throws SandeshaException {
MessageContext createSeqMsg = createSeqRMMsg.getMessageContext();
- CreateSequence createSeqPart = (CreateSequence) createSeqRMMsg
- .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
+ CreateSequence createSeqPart = (CreateSequence) createSeqRMMsg.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
if (createSeqPart == null) {
String message = "No create sequence part is present in the create sequence message";
log.debug(message);
@@ -80,91 +82,77 @@
return;
}
+
MessageContext outMessage = null;
- outMessage = Utils.createOutMessageContext(createSeqMsg);
+ outMessage = Utils.createOutMessageContext(createSeqMsg); //createing a new response message.
- ConfigurationContext context = createSeqRMMsg.getMessageContext()
- .getConfigurationContext();
+ ConfigurationContext context = createSeqMsg.getConfigurationContext();
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(context);
-
- Transaction createSequenceTransaction = storageManager.getTransaction();
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ Transaction createSequenceTransaction = storageManager.getTransaction(); //begining of a new transaction
try {
- String newSequenceId = SequenceManager
- .setupNewSequence(createSeqRMMsg);
- if (newSequenceId == null)
- throw new AxisFault(
- "Internal error - Generated sequence id is null");
-
- RMMsgContext createSeqResponse = RMMsgCreator
- .createCreateSeqResponseMsg(createSeqRMMsg, outMessage,
- newSequenceId);
+ String newSequenceId = SequenceManager.setupNewSequence(createSeqRMMsg); //newly created sequnceID.
+
+ RMMsgContext createSeqResponse = RMMsgCreator.createCreateSeqResponseMsg(
+ createSeqRMMsg, outMessage,newSequenceId); // converting the blank out message in to a create
+ // sequence response.
createSeqResponse.setFlow(MessageContext.OUT_FLOW);
- createSeqResponse.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
- CreateSequenceResponse createSeqResPart = (CreateSequenceResponse) createSeqResponse
- .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
-
- //If an offer is accepted do necessary procesing.
- Accept accept = createSeqResPart.getAccept();
- if (accept != null) {
- SequenceOffer offer = createSeqPart.getSequenceOffer();
- if (offer == null) {
- String message = "Internal error - no offer for the response message with Accept";
+ createSeqResponse.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true"); //for making sure that this wont be processed again.
+ CreateSequenceResponse createSeqResPart = (CreateSequenceResponse) createSeqResponse.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
+
+
+ //OFFER PROCESSING
+ SequenceOffer offer = createSeqPart.getSequenceOffer();
+ if (offer != null) {
+ Accept accept = createSeqResPart.getAccept();
+ if (accept == null) {
+ String message = "An accept part has not been generated for the create seq request with an offer part";
log.debug(message);
throw new SandeshaException(message);
}
- //Setting the CreateSequence table entry.
- String incomingSeqId = createSeqResPart.getIdentifier()
- .getIdentifier();
- String outSequenceId = offer.getIdentifer().getIdentifier();
- CreateSeqBean createSeqBean = new CreateSeqBean();
- createSeqBean.setSequenceID(outSequenceId);
- createSeqBean.setInternalSequenceID(newSequenceId);
- createSeqBean.setCreateSeqMsgID(SandeshaUtil.getUUID()); //this is a dummy value.
+ String offeredSequenceID = offer.getIdentifer().getIdentifier(); //offered seq. id.
+
+ boolean offerEcepted = offerAccepted (offeredSequenceID,context);
- CreateSeqBeanMgr createSeqMgr = storageManager
- .getCreateSeqBeanMgr();
+ if (offerEcepted) {
+ //Setting the CreateSequence table entry for the outgoing side.
+ CreateSeqBean createSeqBean = new CreateSeqBean();
+ createSeqBean.setSequenceID(offeredSequenceID);
+ String outgoingSideInternalSequenceID = SandeshaUtil.getInternalSequenceID(newSequenceId);
+ createSeqBean.setInternalSequenceID(outgoingSideInternalSequenceID);
+ createSeqBean.setCreateSeqMsgID(SandeshaUtil.getUUID()); //this is a dummy value.
+
+ CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
+ createSeqMgr.insert(createSeqBean);
+
+ //Setting sequence properties for the outgoing sequence.
+ //Only will be used by the server side response path. Will be wasted properties for the client side.
+
+ //setting the out_sequence_id
+ SequencePropertyBean outSequenceBean = new SequencePropertyBean();
+ outSequenceBean.setName(Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+ outSequenceBean.setValue(offeredSequenceID);
+ outSequenceBean.setSequenceID(outgoingSideInternalSequenceID);
+ seqPropMgr.insert(outSequenceBean);
- //Setting sequence properties.
- SequencePropertyBeanMgr seqPropMgr = storageManager
- .getSequencePropretyBeanMgr();
- SequencePropertyBean outSequenceBean = new SequencePropertyBean();
- outSequenceBean
- .setName(Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
- outSequenceBean.setValue(outSequenceId);
- outSequenceBean.setSequenceID(newSequenceId);
- seqPropMgr.insert(outSequenceBean);
-
- //Temp sequence id should be set for the server side.
- //If internal sequence id is not set. this implies server side.
- SequencePropertyBean internalSeqBean = seqPropMgr.retrieve(
- outSequenceId,
- Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
- if (internalSeqBean == null) {
+ //setting the internal_sequence_id
SequencePropertyBean internalSequenceBean = new SequencePropertyBean();
- internalSequenceBean
- .setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
- internalSequenceBean.setSequenceID(outSequenceId);
- internalSequenceBean.setValue(newSequenceId);
+ internalSequenceBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+ internalSequenceBean.setSequenceID(offeredSequenceID);
+ internalSequenceBean.setValue(outgoingSideInternalSequenceID);
seqPropMgr.insert(internalSequenceBean);
+ } else {
+ //removing the accept part.
+ createSeqResPart.setAccept(null);
+ createSeqResponse.addSOAPEnvelope();
}
-
}
- CreateSequence createSeq = (CreateSequence) createSeqRMMsg
- .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
- if (createSeq == null) {
- String message = "Create sequence part not present in the create sequence message";
- log.debug(message);
- throw new AxisFault(message);
- }
-
- EndpointReference acksTo = createSeq.getAcksTo().getAddress()
- .getEpr();
+ EndpointReference acksTo = createSeqPart.getAcksTo().getAddress().getEpr();
if (acksTo == null || acksTo.getAddress() == null
|| acksTo.getAddress() == "") {
String message = "Acks to not present in the create sequence message";
@@ -172,17 +160,12 @@
throw new AxisFault(message);
}
- SequencePropertyBean seqPropBean = new SequencePropertyBean(
- newSequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,
- acksTo.getAddress());
-
- SequencePropertyBeanMgr seqPropMgr = storageManager
- .getSequencePropretyBeanMgr();
- seqPropMgr.insert(seqPropBean);
- outMessage.setResponseWritten(true);
+ SequencePropertyBean acksToBean = new SequencePropertyBean(
+ newSequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,acksTo.getAddress());
- Object obj1 = createSeqMsg.getOperationContext().getProperty(
- org.apache.axis2.Constants.RESPONSE_WRITTEN);
+ seqPropMgr.insert(acksToBean);
+
+ outMessage.setResponseWritten(true);
//commiting tr. before sending the response msg.
createSequenceTransaction.commit();
@@ -193,12 +176,8 @@
AxisEngine engine = new AxisEngine(context);
engine.send(outMessage);
-
- Object obj = createSeqMsg.getOperationContext().getProperty(
- org.apache.axis2.Constants.RESPONSE_WRITTEN);
SequencePropertyBean toBean = seqPropMgr.retrieve(newSequenceId,Sandesha2Constants.SequenceProperties.TO_EPR);
-
if (toBean==null) {
String message = "Internal Error: wsa:To value is not set";
log.debug(message);
@@ -207,20 +186,38 @@
EndpointReference toEPR = new EndpointReference (toBean.getValue());
- if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(toEPR
- .getAddress())) {
- createSeqMsg.getOperationContext().setProperty(
- org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+ if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(
+ toEPR.getAddress())) {
+ createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
} else {
- createSeqMsg.getOperationContext().setProperty(
- org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+ createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
}
+
} catch (AxisFault e1) {
throw new SandeshaException(e1);
}
createSeqRMMsg.pause();
+ }
+
+ private boolean offerAccepted (String sequenceID, ConfigurationContext configCtx) throws SandeshaException {
+ if ("".equals(sequenceID))
+ return false;
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx);
+ CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
+
+ CreateSeqBean createSeqFindBean = new CreateSeqBean ();
+ createSeqFindBean.setSequenceID(sequenceID);
+ Collection arr = createSeqMgr.find(createSeqFindBean);
+
+ if (arr.size()>0)
+ return false;
+
+ return true;
+ }
+
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
- //createSequenceTransaction.commit();
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Thu Mar 16 21:01:58 2006
@@ -62,12 +62,13 @@
private Log log = LogFactory.getLog(getClass());
- public void processMessage(RMMsgContext createSeqResponseRMMsgCtx)
+ public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx)
throws SandeshaException {
SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
.getSOAPVersion(createSeqResponseRMMsgCtx.getSOAPEnvelope()));
+ MessageContext createSeqResponseMsg = createSeqResponseRMMsgCtx.getMessageContext();
ConfigurationContext configCtx = createSeqResponseRMMsgCtx
.getMessageContext().getConfigurationContext();
StorageManager storageManager = SandeshaUtil
@@ -80,7 +81,7 @@
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
if (sequenceAck != null) {
AcknowledgementProcessor ackProcessor = new AcknowledgementProcessor();
- ackProcessor.processMessage(createSeqResponseRMMsgCtx);
+ ackProcessor.processInMessage(createSeqResponseRMMsgCtx);
}
ackProcessTransaction.commit();
@@ -169,7 +170,7 @@
//TODO this should be detected in the Fault manager.
if (offeredSequenceBean == null) {
- String message = "No offered sequence. But an accept was received";
+ String message = "No offered sequence entry. But an accept was received";
log.debug(message);
throw new SandeshaException(message);
}
@@ -191,6 +192,22 @@
NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
nextMsgMgr.insert(nextMsgBean);
+
+ String rmSpecVersion = createSeqResponseRMMsgCtx.getRMSpecVersion();
+
+ SequencePropertyBean specVersionBean = new SequencePropertyBean (
+ offeredSequenceId,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION,rmSpecVersion);
+ sequencePropMgr.insert(specVersionBean);
+
+ SequencePropertyBean receivedMsgBean = new SequencePropertyBean(
+ offeredSequenceId, Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES, "");
+ sequencePropMgr.insert(receivedMsgBean);
+
+ SequencePropertyBean msgsBean = new SequencePropertyBean();
+ msgsBean.setSequenceID(offeredSequenceId);
+ msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+ msgsBean.setValue("");
+ sequencePropMgr.insert(msgsBean);
}
offerProcessTransaction.commit();
@@ -272,5 +289,9 @@
"false");
createSeqResponseRMMsgCtx.pause();
+ }
+
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java Thu Mar 16 21:01:58 2006
@@ -28,5 +28,6 @@
*/
public interface MsgProcessor {
- public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException;
+ public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException;
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException;
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Thu Mar 16 21:01:58 2006
@@ -18,26 +18,40 @@
package org.apache.sandesha2.msgprocessors;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.util.Utils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.AcknowledgementManager;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.SpecSpecificConstants;
import org.apache.sandesha2.TerminateManager;
+import org.apache.sandesha2.client.Sandesha2ClientAPI;
+import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.CreateSequenceResponse;
+import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.sandesha2.wsrm.TerminateSequence;
@@ -51,7 +65,7 @@
private Log log = LogFactory.getLog(getClass());
- public void processMessage(RMMsgContext terminateSeqRMMsg)
+ public void processInMessage(RMMsgContext terminateSeqRMMsg)
throws SandeshaException {
MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
@@ -60,7 +74,7 @@
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
if (sequenceAck != null) {
AcknowledgementProcessor ackProcessor = new AcknowledgementProcessor();
- ackProcessor.processMessage(terminateSeqRMMsg);
+ ackProcessor.processInMessage(terminateSeqRMMsg);
}
//Processing the terminate message
@@ -112,6 +126,10 @@
addTerminateSequenceResponse (terminateSeqRMMsg);
+
+ setUpHighestMsgNumbers(context,storageManager,sequenceId,terminateSeqRMMsg);
+
+
terminateReceivedTransaction.commit();
Transaction terminateTransaction = storageManager.getTransaction();
@@ -123,6 +141,7 @@
sequencePropertyBeanMgr.insert(terminatedBean);
+
terminateTransaction.commit();
SandeshaUtil.stopSenderForTheSequence(sequenceId);
@@ -142,6 +161,74 @@
terminateSeqRMMsg.pause();
}
+
+ private void setUpHighestMsgNumbers (ConfigurationContext configCtx, StorageManager storageManager, String sequenceID, RMMsgContext terminateRMMsg) throws SandeshaException {
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ String highestImMsgNumberStr = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER,configCtx);
+ String highestImMsgKey = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,configCtx);
+
+ long highestInMsgNo = 0;
+ if (highestImMsgNumberStr!=null) {
+ if (highestImMsgKey==null)
+ throw new SandeshaException ("Key of the highest message number has not been stored");
+
+ highestInMsgNo = Long.parseLong(highestImMsgNumberStr);
+ }
+
+ //following will be valid only for the server side, since the obtained int. seq ID is only valid there.
+ String responseSideInternalSequenceID = SandeshaUtil.getInternalSequenceID(sequenceID);
+
+ long highestOutMsgNo = 0;
+ try {
+ boolean addResponseSideTerminate = false;
+ if (highestInMsgNo==0) {
+ addResponseSideTerminate=false;
+ } else {
+
+ //setting the last in message property
+ SequencePropertyBean lastInMsgBean = new SequencePropertyBean (
+ sequenceID,Sandesha2Constants.SequenceProperties.LAST_IN_MESSAGE_NO,highestImMsgNumberStr);
+ seqPropMgr.insert(lastInMsgBean);
+
+ MessageContext highestInMsg = storageManager.retrieveMessageContext(highestImMsgKey,configCtx);
+ MessageContext highestOutMessage = highestInMsg.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_OUT_VALUE);
+
+ if (highestOutMessage!=null) {
+ RMMsgContext highestOutRMMsg = MsgInitializer.initializeMessage(highestOutMessage);
+ Sequence seqPartOfOutMsg = (Sequence) highestOutRMMsg.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+
+ if (seqPartOfOutMsg!=null) {
+
+ //response message of the last in message can be considered as the last out message.
+ highestOutMsgNo = seqPartOfOutMsg.getMessageNumber().getMessageNumber();
+ SequencePropertyBean highestOutMsgBean = new SequencePropertyBean (
+ responseSideInternalSequenceID,
+ Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO,
+ new Long(highestOutMsgNo).toString() );
+
+ seqPropMgr.insert(highestOutMsgBean);
+ addResponseSideTerminate = true;
+ }
+ }
+ }
+
+ // If all the out message have been acked, add the outgoing terminate seq msg.
+ String outgoingSqunceID = SandeshaUtil.getSequenceProperty(responseSideInternalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID,configCtx);
+ if (addResponseSideTerminate && highestOutMsgNo>0
+ && responseSideInternalSequenceID!=null && outgoingSqunceID!=null ) {
+ boolean allAcked = SandeshaUtil.isAllMsgsAckedUpto (highestOutMsgNo, responseSideInternalSequenceID, configCtx);
+
+ if (allAcked)
+ TerminateManager.addTerminateSequenceMessage(terminateRMMsg, outgoingSqunceID,responseSideInternalSequenceID);
+ }
+ } catch (AxisFault e) {
+ throw new SandeshaException (e);
+ }
+
+ }
+
private void addTerminateSequenceResponse (RMMsgContext terminateSeqRMMsg) throws SandeshaException {
MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
@@ -164,6 +251,135 @@
} catch (AxisFault e) {
String message = "Could not send the terminate sequence response";
throw new SandeshaException (message,e);
+ }
+ }
+
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
+ MessageContext msgContext = rmMsgCtx.getMessageContext();
+ ConfigurationContext configurationContext = msgContext.getConfigurationContext();
+ Options options = msgContext.getOptions();
+
+ StorageManager storageManager = SandeshaUtil
+ .getSandeshaStorageManager(configurationContext);
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ String toAddress = rmMsgCtx.getTo().getAddress();
+ String sequenceKey = (String) options.getProperty(Sandesha2ClientAPI.SEQUENCE_KEY);
+ String internalSeqenceID = SandeshaUtil.getInternalSequenceID(toAddress,sequenceKey);
+
+ String outSequenceID = SandeshaUtil.getSequenceProperty(internalSeqenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID,configurationContext);
+ if (outSequenceID==null)
+ throw new SandeshaException ("SequenceID was not found. Cannot send the terminate message");
+
+ Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+
+ String terminated = SandeshaUtil.getSequenceProperty(outSequenceID,
+ Sandesha2Constants.SequenceProperties.TERMINATE_ADDED,configurationContext);
+
+ if (terminated != null
+ && "true".equals(terminated)) {
+ String message = "Terminate was added previously.";
+ log.info(message);
+ return;
+ }
+
+// RMMsgContext terminateRMMessage = RMMsgCreator
+// .createTerminateSequenceMessage(incomingAckRMMsg, outSequenceId,internalSequenceId);
+
+ TerminateSequence terminateSequencePart = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ terminateSequencePart.getIdentifier().setIndentifer(outSequenceID);
+
+ rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
+ msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+
+// String toAddress = SandeshaUtil.getSequenceProperty(internalSeqenceID,Sandesha2Constants.SequenceProperties.TO_EPR,configurationContext);
+
+// EndpointReference toEPR = new EndpointReference ( toBean.getValue());
+// if (toEPR == null) {
+// String message = "To EPR has an invalid value";
+// throw new SandeshaException(message);
+// }
+
+ rmMsgCtx.setTo(new EndpointReference(toAddress));
+
+
+// terminateRMMessage.setFrom(new EndpointReference(
+// Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
+// terminateRMMessage.setFaultTo(new EndpointReference(
+// Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
+
+ String rmVersion = SandeshaUtil.getRMVersion(internalSeqenceID,configurationContext);
+ if (rmVersion==null)
+ throw new SandeshaException ("Cant find the rmVersion of the given message");
+
+ rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+ rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+
+ //SequencePropertyBean transportToBean = seqPropMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+ String transportTo = SandeshaUtil.getSequenceProperty(internalSeqenceID,Sandesha2Constants.SequenceProperties.TRANSPORT_TO,configurationContext);
+ if (transportTo!=null) {
+ rmMsgCtx.setProperty(MessageContextConstants.TRANSPORT_URL,transportTo);
+ }
+
+ try {
+ rmMsgCtx.addSOAPEnvelope();
+ } catch (AxisFault e) {
+ throw new SandeshaException(e.getMessage());
+ }
+
+ String key = SandeshaUtil.getUUID();
+
+ SenderBean terminateBean = new SenderBean();
+ terminateBean.setMessageContextRefKey(key);
+
+
+ storageManager.storeMessageContext(key,msgContext);
+
+
+ //Set a retransmitter lastSentTime so that terminate will be send with
+ // some delay.
+ //Otherwise this get send before return of the current request (ack).
+ //TODO: refine the terminate delay.
+ terminateBean.setTimeToSend(System.currentTimeMillis()
+ + Sandesha2Constants.TERMINATE_DELAY);
+
+ terminateBean.setMessageID(msgContext.getMessageID());
+
+ //this will be set to true at the sender.
+ terminateBean.setSend(true);
+
+ msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+ Sandesha2Constants.VALUE_FALSE);
+
+ terminateBean.setReSend(false);
+
+ SenderBeanMgr retramsmitterMgr = storageManager
+ .getRetransmitterBeanMgr();
+
+ retramsmitterMgr.insert(terminateBean);
+
+ SequencePropertyBean terminateAdded = new SequencePropertyBean();
+ terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+ terminateAdded.setSequenceID(outSequenceID);
+ terminateAdded.setValue("true");
+
+
+ seqPropMgr.insert(terminateAdded);
+
+ //This should be dumped to the storage by the sender
+ TransportOutDescription transportOut = msgContext.getTransportOut();
+ rmMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
+ rmMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+ rmMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+ rmMsgCtx.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
+ addTerminateSeqTransaction.commit();
+
+ AxisEngine engine = new AxisEngine (configurationContext);
+ try {
+ engine.send(msgContext);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e.getMessage());
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Thu Mar 16 21:01:58 2006
@@ -31,10 +31,14 @@
private Log log = LogFactory.getLog(getClass());
- public void processMessage(RMMsgContext createSeqRMMsg)
+ public void processInMessage(RMMsgContext createSeqRMMsg)
throws SandeshaException {
//TODO add processing logic
+ }
+
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java Thu Mar 16 21:01:58 2006
@@ -45,6 +45,8 @@
public SequencePropertyBean findUnique (SequencePropertyBean bean) throws SandeshaException;
public boolean update(SequencePropertyBean bean) throws SandeshaStorageException;
+
+ public boolean updateOrInsert(SequencePropertyBean bean) throws SandeshaStorageException;
public Collection retrieveAll ();
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java Thu Mar 16 21:01:58 2006
@@ -65,7 +65,7 @@
}
public synchronized boolean update(CreateSeqBean bean) {
- if (!table.contains(bean))
+ if (table.get(bean.getCreateSeqMsgID())==null)
return false;
return table.put(bean.getCreateSeqMsgID(), bean) != null;
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java Thu Mar 16 21:01:58 2006
@@ -96,7 +96,7 @@
}
public synchronized boolean update(InvokerBean bean) {
- if (!table.contains(bean))
+ if (table.get(bean.getMessageContextRefKey())==null)
return false;
return table.put(bean.getMessageContextRefKey(), bean) != null;
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java Thu Mar 16 21:01:58 2006
@@ -98,7 +98,7 @@
}
public synchronized boolean update(NextMsgBean bean) {
- if (!table.contains(bean))
+ if (table.get(bean.getSequenceID())==null)
return false;
return table.put(bean.getSequenceID(), bean) != null;
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Thu Mar 16 21:01:58 2006
@@ -167,7 +167,7 @@
}
public synchronized boolean update(SenderBean bean) {
- if (!table.contains(bean))
+ if (table.get(bean.getMessageID())==null)
return false;
return true; //No need to update. Being a reference does the job.
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java Thu Mar 16 21:01:58 2006
@@ -105,8 +105,17 @@
public synchronized boolean update(SequencePropertyBean bean) {
- if (!table.contains(bean))
+ if (table.get(getId(bean))==null)
return false;
+
+ return table.put(getId(bean), bean) != null;
+
+ }
+
+ public synchronized boolean updateOrInsert(SequencePropertyBean bean) {
+
+ if (table.get(getId(bean))==null)
+ table.put(getId(bean), bean);
return table.put(getId(bean), bean) != null;
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java Thu Mar 16 21:01:58 2006
@@ -98,39 +98,43 @@
boolean refuseSequence = false;
String reason = "";
- SequenceOffer offer = createSequence.getSequenceOffer();
- if (offer != null) {
-
- String offeredSequenceId = offer.getIdentifer().getIdentifier();
- if (offeredSequenceId == null || "".equals(offeredSequenceId)) {
- refuseSequence = true;
- reason = "Offered sequenceId is invalid";
- }
-
- if (!refuseSequence) {
- NextMsgBeanMgr nextMsgBeanMgr = storageManager
- .getNextMsgBeanMgr();
- Collection collection = nextMsgBeanMgr.retrieveAll();
- Iterator it = collection.iterator();
- while (it.hasNext()) {
-
- //checking weather an outgoing sequence with the given id exists.
- SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager
- .getSequencePropretyBeanMgr();
- SequencePropertyBean sequencePropertyBean = sequencePropertyBeanMgr
- .retrieve(offeredSequenceId,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
- if (sequencePropertyBean != null) {
- String outSequenceId = (String) sequencePropertyBean.getValue();
- if (outSequenceId != null && outSequenceId.equals(offeredSequenceId)) {
- refuseSequence = true;
- reason = "An sequence with offered sequenceId has been already established";
- }
-
- }
- }
- }
-
- }
+// SequenceOffer offer = createSequence.getSequenceOffer();
+// if (offer != null) {
+//
+// String offeredSequenceId = offer.getIdentifer().getIdentifier();
+// if (offeredSequenceId == null || "".equals(offeredSequenceId)) {
+// refuseSequence = true;
+// reason = "Offered sequenceId is invalid";
+// }
+//
+// if (!refuseSequence) {
+// NextMsgBeanMgr nextMsgBeanMgr = storageManager
+// .getNextMsgBeanMgr();
+// Collection collection = nextMsgBeanMgr.retrieveAll();
+// Iterator it = collection.iterator();
+// while (it.hasNext()) {
+//
+// NextMsgBean nextMsgBean = (NextMsgBean) it.next();
+// String tempSequenceID = nextMsgBean.getSequenceID();
+//
+// if (tempSequenceID.equals(offeredSequenceId)) {
+// refuseSequence = true;
+// reason = "An incoming sequence with offered sequenceId has been already established";
+// }
+//
+// }
+//
+// SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager
+// .getSequencePropretyBeanMgr();
+// SequencePropertyBean sequencePropertyBean = sequencePropertyBeanMgr
+// .retrieve(offeredSequenceId,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+// if (sequencePropertyBean != null) {
+// refuseSequence = true;
+// reason = "A outgoing sequence with offered sequenceId has been already established";
+// }
+// }
+//
+// }
if (refuseSequence) {
FaultData data = new FaultData();
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Thu Mar 16 21:01:58 2006
@@ -45,43 +45,28 @@
String storedKey = (String) retransmitterBean.getMessageContextRefKey();
if (storedKey == null)
- return retransmitterBean;
+ throw new SandeshaException ("Stored Key not present in the retransmittable message");
MessageContext messageContext = storageManager.retrieveMessageContext(storedKey,configContext);
- if (messageContext.getConfigurationContext() == null)
- return retransmitterBean;
-// RMPolicyBean policyBean = (RMPolicyBean) messageContext
-// .getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
-// if (policyBean == null) {
-// //loading default policies.
-// policyBean = PropertyManager.getInstance().getRMPolicyBean();
-// }
-
- Parameter parameter = messageContext.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
- if (parameter==null) {
- //log.error ("Cant adjust retransmission since, Parameter bean is not set");
- return retransmitterBean;
- }
-
- SandeshaPropertyBean propertyBean = (SandeshaPropertyBean) parameter.getValue();
+ SandeshaPropertyBean propertyBean = SandeshaUtil.getPropretyBean(messageContext);
//TODO make MaxRetransmissionCount a policy
- int maxRetransmissionCount = Sandesha2Constants.Properties.DefaultValues.RetransmissionCount;
- if (retransmitterBean.getSentCount()>maxRetransmissionCount) {
- log.debug("Stopping retransmission since maximum retransmission was exceeded");
- retransmitterBean.setSend(false);
-
- //TODO do reporting and cleaning since this sequence will not work correctly after this.
- }
+// int maxRetransmissionCount = Sandesha2Constants.Properties.DefaultValues.RetransmissionCount;
+// if (retransmitterBean.getSentCount()>maxRetransmissionCount) {
+// log.debug("Stopping retransmission since maximum retransmission was exceeded");
+// retransmitterBean.setSend(false);
+//
+// //TODO do reporting and cleaning since this sequence will not work correctly after this.
+// }
retransmitterBean.setSentCount(retransmitterBean.getSentCount() + 1);
adjustNextRetransmissionTime(retransmitterBean, propertyBean);
- if (retransmitterBean.getSentCount() >= Sandesha2Constants.MAXIMUM_RETRANSMISSION_ATTEMPTS)
- stopRetransmission(retransmitterBean);
+// if (retransmitterBean.getSentCount() >= Sandesha2Constants.MAXIMUM_RETRANSMISSION_ATTEMPTS)
+// stopRetransmission(retransmitterBean);
return retransmitterBean;
}
@@ -97,7 +82,7 @@
private SenderBean adjustNextRetransmissionTime(
SenderBean retransmitterBean, SandeshaPropertyBean propertyBean) {
- long lastSentTime = retransmitterBean.getTimeToSend();
+// long lastSentTime = retransmitterBean.getTimeToSend();
int count = retransmitterBean.getSentCount();
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Thu Mar 16 21:01:58 2006
@@ -40,6 +40,7 @@
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.description.Parameter;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.Handler;
import org.apache.ws.commons.om.OMElement;
@@ -245,34 +246,7 @@
invoker.stopInvokerForTheSequence (sequenceID);
}
- public static boolean verifySequenceCompletion(Iterator ackRangesIterator,
- long lastMessageNo) {
- HashMap startMap = new HashMap();
-
- while (ackRangesIterator.hasNext()) {
- AcknowledgementRange temp = (AcknowledgementRange) ackRangesIterator
- .next();
- startMap.put(new Long(temp.getLowerValue()), temp);
- }
-
- long start = 1;
- boolean loop = true;
- while (loop) {
- AcknowledgementRange temp = (AcknowledgementRange) startMap
- .get(new Long(start));
- if (temp == null) {
- loop = false;
- continue;
- }
-
- if (temp.getUpperValue() >= lastMessageNo)
- return true;
- start = temp.getUpperValue() + 1;
- }
-
- return false;
- }
/*public static SOAPEnvelope createSOAPMessage(MessageContext msgContext,
String soapNamespaceURI) throws AxisFault {
@@ -387,7 +361,7 @@
public static String getServerSideIncomingSeqIdFromInternalSeqId (
String internalSequenceId) throws SandeshaException {
- String startStr = Sandesha2Constants.SANDESHA2_INTERNAL_SEQUENCE_ID + ":";
+ String startStr = Sandesha2Constants.INTERNAL_SEQUENCE_PREFIX + ":";
if (!internalSequenceId.startsWith(startStr)){
throw new SandeshaException ("Invalid internal sequence ID");
}
@@ -396,11 +370,11 @@
return incomingSequenceId;
}
- public static String getServerSideInternalSeqIdFromIncomingSeqId(
- String incomingSequenceId) {
- String internalSequenceId = Sandesha2Constants.SANDESHA2_INTERNAL_SEQUENCE_ID + ":" + incomingSequenceId;
- return internalSequenceId;
- }
+// public static String getServerSideInternalSeqIdFromIncomingSeqId(
+// String incomingSequenceId) {
+// String internalSequenceId = Sandesha2Constants.SANDESHA2_INTERNAL_SEQUENCE_ID + ":" + incomingSequenceId;
+// return internalSequenceId;
+// }
/**
* Used to obtain the storage Manager Implementation.
@@ -654,6 +628,26 @@
return retArr;
}
+ public static ArrayList getArrayListFromMsgsString (String str) throws SandeshaException {
+
+ if (str==null || "".equals(str))
+ return new ArrayList ();
+
+ ArrayList retArr = new ArrayList ();
+
+ StringTokenizer tokenizer = new StringTokenizer (str,",");
+
+ while (tokenizer.hasMoreElements()) {
+ String nextToken = tokenizer.nextToken();
+ if (nextToken!=null && !"".equals(nextToken)) {
+ Long lng = new Long (nextToken);
+ retArr.add(lng);
+ }
+ }
+
+ return retArr;
+ }
+
public static String getInternalSequenceID (String to, String sequenceKey) {
if (to==null && sequenceKey==null)
return null;
@@ -808,23 +802,112 @@
}
-// public static boolean inNumberPresentInList (String list, long no) throws SandeshaException {
+// public static boolean isNumberPresentInList (String list, long no) throws SandeshaException {
+// StringTokenizer tokenizer = new StringTokenizer (list,Sandesha2Constants.LIST_SEPERATOR);
+// while (tokenizer.hasMoreElements()){
+// String listPart = tokenizer.nextToken();
+// if (!"".equals(listPart) && isNumberPresentInListPart(listPart,no))
+// return true;
+// }
//
+// return false;
// }
//
// public static String putNumberToList (String list, long no) throws SandeshaException {
+//
// StringTokenizer tokenizer = new StringTokenizer (list,Sandesha2Constants.LIST_SEPERATOR);
//
+// boolean present = false;
// while (tokenizer.hasMoreElements()){
-// String element = tokenizer.nextToken();
-// String[] items = element.split(Sandesha2Constants.LIST_ITEM_SEPERATOR);
-// if (items.length!=1 && items.length!=2)
-// throw new SandeshaException ("Invalid string array");
-//
-//
+// String listPart = tokenizer.nextToken();
+// if (!"".equals(listPart) && isNumberPresentInListPart(listPart,no))
+// present = true;
// }
//
+// list = list + Sandesha2Constants.LIST_SEPERATOR + new Long (no).toString();
+// sortListParts (list);
+// mergeListParts (list);
// }
+//
+// private static boolean isNumberPresentInListPart (String listPart, long no) throws SandeshaException {
+// if (listPart==null || "".equals(listPart))
+// throw new SandeshaException ("Invalid list part");
+//
+// int seperatorPosition = listPart.indexOf(Sandesha2Constants.LIST_PART_SEPERATOR);
+//
+// try {
+// if (seperatorPosition<0) {
+// //this must be a single number
+// long tempNo = new Long (listPart).longValue();
+// if (no==tempNo)
+// return true;
+//
+// return false;
+// } else {
+// String number1Str = listPart.substring(0,seperatorPosition);
+// String number2Str = listPart.substring((seperatorPosition+1),listPart.length());
+//
+// long number1 = new Long (number1Str).longValue();
+// long number2 = new Long (number2Str).longValue();
+//
+// if (number1>number2)
+// throw new SandeshaException ("list part have numbers in the wrong order");
+//
+// if (no>=number1 && no<=number2)
+// return true;
+//
+// return false;
+// }
+// } catch (NumberFormatException e) {
+// throw new SandeshaException ("Invalid list part",e);
+// }
+// }
+//
+// private String sortListParts (String list) {
+//
+// }
+//
+// private String mergeListParts (String list) {
+//
+// }
+//
+// private void updatePartOfTheList () {
+//
+// }
+
+ public static String getSequenceProperty (String id, String name, ConfigurationContext context) throws SandeshaException {
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
+ SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean sequencePropertyBean = sequencePropertyBeanMgr.retrieve(id,name);
+ if (sequencePropertyBean==null)
+ return null;
+ else
+ return sequencePropertyBean.getValue();
+ }
+
+ public static boolean isAllMsgsAckedUpto (long highestInMsgNo, String internalSequenceID, ConfigurationContext configCtx) throws SandeshaException {
+
+ String clientCompletedMessages = getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES,configCtx);
+ ArrayList ackedMsgsList = getArrayListFromString(clientCompletedMessages);
+
+ long smallestMsgNo = 1;
+ for (long tempMsgNo=smallestMsgNo;tempMsgNo<=highestInMsgNo;tempMsgNo++) {
+ if (!ackedMsgsList.contains(new Long(tempMsgNo).toString()))
+ return false;
+ }
+
+ return true; //all message upto the highest have been acked.
+ }
+
+ public static SandeshaPropertyBean getPropretyBean (MessageContext messageCtx) throws SandeshaException {
+ Parameter parameter = messageCtx.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
+ if (parameter==null)
+ throw new SandeshaException ("Property bean not set for the message");
+
+ SandeshaPropertyBean propertyBean = (SandeshaPropertyBean) parameter.getValue ();
+ return propertyBean;
+ }
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Thu Mar 16 21:01:58 2006
@@ -193,24 +193,22 @@
.getTransportOut();
TransportSender transportSender = transportOutDescription
.getSender();
+
+ boolean successfullySent = false;
if (transportSender != null) {
- transportSender.invoke(msgCtx);
+ try {
+ transportSender.invoke(msgCtx);
+ successfullySent = true;
+ } catch (AxisFault e) {
+ // TODO Auto-generated catch block
+ log.debug("Could not send message");
+ log.debug(e.getStackTrace().toString());
+ }
}
-
-
-
Transaction postSendTransaction = storageManager.getTransaction();
MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
-
- if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
- Sequence sequence = (Sequence) rmMsgCtx
- .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- long messageNo = sequence.getMessageNumber()
- .getMessageNumber();
- }
-
retransmitterAdjuster.adjustRetransmittion(bean, context);
// update or delete only if the object is still present.
@@ -227,8 +225,10 @@
postSendTransaction.commit(); // commiting the current transaction
- if (!msgCtx.isServerSide())
- checkForSyncResponses(msgCtx);
+ if (successfullySent) {
+ if (!msgCtx.isServerSide())
+ checkForSyncResponses(msgCtx);
+ }
Transaction terminateCleaningTransaction = storageManager
.getTransaction();
@@ -240,16 +240,7 @@
.getIdentifier();
ConfigurationContext configContext = msgCtx
.getConfigurationContext();
-
TerminateManager.terminateSendingSide(configContext,sequenceID, msgCtx.isServerSide());
-
- // removing a entry from the Listener
- String transport = msgCtx.getTransportOut().getName()
- .getLocalPart();
-
- // TODO complete below. Need a more eligent method which
- // finishes the current message before ending.
- // ListenerManager.stop(configContext,transport);
}
terminateCleaningTransaction.commit();
@@ -258,7 +249,8 @@
} catch (AxisFault e) {
String message = "An Exception was throws in sending";
- log.error(e.getMessage());
+ log.debug(message);
+ log.debug(e.getMessage());
// TODO : when this is the client side throw the exception to
// the client when necessary.
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org