You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2006/09/23 17:59:06 UTC
svn commit: r449267 [2/3] - in
/webservices/sandesha/trunk/java/src/org/apache/sandesha2: client/
msgprocessors/ util/
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=449267&r1=449266&r2=449267
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Sat Sep 23 08:59:05 2006
@@ -1,1181 +1,1192 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.sandesha2.msgprocessors;
-
-import java.util.ArrayList;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.soap.SOAPBody;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.AddressingConstants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.OperationContextFactory;
-import org.apache.axis2.context.ServiceContext;
-import org.apache.axis2.description.AxisOperationFactory;
-import org.apache.axis2.description.TransportOutDescription;
-import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.transport.TransportSender;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.client.SandeshaListener;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.security.SecurityManager;
-import org.apache.sandesha2.security.SecurityToken;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
-import org.apache.sandesha2.storage.beans.InvokerBean;
-import org.apache.sandesha2.storage.beans.NextMsgBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
-import org.apache.sandesha2.util.AcknowledgementManager;
-import org.apache.sandesha2.util.FaultManager;
-import org.apache.sandesha2.util.MsgInitializer;
-import org.apache.sandesha2.util.RMMsgCreator;
-import org.apache.sandesha2.util.SOAPAbstractFactory;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SequenceManager;
-import org.apache.sandesha2.util.SpecSpecificConstants;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CreateSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.SequenceOffer;
-import org.apache.sandesha2.wsrm.UsesSequenceSTR;
-
-/**
- * Responsible for processing an incoming Application message.
- */
-
-public class ApplicationMsgProcessor implements MsgProcessor {
-
- private static final Log log = LogFactory.getLog(ApplicationMsgProcessor.class);
-
- public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
- if (log.isDebugEnabled())
- log.debug("Enter: ApplicationMsgProcessor::processInMessage");
-
- // Processing the application message.
- MessageContext msgCtx = rmMsgCtx.getMessageContext();
- if (msgCtx == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgContextNotSetInbound);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
- && rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals("true")) {
- return;
- }
-
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(msgCtx.getConfigurationContext(),msgCtx.getConfigurationContext().getAxisConfiguration());
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
- Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- String sequenceId = sequence.getIdentifier().getIdentifier();
-
- String propertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-
- // Check that both the Sequence header and message body have been secured properly
- SequencePropertyBean tokenBean = seqPropMgr.retrieve(propertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
- if(tokenBean != null) {
- SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
- OMElement body = msgCtx.getEnvelope().getBody();
- SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
-
- //TODO get the element from the SOAP Envelope
-// secManager.checkProofOfPossession(token, sequence.getOMElement(), msgCtx);
-
- secManager.checkProofOfPossession(token, body, msgCtx);
- }
-
- //RM will not send sync responses. If sync acks are there this will be
- // made true again later.
- if (rmMsgCtx.getMessageContext().getOperationContext() != null) {
- rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN,
- Constants.VALUE_FALSE);
- }
-
- FaultManager faultManager = new FaultManager();
- RMMsgContext faultMessageContext = faultManager.checkForLastMsgNumberExceeded(rmMsgCtx, storageManager);
- if (faultMessageContext != null) {
- ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
- AxisEngine engine = new AxisEngine(configurationContext);
-
- try {
- engine.sendFault(faultMessageContext.getMessageContext());
- } catch (AxisFault e) {
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendFault, e
- .toString()));
- }
-
- msgCtx.pause();
- return;
- }
-
- // setting acked msg no range
- ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
- if (configCtx == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- faultMessageContext = faultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager);
- if (faultMessageContext != null) {
- ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
- AxisEngine engine = new AxisEngine(configurationContext);
-
- try {
- engine.send(faultMessageContext.getMessageContext());
- } catch (AxisFault e) {
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendFault, e
- .toString()));
- }
-
- msgCtx.pause();
- return;
- }
-
- // setting mustUnderstand to false.
- sequence.setMustUnderstand(false);
- rmMsgCtx.addSOAPEnvelope();
-
- // throwing a fault if the sequence is closed.
- faultMessageContext = faultManager.checkForSequenceClosed(rmMsgCtx, sequenceId, storageManager);
- if (faultMessageContext != null) {
- ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
- AxisEngine engine = new AxisEngine(configurationContext);
-
- try {
- engine.sendFault(faultMessageContext.getMessageContext());
- } catch (AxisFault e) {
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendFault, e
- .toString()));
- }
-
- return;
- }
-
- // updating the last activated time of the sequence.
- SequenceManager.updateLastActivatedTime(propertyKey, storageManager);
-
- SequencePropertyBean msgsBean = seqPropMgr.retrieve(propertyKey,
- Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
-
- long msgNo = sequence.getMessageNumber().getMessageNumber();
- if (msgNo == 0) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
- .toString(msgNo));
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String key = SandeshaUtil.getUUID(); // key to store the message.
-
- // updating the Highest_In_Msg_No property which gives the highest
- // message number retrieved from this sequence.
- String highetsInMsgNoStr = SandeshaUtil.getSequenceProperty(propertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, storageManager);
- String highetsInMsgKey = SandeshaUtil.getSequenceProperty(propertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, storageManager);
- if (highetsInMsgKey == null)
- highetsInMsgKey = SandeshaUtil.getUUID();
-
- long highestInMsgNo = 0;
- if (highetsInMsgNoStr != null) {
- highestInMsgNo = Long.parseLong(highetsInMsgNoStr);
- }
-
- if (msgNo > highestInMsgNo) {
- highestInMsgNo = msgNo;
-
- String str = new Long(msgNo).toString();
- SequencePropertyBean highestMsgNoBean = new SequencePropertyBean(propertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, str);
- SequencePropertyBean highestMsgKeyBean = new SequencePropertyBean(propertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, highetsInMsgKey);
-
- // storing the new message as the highest in message.
- storageManager.removeMessageContext(highetsInMsgKey);
- storageManager.storeMessageContext(highetsInMsgKey, msgCtx);
-
- if (highetsInMsgNoStr != null) {
- seqPropMgr.update(highestMsgNoBean);
- seqPropMgr.update(highestMsgKeyBean);
- } else {
- seqPropMgr.insert(highestMsgNoBean);
- seqPropMgr.insert(highestMsgKeyBean);
- }
- }
-
- String messagesStr = "";
- if (msgsBean != null)
- messagesStr = (String) msgsBean.getValue();
- else {
- msgsBean = new SequencePropertyBean();
- msgsBean.setSequencePropertyKey(propertyKey);
- msgsBean.setName(Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
- msgsBean.setValue(messagesStr);
- }
-
- if (msgNoPresentInList(messagesStr, msgNo)
- && (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
- // this is a duplicate message and the invocation type is
- // EXACTLY_ONCE.
- rmMsgCtx.pause();
- }
-
- if (messagesStr != "" && messagesStr != null)
- messagesStr = messagesStr + "," + Long.toString(msgNo);
- else
- messagesStr = Long.toString(msgNo);
-
- msgsBean.setValue(messagesStr);
- seqPropMgr.update(msgsBean);
-
- // Pause the messages bean if not the right message to invoke.
- NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
- NextMsgBean bean = mgr.retrieve(sequenceId);
-
- if (bean == null) {
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotFindSequence,
- sequenceId));
- }
-
- InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
-
- // inorder invocation is still a global property
- boolean inOrderInvocation = SandeshaUtil.getPropertyBean(
- msgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
-
-
- //setting properties for the messageContext
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new Long (msgNo));
-
- if (inOrderInvocation) {
-
- SequencePropertyBean incomingSequenceListBean = (SequencePropertyBean) seqPropMgr.retrieve(
- Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
- Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-
- if (incomingSequenceListBean == null) {
- ArrayList incomingSequenceList = new ArrayList();
- incomingSequenceListBean = new SequencePropertyBean();
- incomingSequenceListBean.setSequencePropertyKey(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
- incomingSequenceListBean.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
- incomingSequenceListBean.setValue(incomingSequenceList.toString());
-
- // this get inserted before
- seqPropMgr.insert(incomingSequenceListBean);
- }
-
- ArrayList incomingSequenceList = SandeshaUtil.getArrayListFromString(incomingSequenceListBean.getValue());
-
- // Adding current sequence to the incoming sequence List.
- if (!incomingSequenceList.contains(sequenceId)) {
- incomingSequenceList.add(sequenceId);
-
- // saving the property.
- incomingSequenceListBean.setValue(incomingSequenceList.toString());
- seqPropMgr.update(incomingSequenceListBean);
- }
-
- // saving the message.
- try {
- storageManager.storeMessageContext(key, rmMsgCtx.getMessageContext());
- storageMapMgr.insert(new InvokerBean(key, msgNo, sequenceId));
-
- // This will avoid performing application processing more
- // than
- // once.
- rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
- } catch (Exception ex) {
- throw new SandeshaException(ex.getMessage());
- }
-
- // pause the message
- rmMsgCtx.pause();
-
- // Starting the invoker if stopped.
- SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(), sequenceId);
-
- }
-
- // Sending acknowledgements
- sendAckIfNeeded(rmMsgCtx, messagesStr, storageManager);
-
- if (log.isDebugEnabled())
- log.debug("Exit: ApplicationMsgProcessor::processInMessage");
- }
-
- // TODO convert following from INT to LONG
- private boolean msgNoPresentInList(String list, long no) {
- String[] msgStrs = list.split(",");
-
- int l = msgStrs.length;
-
- for (int i = 0; i < l; i++) {
- if (msgStrs[i].equals(Long.toString(no)))
- return true;
- }
-
- return false;
- }
-
- public void sendAckIfNeeded(RMMsgContext rmMsgCtx, String messagesStr, StorageManager storageManager)
- throws AxisFault {
-
- if (log.isDebugEnabled())
- log.debug("Enter: ApplicationMsgProcessor::sendAckIfNeeded");
-
- String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-
- Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- String sequenceId = sequence.getIdentifier().getIdentifier();
- ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
- if (configCtx == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet));
-
- RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey ,sequenceId, storageManager);
-
- AxisEngine engine = new AxisEngine(configCtx);
-
- try {
- engine.send(ackRMMessage.getMessageContext());
- } catch (AxisFault e) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendAck, sequenceId, e
- .toString());
- throw new SandeshaException(message, e);
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: ApplicationMsgProcessor::sendAckIfNeeded");
- }
-
- public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
- if (log.isDebugEnabled())
- log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
-
- MessageContext msgContext = rmMsgCtx.getMessageContext();
- ConfigurationContext configContext = msgContext.getConfigurationContext();
-
- // setting the Fault callback
- SandeshaListener faultCallback = (SandeshaListener) msgContext.getOptions().getProperty(
- SandeshaClientConstants.SANDESHA_LISTENER);
- if (faultCallback != null) {
- OperationContext operationContext = msgContext.getOperationContext();
- if (operationContext != null) {
- operationContext.setProperty(SandeshaClientConstants.SANDESHA_LISTENER, faultCallback);
- }
- }
-
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext, configContext
- .getAxisConfiguration());
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-
- boolean serverSide = msgContext.isServerSide();
-
- // setting message Id if null
- if (msgContext.getMessageID() == null)
- msgContext.setMessageID(SandeshaUtil.getUUID());
-
- // find internal sequence id
- String internalSequenceId = null;
-
- String storageKey = SandeshaUtil.getUUID(); // the key which will be
- // used to store this
- // message.
-
- /*
- * Internal sequence id is the one used to refer to the sequence (since
- * actual sequence id is not available when first msg arrives) server
- * side - a derivation of the sequenceId of the incoming sequence client
- * side - a derivation of wsaTo & SeequenceKey
- */
-
- boolean lastMessage = false;
- if (serverSide) {
- // getting the request message and rmMessage.
- MessageContext reqMsgCtx;
- try {
- reqMsgCtx = msgContext.getOperationContext().getMessageContext(
- OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
- } catch (AxisFault e) {
- throw new SandeshaException(e);
- }
-
- RMMsgContext requestRMMsgCtx = MsgInitializer.initializeMessage(reqMsgCtx);
-
- Sequence reqSequence = (Sequence) requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- if (reqSequence == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String incomingSeqId = reqSequence.getIdentifier().getIdentifier();
- if (incomingSeqId == null || incomingSeqId == "") {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.incomingSequenceNotValidID, "''"
- + incomingSeqId + "''");
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- long requestMsgNo = reqSequence.getMessageNumber().getMessageNumber();
-
- internalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(incomingSeqId);
-
- // deciding weather the last message.
- String requestLastMsgNoStr = SandeshaUtil.getSequenceProperty(incomingSeqId,
- Sandesha2Constants.SequenceProperties.LAST_IN_MESSAGE_NO, storageManager);
- if (requestLastMsgNoStr != null) {
- long requestLastMsgNo = Long.parseLong(requestLastMsgNoStr);
- if (requestLastMsgNo == requestMsgNo)
- lastMessage = true;
- }
-
- } else {
- // set the internal sequence id for the client side.
- EndpointReference toEPR = msgContext.getTo();
- if (toEPR == null || toEPR.getAddress() == null || "".equals(toEPR.getAddress())) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String to = toEPR.getAddress();
- String sequenceKey = (String) msgContext.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- internalSequenceId = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
-
- String lastAppMessage = (String) msgContext.getProperty(SandeshaClientConstants.LAST_MESSAGE);
- if (lastAppMessage != null && "true".equals(lastAppMessage))
- lastMessage = true;
- }
-
- if (internalSequenceId!=null)
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
-
- String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-
- /*
- * checking weather the user has given the messageNumber (most of the
- * cases this will not be the case where the system will generate the
- * message numbers
- */
-
- // User should set it as a long object.
- Long messageNumberLng = (Long) msgContext.getProperty(SandeshaClientConstants.MESSAGE_NUMBER);
-
- long givenMessageNumber = -1;
- if (messageNumberLng != null) {
- givenMessageNumber = messageNumberLng.longValue();
- if (givenMessageNumber <= 0) {
- throw new SandeshaException(SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.msgNumberMustBeLargerThanZero, Long.toString(givenMessageNumber)));
- }
- }
-
- // the message number that was last used.
- long systemMessageNumber = getPreviousMsgNo(configContext, sequencePropertyKey, storageManager);
-
- // The number given by the user has to be larger than the last stored
- // number.
- if (givenMessageNumber > 0 && givenMessageNumber <= systemMessageNumber) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberNotLargerThanLastMsg, Long
- .toString(givenMessageNumber));
- throw new SandeshaException(message);
- }
-
- // Finding the correct message number.
- long messageNumber = -1;
- if (givenMessageNumber > 0) // if given message number is valid use it.
- // (this is larger than the last stored due
- // to the last check)
- messageNumber = givenMessageNumber;
- else if (systemMessageNumber > 0) { // if system message number is valid
- // use it.
- messageNumber = systemMessageNumber + 1;
- } else { // This is the first message (systemMessageNumber = -1)
- messageNumber = 1;
- }
-
- // A dummy message is a one which will not be processed as a actual
- // application message.
- // The RM handlers will simply let these go.
- String dummyMessageString = (String) msgContext.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
- boolean dummyMessage = false;
- if (dummyMessageString != null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
- dummyMessage = true;
-
- // saving the used message number
- if (!dummyMessage)
- setNextMsgNo(configContext, sequencePropertyKey, messageNumber, storageManager);
-
- // set this as the response highest message.
- SequencePropertyBean responseHighestMsgBean = new SequencePropertyBean(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_OUT_MSG_NUMBER, new Long(messageNumber).toString());
- seqPropMgr.insert(responseHighestMsgBean);
-
- if (lastMessage) {
-
- SequencePropertyBean responseHighestMsgKeyBean = new SequencePropertyBean(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_OUT_MSG_KEY, storageKey);
-
- SequencePropertyBean responseLastMsgKeyBean = new SequencePropertyBean(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO, new Long(messageNumber).toString());
-
- seqPropMgr.insert(responseHighestMsgKeyBean);
- seqPropMgr.insert(responseLastMsgKeyBean);
- }
-
- boolean sendCreateSequence = false;
-
- SequencePropertyBean outSeqBean = seqPropMgr.retrieve(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
-
- // setting async ack endpoint for the server side. (if present)
- if (serverSide) {
-// String incomingSequenceID = SandeshaUtil.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
- SequencePropertyBean incomingToBean = seqPropMgr.retrieve(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.TO_EPR);
- if (incomingToBean != null) {
- String incomingTo = incomingToBean.getValue();
- msgContext.setProperty(SandeshaClientConstants.AcksTo, incomingTo);
- }
- }
-
- // FINDING THE SPEC VERSION
- String specVersion = null;
- if (msgContext.isServerSide()) {
- // in the server side, get the RM version from the request sequence.
- MessageContext requestMessageContext;
- try {
- requestMessageContext = msgContext.getOperationContext().getMessageContext(
- AxisOperationFactory.MESSAGE_LABEL_IN_VALUE);
- } catch (AxisFault e) {
- throw new SandeshaException(e);
- }
-
- if (requestMessageContext == null) {
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.requestMsgContextNull));
- }
-
- RMMsgContext requestRMMsgCtx = MsgInitializer.initializeMessage(requestMessageContext);
-
- Sequence sequence = (Sequence) requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- String requestSequenceID = sequence.getIdentifier().getIdentifier();
-
- String requestSideSequencePropertyKey = SandeshaUtil.getSequencePropertyKey(requestRMMsgCtx);
-
- SequencePropertyBean specVersionBean = seqPropMgr.retrieve(requestSideSequencePropertyKey,
- Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
- if (specVersionBean == null) {
- throw new SandeshaException(SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.specVersionPropertyNotAvailable, requestSequenceID));
- }
-
- specVersion = specVersionBean.getValue();
- } else {
- // in the client side, user will set the RM version.
- specVersion = (String) msgContext.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
- }
-
- if (specVersion == null)
- specVersion = SpecSpecificConstants.getDefaultSpecVersion(); // TODO
- // change
- // the
- // default
- // to
- // v1_1.
-
- if (messageNumber == 1) {
- if (outSeqBean == null) { // out sequence will be set for the
- // server side, in the case of an offer.
- sendCreateSequence = true; // message number being one and not
- // having an out sequence, implies
- // that a create sequence has to be
- // send.
- }
-
- // if first message - setup the sending side sequence - both for the
- // server and the client sides
- SequenceManager.setupNewClientSequence(msgContext, sequencePropertyKey, specVersion, storageManager);
- }
-
- ServiceContext serviceContext = msgContext.getServiceContext();
- OperationContext operationContext = msgContext.getOperationContext();
-
- // SENDING THE CREATE SEQUENCE.
- if (sendCreateSequence) {
- SequencePropertyBean responseCreateSeqAdded = seqPropMgr.retrieve(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.OUT_CREATE_SEQUENCE_SENT);
-
- String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, storageManager);
- String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
-
- if (responseCreateSeqAdded == null) {
- responseCreateSeqAdded = new SequencePropertyBean(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.OUT_CREATE_SEQUENCE_SENT, "true");
- seqPropMgr.insert(responseCreateSeqAdded);
-
- String acksTo = null;
- if (serviceContext != null)
- acksTo = (String) msgContext.getProperty(SandeshaClientConstants.AcksTo);
-
- if (msgContext.isServerSide()) {
- // we do not set acksTo value to anonymous when the create
- // sequence is send from the server.
- MessageContext requestMessage;
- try {
- requestMessage = operationContext
- .getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
- } catch (AxisFault e) {
- throw new SandeshaException(e);
- }
-
- if (requestMessage == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.requestMsgNotPresent);
- log.debug(message);
- throw new SandeshaException(message);
- }
- acksTo = requestMessage.getTo().getAddress();
-
- } else {
- if (acksTo == null)
- acksTo = anonymousURI;
- }
-
- if (!anonymousURI.equals(acksTo) && !serverSide) {
- String transportIn = (String) configContext // TODO verify
- .getProperty(MessageContext.TRANSPORT_IN);
- if (transportIn == null)
- transportIn = org.apache.axis2.Constants.TRANSPORT_HTTP;
- } else if (acksTo == null && serverSide) {
-// String incomingSequencId = SandeshaUtil
-// .getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
-
- try {
- MessageContext requestMsgContext = operationContext.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
- RMMsgContext requestRMMsgContext = MsgInitializer.initializeMessage(requestMsgContext);
-
- String requestSideSequencePropertyKey = SandeshaUtil.getSequencePropertyKey(requestRMMsgContext);
-
- SequencePropertyBean bean = seqPropMgr.retrieve(requestSideSequencePropertyKey,
- Sandesha2Constants.SequenceProperties.REPLY_TO_EPR);
- if (bean != null) {
- EndpointReference acksToEPR = new EndpointReference(bean.getValue());
- if (acksToEPR != null)
- acksTo = (String) acksToEPR.getAddress();
- }
- } catch (AxisFault e) {
- throw new SandeshaException (e);
- }
- } else if (anonymousURI.equals(acksTo)) {
- // set transport in.
- Object trIn = msgContext.getProperty(MessageContext.TRANSPORT_IN);
- if (trIn == null) {
- // TODO
- }
- }
- addCreateSequenceMessage(rmMsgCtx, sequencePropertyKey ,internalSequenceId, acksTo, storageManager);
- }
- }
-
- SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
- if (env == null) {
- SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(env))
- .getDefaultEnvelope();
- rmMsgCtx.setSOAPEnvelop(envelope);
- }
-
- SOAPBody soapBody = rmMsgCtx.getSOAPEnvelope().getBody();
- if (soapBody == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.soapBodyNotPresent);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String messageId1 = SandeshaUtil.getUUID();
- if (rmMsgCtx.getMessageId() == null) {
- rmMsgCtx.setMessageId(messageId1);
- }
-
- if (serverSide) {
- // let the request end with 202 if a ack has not been
- // written in the incoming thread.
-
- MessageContext reqMsgCtx = null;
- try {
- reqMsgCtx = msgContext.getOperationContext().getMessageContext(
- OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
- } catch (AxisFault e) {
- throw new SandeshaException(e);
- }
-
- if (reqMsgCtx.getProperty(Sandesha2Constants.ACK_WRITTEN) == null
- || !"true".equals(reqMsgCtx.getProperty(Sandesha2Constants.ACK_WRITTEN)))
- reqMsgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
- }
-
- EndpointReference toEPR = msgContext.getTo();
- if (toEPR == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- // setting default actions.
- String to = toEPR.getAddress();
- String operationName = msgContext.getOperationContext().getAxisOperation().getName().getLocalPart();
- if (msgContext.getWSAAction() == null) {
- msgContext.setWSAAction(to + "/" + operationName);
- }
- if (msgContext.getSoapAction() == null) {
- msgContext.setSoapAction("\"" + to + "/" + operationName + "\"");
- }
-
- // processing the response if not an dummy.
- if (!dummyMessage)
- processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber, storageKey, storageManager);
-
-
- if (!isWSAAnonymous (to)) {
- //If message has a real to address or if it is in the polling-mode it shoud be send by the sender or should
- //be taken away by make connections, so pausing it.
-
- msgContext.pause(); // the execution will be stopped.
- }
-
- //If to address is wsa:anonymous it wont be possible to send the this message so, letting it go in the current thread.
- //(it might get the the other end in the back-channel of the request message, no retransmissions possible).
-
- if (log.isDebugEnabled())
- log.debug("Exit: ApplicationMsgProcessor::processOutMessage");
- }
-
- private void addCreateSequenceMessage(RMMsgContext applicationRMMsg, String sequencePropertyKey, String internalSequenceId, String acksTo,
- StorageManager storageManager) throws AxisFault {
-
- if (log.isDebugEnabled())
- log.debug("Enter: ApplicationMsgProcessor::addCreateSequenceMessage, " + internalSequenceId);
-
- MessageContext applicationMsg = applicationRMMsg.getMessageContext();
- ConfigurationContext configCtx = applicationMsg.getConfigurationContext();
-
- // generating a new create sequeuce message.
- RMMsgContext createSeqRMMessage = RMMsgCreator.createCreateSeqMsg(applicationRMMsg, sequencePropertyKey, acksTo,
- storageManager);
-
- createSeqRMMessage.setFlow(MessageContext.OUT_FLOW);
- CreateSequence createSequencePart = (CreateSequence) createSeqRMMessage
- .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
-
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
- CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
- SenderBeanMgr retransmitterMgr = storageManager.getRetransmitterBeanMgr();
-
- SequenceOffer offer = createSequencePart.getSequenceOffer();
- if (offer != null) {
- String offeredSequenceId = offer.getIdentifer().getIdentifier();
-
- SequencePropertyBean offeredSequenceBean = new SequencePropertyBean();
- offeredSequenceBean.setName(Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE);
- offeredSequenceBean.setSequencePropertyKey(sequencePropertyKey);
- offeredSequenceBean.setValue(offeredSequenceId);
-
- seqPropMgr.insert(offeredSequenceBean);
- }
-
- MessageContext createSeqMsg = createSeqRMMessage.getMessageContext();
- createSeqMsg.setRelationships(null); // create seq msg does not
- // relateTo anything
-
- String createSequenceMessageStoreKey = SandeshaUtil.getUUID(); // the key taht will be used to store
- //the create sequence message.
-
- CreateSeqBean createSeqBean = new CreateSeqBean();
- createSeqBean.setInternalSequenceID(internalSequenceId);
- createSeqBean.setCreateSeqMsgID(createSeqMsg.getMessageID());
- createSeqBean.setCreateSequenceMsgStoreKey(createSequenceMessageStoreKey);
-
- //TODO set the replyTo of CreateSeq (and others) to Anymomous if Application Msgs hv it as Anonymous.
-
- //checking weather the sequence is in polling mode.
- boolean pollingMode = false;
- EndpointReference replyTo = applicationRMMsg.getReplyTo();
- if (replyTo!=null && SandeshaUtil.isWSRMAnonymousReplyTo(replyTo.getAddress()))
- pollingMode = true;
- else if (replyTo!=null && offer!=null &&
- (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(replyTo.getAddress()) ||
- AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(replyTo.getAddress())))
- pollingMode = true;
-
- createSeqBean.setPollingMode(pollingMode);
-
- //if PollingMode is true, starting the pollingmanager.
- if (pollingMode)
- SandeshaUtil.startPollingManager(configCtx);
-
- SecurityToken token = (SecurityToken) createSeqRMMessage.getProperty(Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
- if(token != null) {
- SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
- createSeqBean.setSecurityTokenData(secManager.getTokenRecoveryData(token));
-
- // If we are using token based security, and the 1.1 spec level, then we
- // should introduce a UsesSequenceSTR header into the message.
- if(createSequencePart.getNamespaceValue().equals(Sandesha2Constants.SPEC_2006_08.NS_URI)) {
- UsesSequenceSTR header = new UsesSequenceSTR(null, Sandesha2Constants.SPEC_2006_08.NS_URI);
- header.toSOAPEnvelope(createSeqMsg.getEnvelope());
- }
- }
-
- createSeqMgr.insert(createSeqBean);
-
- String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, storageManager);
- String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
-
- if (createSeqMsg.getReplyTo() == null)
- createSeqMsg.setReplyTo(new EndpointReference(anonymousURI));
-
- SenderBean createSeqEntry = new SenderBean();
- createSeqEntry.setMessageContextRefKey(createSequenceMessageStoreKey);
- createSeqEntry.setTimeToSend(System.currentTimeMillis());
- createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
- createSeqEntry.setInternalSequenceID(sequencePropertyKey);
- // this will be set to true in the sender
- createSeqEntry.setSend(true);
-
- createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
- createSeqEntry.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
- retransmitterMgr.insert(createSeqEntry);
-
- storageManager.storeMessageContext(createSequenceMessageStoreKey, createSeqMsg); // storing the
- // message.
-
- // message will be stored in the Sandesha2TransportSender
- createSeqMsg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, createSequenceMessageStoreKey);
-
- TransportOutDescription transportOut = createSeqMsg.getTransportOut();
-
- createSeqMsg.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC, transportOut);
- createSeqMsg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
- createSeqMsg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, createSequenceMessageStoreKey);
-
- Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc();
- createSeqMsg.setTransportOut(sandesha2TransportOutDesc);
-
- // sending the message once through Sandesha2TransportSender.
- AxisEngine engine = new AxisEngine(createSeqMsg.getConfigurationContext());
- try {
- engine.resumeSend(createSeqMsg);
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage());
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage");
- }
-
- private void processResponseMessage(RMMsgContext rmMsg, String internalSequenceId, long messageNumber,
- String storageKey, StorageManager storageManager) throws SandeshaException {
- if (log.isDebugEnabled())
- log.debug("Enter: ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId);
-
- MessageContext msg = rmMsg.getMessageContext();
-
- SequencePropertyBeanMgr sequencePropertyMgr = storageManager.getSequencePropertyBeanMgr();
- SenderBeanMgr retransmitterMgr = storageManager.getRetransmitterBeanMgr();
-
- SequencePropertyBean toBean = sequencePropertyMgr.retrieve(internalSequenceId,
- Sandesha2Constants.SequenceProperties.TO_EPR);
- SequencePropertyBean replyToBean = sequencePropertyMgr.retrieve(internalSequenceId,
- Sandesha2Constants.SequenceProperties.REPLY_TO_EPR);
-
- // again - looks weird in the client side - but consistent
- SequencePropertyBean outSequenceBean = sequencePropertyMgr.retrieve(internalSequenceId,
- Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
-
- if (toBean == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- EndpointReference toEPR = new EndpointReference(toBean.getValue());
-
- EndpointReference replyToEPR = null;
- if (replyToBean != null) {
- replyToEPR = new EndpointReference(replyToBean.getValue());
- }
-
- if (toEPR == null || toEPR.getAddress() == null || toEPR.getAddress() == "") {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String newToStr = null;
- if (msg.isServerSide()) {
- try {
- MessageContext requestMsg = msg.getOperationContext().getMessageContext(
- OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
- if (requestMsg != null) {
- newToStr = requestMsg.getReplyTo().getAddress();
- }
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage());
- }
- }
-
- if (newToStr != null)
- rmMsg.setTo(new EndpointReference(newToStr));
- else
- rmMsg.setTo(toEPR);
-
- if (replyToEPR != null)
- rmMsg.setReplyTo(replyToEPR);
-
- String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId, storageManager);
- if (rmVersion == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.specVersionPropertyNotAvailable, internalSequenceId));
-
- String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
-
- Sequence sequence = new Sequence(rmNamespaceValue);
- MessageNumber msgNumber = new MessageNumber(rmNamespaceValue);
- msgNumber.setMessageNumber(messageNumber);
- sequence.setMessageNumber(msgNumber);
-
- // setting last message
- if (msg.isServerSide()) {
- MessageContext requestMsg = null;
-
- try {
- requestMsg = msg.getOperationContext()
- .getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage());
- }
-
- RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsg);
- Sequence requestSequence = (Sequence) reqRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- if (requestSequence == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.requestSeqIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- if (requestSequence.getLastMessage() != null) {
- sequence.setLastMessage(new LastMessage(rmNamespaceValue));
- }
-
- } else {
- // client side
-
- OperationContext operationContext = msg.getOperationContext();
- if (operationContext != null) {
- Object obj = msg.getProperty(SandeshaClientConstants.LAST_MESSAGE);
- if (obj != null && "true".equals(obj)) {
-
- SequencePropertyBean specVersionBean = sequencePropertyMgr.retrieve(internalSequenceId,
- Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
- if (specVersionBean == null)
- throw new SandeshaException(SandeshaMessageHelper
- .getMessage(SandeshaMessageKeys.specVersionNotSet));
-
- String specVersion = specVersionBean.getValue();
- if (SpecSpecificConstants.isLastMessageIndicatorRequired(specVersion))
- sequence.setLastMessage(new LastMessage(rmNamespaceValue));
- }
- }
- }
-
- AckRequested ackRequested = null;
-
- boolean addAckRequested = false;
- // if (!lastMessage)
- // addAckRequested = true; //TODO decide the policy to add the
- // ackRequested tag
-
- // setting the Sequnece id.
- // Set send = true/false depending on the availability of the out
- // sequence id.
- String identifierStr = null;
- if (outSequenceBean == null || outSequenceBean.getValue() == null) {
- identifierStr = Sandesha2Constants.TEMP_SEQUENCE_ID;
-
- } else {
- identifierStr = (String) outSequenceBean.getValue();
- }
-
- Identifier id1 = new Identifier(rmNamespaceValue);
- id1.setIndentifer(identifierStr);
- sequence.setIdentifier(id1);
- rmMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE, sequence);
-
- if (addAckRequested) {
- ackRequested = new AckRequested(rmNamespaceValue);
- Identifier id2 = new Identifier(rmNamespaceValue);
- id2.setIndentifer(identifierStr);
- ackRequested.setIdentifier(id2);
- rmMsg.setMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST, ackRequested);
- }
-
- try {
- rmMsg.addSOAPEnvelope();
- } catch (AxisFault e1) {
- throw new SandeshaException(e1.getMessage());
- }
-
- // Retransmitter bean entry for the application message
- SenderBean appMsgEntry = new SenderBean();
-
- appMsgEntry.setMessageContextRefKey(storageKey);
-
- appMsgEntry.setTimeToSend(System.currentTimeMillis());
- appMsgEntry.setMessageID(rmMsg.getMessageId());
- appMsgEntry.setMessageNumber(messageNumber);
- appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
- if (outSequenceBean == null || outSequenceBean.getValue() == null) {
- appMsgEntry.setSend(false);
- } else {
- appMsgEntry.setSend(true);
- // Send will be set to true at the sender.
- msg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
- }
-
- appMsgEntry.setInternalSequenceID(internalSequenceId);
- storageManager.storeMessageContext(storageKey, msg);
- retransmitterMgr.insert(appMsgEntry);
- msg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
- // changing the sender. This will set send to true.
- TransportSender sender = msg.getTransportOut().getSender();
-
- if (sender != null) {
- Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc();
- msg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, storageKey);
- msg.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC, msg.getTransportOut());
- msg.setTransportOut(sandesha2TransportOutDesc);
-
- }
-
- // increasing the current handler index, so that the message will not be
- // going throught the SandeshaOutHandler again.
- msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
-
- // sending the message through, other handlers and the
- // Sandesha2TransportSender so that it get dumped to the storage.
- AxisEngine engine = new AxisEngine(msg.getConfigurationContext());
- try {
- engine.resumeSend(msg);
- } catch (AxisFault e) {
- throw new SandeshaException(e);
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: ApplicationMsgProcessor::processResponseMessage");
- }
-
- private long getPreviousMsgNo(ConfigurationContext context, String sequencePropertyKey, StorageManager storageManager)
- throws SandeshaException {
- if (log.isDebugEnabled())
- log.debug("Enter: ApplicationMsgProcessor::getPreviousMsgNo, " + sequencePropertyKey);
-
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-
- SequencePropertyBean nextMsgNoBean = seqPropMgr.retrieve(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
-
- long nextMsgNo = -1;
- if (nextMsgNoBean != null) {
- Long nextMsgNoLng = new Long(nextMsgNoBean.getValue());
- nextMsgNo = nextMsgNoLng.longValue();
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: ApplicationMsgProcessor::getPreviousMsgNo, " + nextMsgNo);
-
- return nextMsgNo;
- }
-
- private void setNextMsgNo(ConfigurationContext context, String sequencePropertyKey, long msgNo,
- StorageManager storageManager) throws SandeshaException {
-
- if (log.isDebugEnabled())
- log.debug("Enter: ApplicationMsgProcessor::setNextMsgNo, " + sequencePropertyKey + ", " + msgNo);
-
- if (msgNo <= 0) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberMustBeLargerThanZero, Long
- .toString(msgNo));
- throw new SandeshaException(message);
- }
-
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-
- SequencePropertyBean nextMsgNoBean = seqPropMgr.retrieve(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
-
- boolean update = true;
- if (nextMsgNoBean == null) {
- update = false;
- nextMsgNoBean = new SequencePropertyBean();
- nextMsgNoBean.setSequencePropertyKey(sequencePropertyKey);
- nextMsgNoBean.setName(Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
- }
-
- nextMsgNoBean.setValue(new Long(msgNo).toString());
- if (update)
- seqPropMgr.update(nextMsgNoBean);
- else
- seqPropMgr.insert(nextMsgNoBean);
-
- if (log.isDebugEnabled())
- log.debug("Exit: ApplicationMsgProcessor::setNextMsgNo");
- }
-
- private boolean isWSAAnonymous (String address) {
- if (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) ||
- AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address))
- return true;
-
- return false;
- }
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.ArrayList;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPBody;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.context.ServiceContext;
+import org.apache.axis2.description.AxisOperationFactory;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SandeshaListener;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.wsrm.AckRequested;
+import org.apache.sandesha2.wsrm.CreateSequence;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.LastMessage;
+import org.apache.sandesha2.wsrm.MessageNumber;
+import org.apache.sandesha2.wsrm.Sequence;
+import org.apache.sandesha2.wsrm.SequenceOffer;
+import org.apache.sandesha2.wsrm.UsesSequenceSTR;
+
+/**
+ * Responsible for processing an incoming Application message.
+ */
+
+public class ApplicationMsgProcessor implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(ApplicationMsgProcessor.class);
+
+ public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: ApplicationMsgProcessor::processInMessage");
+
+ // Processing the application message.
+ MessageContext msgCtx = rmMsgCtx.getMessageContext();
+ if (msgCtx == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgContextNotSetInbound);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
+ && rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals("true")) {
+ return;
+ }
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(msgCtx.getConfigurationContext(),msgCtx.getConfigurationContext().getAxisConfiguration());
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
+ Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String sequenceId = sequence.getIdentifier().getIdentifier();
+
+ String propertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
+
+ // Check that both the Sequence header and message body have been secured properly
+ SequencePropertyBean tokenBean = seqPropMgr.retrieve(propertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
+ if(tokenBean != null) {
+ SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
+ OMElement body = msgCtx.getEnvelope().getBody();
+ SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
+
+ //TODO get the element from the SOAP Envelope
+// secManager.checkProofOfPossession(token, sequence.getOMElement(), msgCtx);
+
+ secManager.checkProofOfPossession(token, body, msgCtx);
+ }
+
+ //RM will not send sync responses. If sync acks are there this will be
+ // made true again later.
+ if (rmMsgCtx.getMessageContext().getOperationContext() != null) {
+ rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN,
+ Constants.VALUE_FALSE);
+ }
+
+ FaultManager faultManager = new FaultManager();
+ RMMsgContext faultMessageContext = faultManager.checkForLastMsgNumberExceeded(rmMsgCtx, storageManager);
+ if (faultMessageContext != null) {
+ ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ try {
+ engine.sendFault(faultMessageContext.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendFault, e
+ .toString()));
+ }
+
+ msgCtx.pause();
+ return;
+ }
+
+ // setting acked msg no range
+ ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+ if (configCtx == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ faultMessageContext = faultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager);
+ if (faultMessageContext != null) {
+ ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ try {
+ engine.send(faultMessageContext.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendFault, e
+ .toString()));
+ }
+
+ msgCtx.pause();
+ return;
+ }
+
+ // setting mustUnderstand to false.
+ sequence.setMustUnderstand(false);
+ rmMsgCtx.addSOAPEnvelope();
+
+ // throwing a fault if the sequence is closed.
+ faultMessageContext = faultManager.checkForSequenceClosed(rmMsgCtx, sequenceId, storageManager);
+ if (faultMessageContext != null) {
+ ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ try {
+ engine.sendFault(faultMessageContext.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendFault, e
+ .toString()));
+ }
+
+ return;
+ }
+
+ // updating the last activated time of the sequence.
+ SequenceManager.updateLastActivatedTime(propertyKey, storageManager);
+
+ SequencePropertyBean msgsBean = seqPropMgr.retrieve(propertyKey,
+ Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+
+ long msgNo = sequence.getMessageNumber().getMessageNumber();
+ if (msgNo == 0) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
+ .toString(msgNo));
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String key = SandeshaUtil.getUUID(); // key to store the message.
+
+ // updating the Highest_In_Msg_No property which gives the highest
+ // message number retrieved from this sequence.
+ String highetsInMsgNoStr = SandeshaUtil.getSequenceProperty(propertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, storageManager);
+ String highetsInMsgKey = SandeshaUtil.getSequenceProperty(propertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, storageManager);
+ if (highetsInMsgKey == null)
+ highetsInMsgKey = SandeshaUtil.getUUID();
+
+ long highestInMsgNo = 0;
+ if (highetsInMsgNoStr != null) {
+ highestInMsgNo = Long.parseLong(highetsInMsgNoStr);
+ }
+
+ if (msgNo > highestInMsgNo) {
+ highestInMsgNo = msgNo;
+
+ String str = new Long(msgNo).toString();
+ SequencePropertyBean highestMsgNoBean = new SequencePropertyBean(propertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, str);
+ SequencePropertyBean highestMsgKeyBean = new SequencePropertyBean(propertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, highetsInMsgKey);
+
+ // storing the new message as the highest in message.
+ storageManager.removeMessageContext(highetsInMsgKey);
+ storageManager.storeMessageContext(highetsInMsgKey, msgCtx);
+
+ if (highetsInMsgNoStr != null) {
+ seqPropMgr.update(highestMsgNoBean);
+ seqPropMgr.update(highestMsgKeyBean);
+ } else {
+ seqPropMgr.insert(highestMsgNoBean);
+ seqPropMgr.insert(highestMsgKeyBean);
+ }
+ }
+
+ String messagesStr = "";
+ if (msgsBean != null)
+ messagesStr = (String) msgsBean.getValue();
+ else {
+ msgsBean = new SequencePropertyBean();
+ msgsBean.setSequencePropertyKey(propertyKey);
+ msgsBean.setName(Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+ msgsBean.setValue(messagesStr);
+ }
+
+ if (msgNoPresentInList(messagesStr, msgNo)
+ && (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
+ // this is a duplicate message and the invocation type is
+ // EXACTLY_ONCE.
+ rmMsgCtx.pause();
+ }
+
+ if (messagesStr != "" && messagesStr != null)
+ messagesStr = messagesStr + "," + Long.toString(msgNo);
+ else
+ messagesStr = Long.toString(msgNo);
+
+ msgsBean.setValue(messagesStr);
+ seqPropMgr.update(msgsBean);
+
+ // Pause the messages bean if not the right message to invoke.
+ NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
+ NextMsgBean bean = mgr.retrieve(sequenceId);
+
+ if (bean == null) {
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotFindSequence,
+ sequenceId));
+ }
+
+ InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
+
+ // inorder invocation is still a global property
+ boolean inOrderInvocation = SandeshaUtil.getPropertyBean(
+ msgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
+
+
+ //setting properties for the messageContext
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new Long (msgNo));
+
+ if (inOrderInvocation) {
+
+ SequencePropertyBean incomingSequenceListBean = (SequencePropertyBean) seqPropMgr.retrieve(
+ Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+ Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+ if (incomingSequenceListBean == null) {
+ ArrayList incomingSequenceList = new ArrayList();
+ incomingSequenceListBean = new SequencePropertyBean();
+ incomingSequenceListBean.setSequencePropertyKey(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
+ incomingSequenceListBean.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+ incomingSequenceListBean.setValue(incomingSequenceList.toString());
+
+ // this get inserted before
+ seqPropMgr.insert(incomingSequenceListBean);
+ }
+
+ ArrayList incomingSequenceList = SandeshaUtil.getArrayListFromString(incomingSequenceListBean.getValue());
+
+ // Adding current sequence to the incoming sequence List.
+ if (!incomingSequenceList.contains(sequenceId)) {
+ incomingSequenceList.add(sequenceId);
+
+ // saving the property.
+ incomingSequenceListBean.setValue(incomingSequenceList.toString());
+ seqPropMgr.update(incomingSequenceListBean);
+ }
+
+ // saving the message.
+ try {
+ storageManager.storeMessageContext(key, rmMsgCtx.getMessageContext());
+ storageMapMgr.insert(new InvokerBean(key, msgNo, sequenceId));
+
+ // This will avoid performing application processing more
+ // than
+ // once.
+ rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+ } catch (Exception ex) {
+ throw new SandeshaException(ex.getMessage());
+ }
+
+ // pause the message
+ rmMsgCtx.pause();
+
+ // Starting the invoker if stopped.
+ SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(), sequenceId);
+
+ }
+
+ // Sending acknowledgements
+ sendAckIfNeeded(rmMsgCtx, messagesStr, storageManager);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: ApplicationMsgProcessor::processInMessage");
+ }
+
+ // TODO convert following from INT to LONG
+ private boolean msgNoPresentInList(String list, long no) {
+ String[] msgStrs = list.split(",");
+
+ int l = msgStrs.length;
+
+ for (int i = 0; i < l; i++) {
+ if (msgStrs[i].equals(Long.toString(no)))
+ return true;
+ }
+
+ return false;
+ }
+
+ public void sendAckIfNeeded(RMMsgContext rmMsgCtx, String messagesStr, StorageManager storageManager)
+ throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: ApplicationMsgProcessor::sendAckIfNeeded");
+
+ String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
+
+ Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String sequenceId = sequence.getIdentifier().getIdentifier();
+ ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+ if (configCtx == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet));
+
+ RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey ,sequenceId, storageManager);
+
+ AxisEngine engine = new AxisEngine(configCtx);
+
+ try {
+ engine.send(ackRMMessage.getMessageContext());
+ } catch (AxisFault e) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendAck, sequenceId, e
+ .toString());
+ throw new SandeshaException(message, e);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: ApplicationMsgProcessor::sendAckIfNeeded");
+ }
+
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
+
+ MessageContext msgContext = rmMsgCtx.getMessageContext();
+ ConfigurationContext configContext = msgContext.getConfigurationContext();
+
+ // setting the Fault callback
+ SandeshaListener faultCallback = (SandeshaListener) msgContext.getOptions().getProperty(
+ SandeshaClientConstants.SANDESHA_LISTENER);
+ if (faultCallback != null) {
+ OperationContext operationContext = msgContext.getOperationContext();
+ if (operationContext != null) {
+ operationContext.setProperty(SandeshaClientConstants.SANDESHA_LISTENER, faultCallback);
+ }
+ }
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext, configContext
+ .getAxisConfiguration());
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
+
+ boolean serverSide = msgContext.isServerSide();
+
+ // setting message Id if null
+ if (msgContext.getMessageID() == null)
+ msgContext.setMessageID(SandeshaUtil.getUUID());
+
+ // find internal sequence id
+ String internalSequenceId = null;
+
+ String storageKey = SandeshaUtil.getUUID(); // the key which will be
+ // used to store this
+ // message.
+
+ /*
+ * Internal sequence id is the one used to refer to the sequence (since
+ * actual sequence id is not available when first msg arrives) server
+ * side - a derivation of the sequenceId of the incoming sequence client
+ * side - a derivation of wsaTo & SeequenceKey
+ */
+
+ boolean lastMessage = false;
+ if (serverSide) {
+ // getting the request message and rmMessage.
+ MessageContext reqMsgCtx;
+ try {
+ reqMsgCtx = msgContext.getOperationContext().getMessageContext(
+ OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ } catch (AxisFault e) {
+ throw new SandeshaException(e);
+ }
+
+ RMMsgContext requestRMMsgCtx = MsgInitializer.initializeMessage(reqMsgCtx);
+
+ Sequence reqSequence = (Sequence) requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if (reqSequence == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String incomingSeqId = reqSequence.getIdentifier().getIdentifier();
+ if (incomingSeqId == null || incomingSeqId == "") {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.incomingSequenceNotValidID, "''"
+ + incomingSeqId + "''");
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ long requestMsgNo = reqSequence.getMessageNumber().getMessageNumber();
+
+ internalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(incomingSeqId);
+
+ // deciding weather the last message.
+ String requestLastMsgNoStr = SandeshaUtil.getSequenceProperty(incomingSeqId,
+ Sandesha2Constants.SequenceProperties.LAST_IN_MESSAGE_NO, storageManager);
+ if (requestLastMsgNoStr != null) {
+ long requestLastMsgNo = Long.parseLong(requestLastMsgNoStr);
+ if (requestLastMsgNo == requestMsgNo)
+ lastMessage = true;
+ }
+
+ } else {
+ // set the internal sequence id for the client side.
+ EndpointReference toEPR = msgContext.getTo();
+ if (toEPR == null || toEPR.getAddress() == null || "".equals(toEPR.getAddress())) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String to = toEPR.getAddress();
+ String sequenceKey = (String) msgContext.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ internalSequenceId = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+
+ String lastAppMessage = (String) msgContext.getProperty(SandeshaClientConstants.LAST_MESSAGE);
+ if (lastAppMessage != null && "true".equals(lastAppMessage))
+ lastMessage = true;
+ }
+
+ if (internalSequenceId!=null)
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
+
+ String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
+
+ /*
+ * checking weather the user has given the messageNumber (most of the
+ * cases this will not be the case where the system will generate the
+ * message numbers
+ */
+
+ // User should set it as a long object.
+ Long messageNumberLng = (Long) msgContext.getProperty(SandeshaClientConstants.MESSAGE_NUMBER);
+
+ long givenMessageNumber = -1;
+ if (messageNumberLng != null) {
+ givenMessageNumber = messageNumberLng.longValue();
+ if (givenMessageNumber <= 0) {
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.msgNumberMustBeLargerThanZero, Long.toString(givenMessageNumber)));
+ }
+ }
+
+ // the message number that was last used.
+ long systemMessageNumber = getPreviousMsgNo(configContext, sequencePropertyKey, storageManager);
+
+ // The number given by the user has to be larger than the last stored
+ // number.
+ if (givenMessageNumber > 0 && givenMessageNumber <= systemMessageNumber) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberNotLargerThanLastMsg, Long
+ .toString(givenMessageNumber));
+ throw new SandeshaException(message);
+ }
+
+ // Finding the correct message number.
+ long messageNumber = -1;
+ if (givenMessageNumber > 0) // if given message number is valid use it.
+ // (this is larger than the last stored due
+ // to the last check)
+ messageNumber = givenMessageNumber;
+ else if (systemMessageNumber > 0) { // if system message number is valid
+ // use it.
+ messageNumber = systemMessageNumber + 1;
+ } else { // This is the first message (systemMessageNumber = -1)
+ messageNumber = 1;
+ }
+
+ // A dummy message is a one which will not be processed as a actual
+ // application message.
+ // The RM handlers will simply let these go.
+ String dummyMessageString = (String) msgContext.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
+ boolean dummyMessage = false;
+ if (dummyMessageString != null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
+ dummyMessage = true;
+
+ // saving the used message number
+ if (!dummyMessage)
+ setNextMsgNo(configContext, sequencePropertyKey, messageNumber, storageManager);
+
+ // set this as the response highest message.
+ SequencePropertyBean responseHighestMsgBean = new SequencePropertyBean(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_OUT_MSG_NUMBER, new Long(messageNumber).toString());
+ seqPropMgr.insert(responseHighestMsgBean);
+
+ if (lastMessage) {
+
+ SequencePropertyBean responseHighestMsgKeyBean = new SequencePropertyBean(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_OUT_MSG_KEY, storageKey);
+
+ SequencePropertyBean responseLastMsgKeyBean = new SequencePropertyBean(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO, new Long(messageNumber).toString());
+
+ seqPropMgr.insert(responseHighestMsgKeyBean);
+ seqPropMgr.insert(responseLastMsgKeyBean);
+ }
+
+ boolean sendCreateSequence = false;
+
+ SequencePropertyBean outSeqBean = seqPropMgr.retrieve(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+
+ // setting async ack endpoint for the server side. (if present)
+ if (serverSide) {
+// String incomingSequenceID = SandeshaUtil.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
+ SequencePropertyBean incomingToBean = seqPropMgr.retrieve(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.TO_EPR);
+ if (incomingToBean != null) {
+ String incomingTo = incomingToBean.getValue();
+ msgContext.setProperty(SandeshaClientConstants.AcksTo, incomingTo);
+ }
+ }
+
+ // FINDING THE SPEC VERSION
+ String specVersion = null;
+ if (msgContext.isServerSide()) {
+ // in the server side, get the RM version from the request sequence.
+ MessageContext requestMessageContext;
+ try {
+ requestMessageContext = msgContext.getOperationContext().getMessageContext(
+ AxisOperationFactory.MESSAGE_LABEL_IN_VALUE);
+ } catch (AxisFault e) {
+ throw new SandeshaException(e);
+ }
+
+ if (requestMessageContext == null) {
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.requestMsgContextNull));
+ }
+
+ RMMsgContext requestRMMsgCtx = MsgInitializer.initializeMessage(requestMessageContext);
+
+ Sequence sequence = (Sequence) requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String requestSequenceID = sequence.getIdentifier().getIdentifier();
+
+ String requestSideSequencePropertyKey = SandeshaUtil.getSequencePropertyKey(requestRMMsgCtx);
+
+ SequencePropertyBean specVersionBean = seqPropMgr.retrieve(requestSideSequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
+ if (specVersionBean == null) {
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.specVersionPropertyNotAvailable, requestSequenceID));
+ }
+
+ specVersion = specVersionBean.getValue();
+ } else {
+ // in the client side, user will set the RM version.
+ specVersion = (String) msgContext.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
+ }
+
+ if (specVersion == null)
+ specVersion = SpecSpecificConstants.getDefaultSpecVersion(); // TODO
+ // change
+ // the
+ // default
+ // to
+ // v1_1.
+
+ if (messageNumber == 1) {
+ if (outSeqBean == null) { // out sequence will be set for the
+ // server side, in the case of an offer.
+ sendCreateSequence = true; // message number being one and not
+ // having an out sequence, implies
+ // that a create sequence has to be
+ // send.
+ }
+
+ // if first message - setup the sending side sequence - both for the
+ // server and the client sides
+ SequenceManager.setupNewClientSequence(msgContext, sequencePropertyKey, specVersion, storageManager);
+ }
+
+ ServiceContext serviceContext = msgContext.getServiceContext();
+ OperationContext operationContext = msgContext.getOperationContext();
+
+ // SENDING THE CREATE SEQUENCE.
+ EndpointReference acksToEPR = new EndpointReference(null); //use this to hold the acksTo EPR
+ if (sendCreateSequence) {
+ SequencePropertyBean responseCreateSeqAdded = seqPropMgr.retrieve(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.OUT_CREATE_SEQUENCE_SENT);
+
+ String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, storageManager);
+ String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
+
+ if (responseCreateSeqAdded == null) {
+ responseCreateSeqAdded = new SequencePropertyBean(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.OUT_CREATE_SEQUENCE_SENT, "true");
+ seqPropMgr.insert(responseCreateSeqAdded);
+
+ if (serviceContext != null)
+ acksToEPR.setAddress((String) msgContext.getProperty(SandeshaClientConstants.AcksTo));
+
+ if (msgContext.isServerSide()) {
+ // we do not set acksTo value to anonymous when the create
+ // sequence is send from the server.
+ MessageContext requestMessage;
+ try {
+ requestMessage = operationContext
+ .getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ } catch (AxisFault e) {
+ throw new SandeshaException(e);
+ }
+
+ if (requestMessage == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.requestMsgNotPresent);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+ acksToEPR = requestMessage.getTo();
+
+ } else {
+ if (acksToEPR.getAddress() == null){
+ EndpointReference replyToEPR = msgContext.getReplyTo();
+ if(Boolean.getBoolean((String)operationContext.getProperty(SandeshaClientConstants.USE_REPLY_TO_AS_ACKS_TO))
+ && replyToEPR!=null && !replyToEPR.getAddress().equals("")){
+ //use the replyTo address as acksTo
+ if (log.isDebugEnabled())
+ log.debug("Using replyTo " + replyToEPR + " EPR as AcksTo, addr=" + acksToEPR.getAddress());
+ acksToEPR = replyToEPR;
+ }
+ else{
+ acksToEPR.setAddress(anonymousURI);
+ }
+ }
+ }
+
+ if (acksToEPR.getAddress()!=null && !anonymousURI.equals(acksToEPR.getAddress()) && !serverSide) {
+ String transportIn = (String) configContext // TODO verify
+ .getProperty(MessageContext.TRANSPORT_IN);
+ if (transportIn == null)
+ transportIn = org.apache.axis2.Constants.TRANSPORT_HTTP;
+ } else if (acksToEPR.getAddress() == null && serverSide) {
+// String incomingSequencId = SandeshaUtil
+// .getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
+
+ try {
+ MessageContext requestMsgContext = operationContext.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ RMMsgContext requestRMMsgContext = MsgInitializer.initializeMessage(requestMsgContext);
+
+ String requestSideSequencePropertyKey = SandeshaUtil.getSequencePropertyKey(requestRMMsgContext);
+
+ SequencePropertyBean bean = seqPropMgr.retrieve(requestSideSequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.REPLY_TO_EPR);
+ if (bean != null) {
+ String beanAcksToValue = bean.getValue();
+ if (beanAcksToValue != null)
+ acksToEPR.setAddress(beanAcksToValue);
+ }
+ } catch (AxisFault e) {
+ throw new SandeshaException (e);
+ }
+ } else if (anonymousURI.equals(acksToEPR.getAddress())) {
+ // set transport in.
+ Object trIn = msgContext.getProperty(MessageContext.TRANSPORT_IN);
+ if (trIn == null) {
+ // TODO
+ }
+ }
+ addCreateSequenceMessage(rmMsgCtx, sequencePropertyKey ,internalSequenceId, acksToEPR, storageManager);
+ }
+ }
+
+ SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
+ if (env == null) {
+ SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(env))
+ .getDefaultEnvelope();
+ rmMsgCtx.setSOAPEnvelop(envelope);
+ }
+
+ SOAPBody soapBody = rmMsgCtx.getSOAPEnvelope().getBody();
+ if (soapBody == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.soapBodyNotPresent);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String messageId1 = SandeshaUtil.getUUID();
+ if (rmMsgCtx.getMessageId() == null) {
+ rmMsgCtx.setMessageId(messageId1);
+ }
+
+ if (serverSide) {
+ // let the request end with 202 if a ack has not been
+ // written in the incoming thread.
+
+ MessageContext reqMsgCtx = null;
+ try {
+ reqMsgCtx = msgContext.getOperationContext().getMessageContext(
+ OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ } catch (AxisFault e) {
+ throw new SandeshaException(e);
+ }
+
+ if (reqMsgCtx.getProperty(Sandesha2Constants.ACK_WRITTEN) == null
+ || !"true".equals(reqMsgCtx.getProperty(Sandesha2Constants.ACK_WRITTEN)))
+ reqMsgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+ }
+
+ EndpointReference toEPR = msgContext.getTo();
+ if (toEPR == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ // setting default actions.
+ String to = toEPR.getAddress();
+ String operationName = msgContext.getOperationContext().getAxisOperation().getName().getLocalPart();
+ if (msgContext.getWSAAction() == null) {
+ msgContext.setWSAAction(to + "/" + operationName);
+ }
+ if (msgContext.getSoapAction() == null) {
+ msgContext.setSoapAction("\"" + to + "/" + operationName + "\"");
+ }
+
+ // processing the response if not an dummy.
+ if (!dummyMessage)
+ processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber, storageKey, storageManager);
+
+
+ if (!isWSAAnonymous (to)) {
+ //If message has a real to address or if it is in the polling-mode it shoud be send by the sender or should
+ //be taken away by make connections, so pausing it.
+
+ msgContext.pause(); // the execution will be stopped.
+ }
+
+ //If to address is wsa:anonymous it wont be possible to send the this message so, letting it go in the current thread.
+ //(it might get the the other end in the back-channel of the request message, no retransmissions possible).
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: ApplicationMsgProcessor::processOutMessage");
+ }
+
+ private void addCreateSequenceMessage(RMMsgContext applicationRMMsg, String sequencePropertyKey, String internalSequenceId, EndpointReference acksTo,
+ StorageManager storageManager) throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: ApplicationMsgProcessor::addCreateSequenceMessage, " + internalSequenceId);
+
+ MessageContext applicationMsg = applicationRMMsg.getMessageContext();
+ ConfigurationContext configCtx = applicationMsg.getConfigurationContext();
+
+ // generating a new create sequeuce message.
+ RMMsgContext createSeqRMMessage = RMMsgCreator.createCreateSeqMsg(applicationRMMsg, sequencePropertyKey, acksTo,
+ storageManager);
+
+ createSeqRMMessage.setFlow(MessageContext.OUT_FLOW);
+ CreateSequence createSequencePart = (CreateSequence) createSeqRMMessage
+ .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
+ CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
+ SenderBeanMgr retransmitterMgr = storageManager.getRetransmitterBeanMgr();
+
+ SequenceOffer offer = createSequencePart.getSequenceOffer();
+ if (offer != null) {
+ String offeredSequenceId = offer.getIdentifer().getIdentifier();
+
+ SequencePropertyBean offeredSequenceBean = new SequencePropertyBean();
+ offeredSequenceBean.setName(Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE);
+ offeredSequenceBean.setSequencePropertyKey(sequencePropertyKey);
+ offeredSequenceBean.setValue(offeredSequenceId);
+
+ seqPropMgr.insert(offeredSequenceBean);
+ }
+
+ MessageContext createSeqMsg = createSeqRMMessage.getMessageContext();
+ createSeqMsg.setRelationships(null); // create seq msg does not
+ // relateTo anything
+
+ String createSequenceMessageStoreKey = SandeshaUtil.getUUID(); // the key taht will be used to store
+ //the create sequence message.
+
+ CreateSeqBean createSeqBean = new CreateSeqBean();
+ createSeqBean.setInternalSequenceID(internalSequenceId);
+ createSeqBean.setCreateSeqMsgID(createSeqMsg.getMessageID());
+ createSeqBean.setCreateSequenceMsgStoreKey(createSequenceMessageStoreKey);
+
+ //TODO set the replyTo of CreateSeq (and others) to Anymomous if Application Msgs hv it as Anonymous.
+
+ //checking weather the sequence is in polling mode.
+ boolean pollingMode = false;
+ EndpointReference replyTo = applicationRMMsg.getReplyTo();
+ if (replyTo!=null && SandeshaUtil.isWSRMAnonymousReplyTo(replyTo.getAddress()))
+ pollingMode = true;
+ else if (replyTo!=null && offer!=null &&
+ (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(replyTo.getAddress()) ||
+ AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(replyTo.getAddress())))
+ pollingMode = true;
+
+ createSeqBean.setPollingMode(pollingMode);
+
+ //if PollingMode is true, starting the pollingmanager.
+ if (pollingMode)
+ SandeshaUtil.startPollingManager(configCtx);
+
+ SecurityToken token = (SecurityToken) createSeqRMMessage.getProperty(Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
+ if(token != null) {
+ SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
+ createSeqBean.setSecurityTokenData(secManager.getTokenRecoveryData(token));
+
+ // If we are using token based security, and the 1.1 spec level, then we
+ // should introduce a UsesSequenceSTR header into the message.
+ if(createSequencePart.getNamespaceValue().equals(Sandesha2Constants.SPEC_2006_08.NS_URI)) {
+ UsesSequenceSTR header = new UsesSequenceSTR(null, Sandesha2Constants.SPEC_2006_08.NS_URI);
+ header.toSOAPEnvelope(createSeqMsg.getEnvelope());
+ }
+ }
+
+ createSeqMgr.insert(createSeqBean);
+
+ String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, storageManager);
+ String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
+
+ if (createSeqMsg.getReplyTo() == null)
+ createSeqMsg.setReplyTo(new EndpointReference(anonymousURI));
+
+ SenderBean createSeqEntry = new SenderBean();
+ createSeqEntry.setMessageContextRefKey(createSequenceMessageStoreKey);
+ createSeqEntry.setTimeToSend(System.currentTimeMillis());
+ createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
+ createSeqEntry.setInternalSequenceID(sequencePropertyKey);
+ // this will be set to true in the sender
+ createSeqEntry.setSend(true);
+
+ createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+ createSeqEntry.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
+ retransmitterMgr.insert(createSeqEntry);
+
+ storageManager.storeMessageContext(createSequenceMessageStoreKey, createSeqMsg); // storing the
+ // message.
+
+ // message will be stored in the Sandesha2TransportSender
+ createSeqMsg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, createSequenceMessageStoreKey);
+
+ TransportOutDescription transportOut = createSeqMsg.getTransportOut();
+
+ createSeqMsg.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC, transportOut);
+ createSeqMsg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+ createSeqMsg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, createSequenceMessageStoreKey);
+
+ Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc();
+ createSeqMsg.setTransportOut(sandesha2TransportOutDesc);
+
+ // sending the message once through Sandesha2TransportSender.
+ AxisEngine engine = new AxisEngine(createSeqMsg.getConfigurationContext());
+ try {
+ engine.resumeSend(createSeqMsg);
+ } catch (AxisFault e) {
+ throw new SandeshaException(e.getMessage());
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage");
+ }
+
+ private void processResponseMessage(RMMsgContext rmMsg, String internalSequenceId, long messageNumber,
+ String storageKey, StorageManager storageManager) throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId);
+
+ MessageContext msg = rmMsg.getMessageContext();
+
+ SequencePropertyBeanMgr sequencePropertyMgr = storageManager.getSequencePropertyBeanMgr();
+ SenderBeanMgr retransmitterMgr = storageManager.getRetransmitterBeanMgr();
+
+ SequencePropertyBean toBean = sequencePropertyMgr.retrieve(internalSequenceId,
+ Sandesha2Constants.SequenceProperties.TO_EPR);
+ SequencePropertyBean replyToBean = sequencePropertyMgr.retrieve(internalSequenceId,
+ Sandesha2Constants.SequenceProperties.REPLY_TO_EPR);
+
+ // again - looks weird in the client side - but consistent
+ SequencePropertyBean outSequenceBean = sequencePropertyMgr.retrieve(internalSequenceId,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+
+ if (toBean == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ EndpointReference toEPR = new EndpointReference(toBean.getValue());
+
+ EndpointReference replyToEPR = null;
+ if (replyToBean != null) {
+ replyToEPR = new EndpointReference(replyToBean.getValue());
+ }
+
+ if (toEPR == null || toEPR.getAddress() == null || toEPR.getAddress() == "") {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String newToStr = null;
[... 229 lines stripped ...]
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org