You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2007/04/23 11:55:16 UTC
svn commit: r531400 [4/18] - in /webservices/sandesha/trunk/java/modules:
client/ core/ core/src/ core/src/main/ core/src/main/java/
core/src/main/java/org/ core/src/main/java/org/apache/
core/src/main/java/org/apache/sandesha2/ core/src/main/java/org/...
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,358 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Collection;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SandeshaListener;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Accept;
+import org.apache.sandesha2.wsrm.CreateSequence;
+import org.apache.sandesha2.wsrm.CreateSequenceResponse;
+import org.apache.sandesha2.wsrm.Endpoint;
+import org.apache.sandesha2.wsrm.SequenceOffer;
+
+/**
+ * Responsible for processing an incoming Create Sequence message.
+ */
+
+public class CreateSeqMsgProcessor implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(CreateSeqMsgProcessor.class);
+
+ /**
+ * Processes an inbound CreateSequence message.
+ * <p>
+ * Steps performed, in order: resolve and check the security token when the
+ * request carries a SecurityTokenReference; when the To address is anonymous,
+ * look up an RMS bean flagged as termination-paused for that address and add a
+ * TerminateSequence message for it; set up the new inbound sequence through
+ * SequenceManager and build the CreateSequenceResponse; when the request
+ * contains an offer, either insert an RMSBean for the offered sequence or strip
+ * the Accept part from the response; send the response through a new
+ * AxisEngine; finally pause the inbound message.
+ *
+ * @param createSeqRMMsg the RM message context wrapping the CreateSequence request
+ * @return false whenever a CreateSequenceRefused fault was built through
+ * FaultManager without that call throwing; true otherwise
+ * @throws AxisFault rethrown from the catch block when the SOAP 1.2 fault code
+ * property is already set on the message context; the inline comments
+ * also suggest FaultManager itself may throw
+ */
+ public boolean processInMessage(RMMsgContext createSeqRMMsg) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: CreateSeqMsgProcessor::processInMessage");
+
+ try {
+ CreateSequence createSeqPart = (CreateSequence) createSeqRMMsg
+ .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
+ if (createSeqPart == null) {
+ if (log.isDebugEnabled())
+ log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqParts));
+ FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg,
+ SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqParts),
+ new Exception());
+ // Return false if an Exception hasn't been thrown.
+ if (log.isDebugEnabled())
+ log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
+ return false;
+
+ }
+
+ MessageContext createSeqMsg = createSeqRMMsg.getMessageContext();
+ ConfigurationContext context = createSeqMsg.getConfigurationContext();
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+
+ // If the inbound CreateSequence includes a SecurityTokenReference then
+ // ask the security manager to resolve that to a token for us. We also
+ // check that the Create was secured using the token.
+ SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
+ OMElement theSTR = createSeqPart.getSecurityTokenReference();
+ SecurityToken token = null;
+ if(theSTR != null) {
+ // Fetched through the same getter as createSeqMsg above.
+ MessageContext msgcontext = createSeqRMMsg.getMessageContext();
+ token = secManager.getSecurityToken(theSTR, msgcontext);
+
+ // The create must be the body part of this message, so we check the
+ // security of that element.
+ OMElement body = msgcontext.getEnvelope().getBody();
+ secManager.checkProofOfPossession(token, body, msgcontext);
+ }
+
+ // If toAddress is RMAnon we may need to terminate the request side sequence here.
+ // NOTE(review): toEPR is dereferenced without a null check; a missing To header
+ // would surface as a NullPointerException handled by the catch block below.
+ EndpointReference toEPR = createSeqMsg.getTo();
+ if (toEPR.hasAnonymousAddress()) {
+
+ // Search template: an RMS bean replying to this address whose termination
+ // was flagged as paused until a CreateSequence arrives.
+ RMSBean findBean = new RMSBean ();
+ findBean.setReplyToEPR(toEPR.getAddress());
+ findBean.setTerminationPauserForCS(true);
+
+ //TODO recheck
+ RMSBean rmsBean = storageManager.getRMSBeanMgr().findUnique(findBean);
+ if (rmsBean!=null) {
+ //AckManager has not done the termination. Do the termination here.
+ MessageContext requestSideRefMessage = storageManager.retrieveMessageContext(rmsBean.getReferenceMessageStoreKey(),context);
+ if (requestSideRefMessage==null) {
+ FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg,
+ SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referencedMessageNotFound, rmsBean.getInternalSequenceID()),
+ new Exception());
+ // Return false if an Exception hasn't been thrown.
+ if (log.isDebugEnabled())
+ log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
+ return false;
+ }
+
+ RMMsgContext requestSideRefRMMessage = MsgInitializer.initializeMessage(requestSideRefMessage);
+ TerminateManager.addTerminateSequenceMessage(requestSideRefRMMessage, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
+ }
+ }
+
+
+ MessageContext outMessage = null;
+
+ // Create the new sequence id, as well as establishing the beans that handle the
+ // sequence state.
+ RMDBean rmdBean = SequenceManager.setupNewSequence(createSeqRMMsg, storageManager, secManager, token);
+
+ RMMsgContext createSeqResponse = RMMsgCreator.createCreateSeqResponseMsg(createSeqRMMsg, rmdBean);
+ outMessage = createSeqResponse.getMessageContext();
+ // Set a message ID for this Create Sequence Response message
+ outMessage.setMessageID(SandeshaUtil.getUUID());
+
+ createSeqResponse.setFlow(MessageContext.OUT_FLOW);
+
+ // for making sure that this won't be processed again
+ createSeqResponse.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+ CreateSequenceResponse createSeqResPart = (CreateSequenceResponse) createSeqResponse
+ .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
+
+ // OFFER PROCESSING
+ SequenceOffer offer = createSeqPart.getSequenceOffer();
+ if (offer != null) {
+ // The response built above is expected to carry an Accept part whenever
+ // the request carried an offer; refuse the sequence if it does not.
+ Accept accept = createSeqResPart.getAccept();
+ if (accept == null) {
+ if (log.isDebugEnabled())
+ log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noAcceptPart));
+ FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg,
+ SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noAcceptPart),
+ new Exception());
+ // Return false if an Exception hasn't been thrown.
+ if (log.isDebugEnabled())
+ log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
+ return false;
+ }
+
+ // offered seq id
+ String offeredSequenceID = offer.getIdentifer().getIdentifier();
+
+ // (sic) the local name is a misspelling of "offerAccepted".
+ boolean offerEcepted = offerAccepted(offeredSequenceID, context, createSeqRMMsg, storageManager);
+
+ if (offerEcepted) {
+ // Setting the CreateSequence table entry for the outgoing
+ // side.
+ RMSBean rMSBean = new RMSBean();
+ rMSBean.setSequenceID(offeredSequenceID);
+ String outgoingSideInternalSequenceId = SandeshaUtil
+ .getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
+ rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
+ // this is a dummy value
+ rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID());
+
+ rMSBean.setToEPR(rmdBean.getToEPR());
+ rMSBean.setAcksToEPR(rmdBean.getToEPR()); // The acks need to flow back into this endpoint
+ rMSBean.setReplyToEPR(rmdBean.getReplyToEPR());
+ rMSBean.setLastActivatedTime(System.currentTimeMillis());
+ rMSBean.setRMVersion(rmdBean.getRMVersion());
+ rMSBean.setClientCompletedMessages(new RangeString());
+
+ // Setting sequence properties for the outgoing sequence.
+ // Only will be used by the server side response path. Will
+ // be wasted properties for the client side.
+
+ Endpoint endpoint = offer.getEndpoint();
+ if (endpoint!=null) {
+ // setting the OfferedEndpoint
+ rMSBean.setOfferedEndPoint(endpoint.getEPR().getAddress());
+ }
+
+
+ // Store the inbound token (if any) with the new sequence
+ rMSBean.setSecurityTokenData(rmdBean.getSecurityTokenData());
+
+ // If this new sequence has anonymous acksTo, then we must poll for the acks
+ // If the inbound sequence is targeted at the WSRM anonymous URI, we need to start
+ // polling for this sequence.
+ // NOTE(review): the EPR is constructed before acksTo is null-checked; the
+ // null test below only short-circuits hasAnonymousAddress() - confirm the
+ // EndpointReference constructor accepts a null address.
+ String acksTo = rMSBean.getAcksToEPR();
+ EndpointReference reference = new EndpointReference(acksTo);
+ if ((acksTo == null || reference.hasAnonymousAddress()) &&
+ Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqRMMsg.getRMSpecVersion())) {
+ rMSBean.setPollingMode(true);
+ }
+
+ // Set the SOAP Version for this sequence.
+ rMSBean.setSoapVersion(SandeshaUtil.getSOAPVersion(createSeqRMMsg.getSOAPEnvelope()));
+
+ storageManager.getRMSBeanMgr().insert(rMSBean);
+
+ SandeshaUtil.startWorkersForSequence(context, rMSBean);
+
+ } else {
+ // removing the accept part.
+ createSeqResPart.setAccept(null);
+ // Re-add the response to the envelope so the removal takes effect.
+ createSeqResponse.addSOAPEnvelope();
+ }
+ }
+
+ //TODO add createSequenceResponse message as the referenceMessage to the RMDBean.
+
+ outMessage.setResponseWritten(true);
+
+ rmdBean.setLastActivatedTime(System.currentTimeMillis());
+
+ // If the inbound sequence is targeted at the anonymous URI, we need to start
+ // polling for this sequence.
+ if (toEPR.hasAnonymousAddress()) {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqRMMsg.getRMSpecVersion())) {
+ rmdBean.setPollingMode(true);
+ }
+ }
+
+ storageManager.getRMDBeanMgr().update(rmdBean);
+
+ SandeshaUtil.startWorkersForSequence(context, rmdBean);
+
+ // Send the CreateSequenceResponse; a send failure is converted into a
+ // CreateSequenceRefused fault on the request rather than propagated.
+ AxisEngine engine = new AxisEngine(context);
+ try{
+ engine.send(outMessage);
+ }
+ catch(AxisFault e){
+ FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg,
+ SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendCreateSeqResponse,
+ SandeshaUtil.getStackTraceFromException(e)), e);
+ // Return false if an Exception hasn't been thrown.
+ if (log.isDebugEnabled())
+ log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
+ return false;
+ }
+
+ // RESPONSE_WRITTEN is "true" when the reply goes to a missing or anonymous
+ // ReplyTo, and "false" when the ReplyTo is a real address.
+ EndpointReference replyTo = createSeqMsg.getReplyTo();
+ if(replyTo == null || replyTo.hasAnonymousAddress()) {
+ createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+ } else {
+ createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+ }
+
+ // SequencePropertyBean findBean = new SequencePropertyBean ();
+ // findBean.setName (Sandesha2Constants.SequenceProperties.TERMINATE_ON_CREATE_SEQUENCE);
+ // findBean.setValue(createSeqMsg.getTo().getAddress());
+
+ // Pause the inbound message now that the response has been sent.
+ createSeqRMMsg.pause();
+ }
+ catch (Exception e) {
+ if (log.isDebugEnabled())
+ log.debug("Caught an exception processing CreateSequence message", e);
+ // Does the message context already contain a fault ?
+ // If it doesn't then we can add the CreateSequenceRefusedFault.
+ if (createSeqRMMsg.getMessageContext().getProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME) == null &&
+ createSeqRMMsg.getMessageContext().getProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME) == null) {
+ // Add the fault details to the message
+ FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg, SandeshaUtil.getStackTraceFromException(e), e);
+
+ // Return false if an Exception hasn't been thrown.
+ if (log.isDebugEnabled())
+ log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
+ return false;
+ }
+
+ // If we are SOAP12 and we have already processed the fault - rethrow the exception
+ if (createSeqRMMsg.getMessageContext().getProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME) != null) {
+ // throw the original exception
+ if (e instanceof AxisFault)
+ throw (AxisFault)e;
+
+ throw new SandeshaException(e);
+ }
+ // NOTE(review): when only the SOAP 1.1 fault code property is set, neither
+ // branch above fires and control falls through to "return true" below.
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.TRUE);
+ return true;
+ }
+
+ /**
+  * Decides whether an offered sequence identifier may be accepted.
+  * <p>
+  * An offer is rejected when the identifier is null, is shorter than two
+  * characters, or is already used by an RMS bean in storage. The cheap
+  * checks on the identifier itself run first so that storage is only
+  * queried for identifiers that could be accepted.
+  *
+  * @param sequenceId the offered sequence identifier; may be null
+  * @param configCtx not used by this method
+  * @param createSeqRMMsg not used by this method
+  * @param storageManager gives access to the RMS bean manager that is searched
+  * @return true when the offer can be accepted, false otherwise
+  * @throws SandeshaException declared for the storage lookup
+  */
+ private boolean offerAccepted(String sequenceId, ConfigurationContext configCtx, RMMsgContext createSeqRMMsg,
+         StorageManager storageManager) throws SandeshaException {
+     if (log.isDebugEnabled())
+         log.debug("Enter: CreateSeqMsgProcessor::offerAccepted, " + sequenceId);
+
+     boolean accepted = false;
+
+     // Null, empty and single character offers are NOT accepted. Rejecting
+     // them here also keeps a null identifier from being used as a search key.
+     if (sequenceId != null && sequenceId.length() > 1) {
+         RMSBean createSeqFindBean = new RMSBean();
+         createSeqFindBean.setSequenceID(sequenceId);
+         Collection existing = storageManager.getRMSBeanMgr().find(createSeqFindBean);
+
+         // The identifier must not clash with an existing outbound sequence.
+         accepted = existing.isEmpty();
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: CreateSeqMsgProcessor::offerAccepted, " + accepted);
+     return accepted;
+ }
+
+ /**
+  * Copies a SandeshaListener registered in the message options onto the
+  * operation context of the outgoing message, when both are present.
+  *
+  * @param rmMsgCtx the outgoing RM message context
+  * @return always false
+  */
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+     if (log.isDebugEnabled())
+         log.debug("Enter: CreateSeqMsgProcessor::processOutMessage");
+
+     MessageContext outboundCtx = rmMsgCtx.getMessageContext();
+
+     // The listener, if any, is looked up under the SANDESHA_LISTENER key.
+     Object registered = outboundCtx.getOptions().getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
+     SandeshaListener listener = (SandeshaListener) registered;
+
+     // The operation context is only consulted when there is a listener to store.
+     OperationContext opCtx = (listener == null) ? null : outboundCtx.getOperationContext();
+     if (opCtx != null) {
+         opCtx.setProperty(SandeshaClientConstants.SANDESHA_LISTENER, listener);
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: CreateSeqMsgProcessor::processOutMessage " + Boolean.FALSE);
+     return false;
+ }
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,257 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.Accept;
+import org.apache.sandesha2.wsrm.CreateSequenceResponse;
+
+/**
+ * Responsible for processing an incoming Create Sequence Response message.
+ */
+
+public class CreateSeqResponseMsgProcessor implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
+
+ /**
+  * Processes an inbound CreateSequenceResponse and activates the outbound
+  * sequence that was waiting for it.
+  * <p>
+  * The matching RMS bean is found through the message id of the original
+  * CreateSequence (taken from RelatesTo, or from the OUT message of the
+  * operation context when RelatesTo is absent). The bean is given the new
+  * sequence id, the CreateSequence sender entry is deleted, an RMD bean is
+  * inserted when the response accepts an offered sequence, and every sender
+  * bean held back for this internal sequence is released for sending.
+  *
+  * @param createSeqResponseRMMsgCtx the RM message context wrapping the response
+  * @return always true; every failure is reported by exception
+  * @throws AxisFault a SandeshaException when the response part, the new
+  *         sequence id, the related message id, the RMS bean, the internal
+  *         sequence id or the sender entry is missing, or when an Accept
+  *         arrives although no sequence was offered
+  */
+ public boolean processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
+
+     if (log.isDebugEnabled())
+         log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage");
+
+     ConfigurationContext configCtx = createSeqResponseRMMsgCtx.getMessageContext().getConfigurationContext();
+
+     StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+             .getAxisConfiguration());
+
+     // Processing the create sequence response.
+
+     CreateSequenceResponse createSeqResponsePart = (CreateSequenceResponse) createSeqResponseRMMsgCtx
+             .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
+     if (createSeqResponsePart == null) {
+         String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqResponse);
+         log.debug(message);
+         throw new SandeshaException(message);
+     }
+
+     String newOutSequenceId = createSeqResponsePart.getIdentifier().getIdentifier();
+     if (newOutSequenceId == null) {
+         String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.newSeqIdIsNull);
+         log.debug(message);
+         throw new SandeshaException(message);
+     }
+
+     RelatesTo relatesTo = createSeqResponseRMMsgCtx.getMessageContext().getRelatesTo();
+     String createSeqMsgId = null;
+     if (relatesTo != null) {
+         createSeqMsgId = relatesTo.getValue();
+     } else {
+         // Work out the related message from the operation context. A response
+         // without an operation context is treated like one without a related
+         // message: createSeqMsgId stays null and is reported just below.
+         OperationContext context = createSeqResponseRMMsgCtx.getMessageContext().getOperationContext();
+         if (context != null) {
+             MessageContext createSeq = context.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
+             if (createSeq != null)
+                 createSeqMsgId = createSeq.getMessageID();
+         }
+     }
+     if (createSeqMsgId == null) {
+         String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.relatesToNotAvailable);
+         log.error(message);
+         throw new SandeshaException(message);
+     }
+
+     SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr();
+     RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
+
+     RMSBean rmsBean = rmsBeanMgr.retrieve(createSeqMsgId);
+     if (rmsBean == null) {
+         String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound);
+         log.debug(message);
+         throw new SandeshaException(message);
+     }
+
+     // Check that the create sequence response message proves possession of the correct token
+     String tokenData = rmsBean.getSecurityTokenData();
+     if (tokenData != null) {
+         SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
+         MessageContext crtSeqResponseCtx = createSeqResponseRMMsgCtx.getMessageContext();
+         OMElement body = crtSeqResponseCtx.getEnvelope().getBody();
+         SecurityToken token = secManager.recoverSecurityToken(tokenData);
+         secManager.checkProofOfPossession(token, body, crtSeqResponseCtx);
+     }
+
+     String internalSequenceId = rmsBean.getInternalSequenceID();
+     if (internalSequenceId == null || "".equals(internalSequenceId)) {
+         String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.tempSeqIdNotSet);
+         log.debug(message);
+         throw new SandeshaException(message);
+     }
+     createSeqResponseRMMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceId);
+
+     rmsBean.setSequenceID(newOutSequenceId);
+
+     // We must poll for any reply-to that uses the anonymous URI. If it is a ws-a reply to then
+     // the create must include an offer (or this client cannot be identified). If the reply-to
+     // is the RM anon URI template then the offer is not required.
+     if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
+         String acksTo = rmsBean.getAcksToEPR();
+         // The EPR is only built once acksTo is known to be non-null.
+         if (acksTo == null || new EndpointReference(acksTo).hasAnonymousAddress()) {
+             rmsBean.setPollingMode(true);
+         }
+     }
+
+     SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
+     if (createSequenceSenderBean == null)
+         throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));
+
+     // deleting the create sequence entry.
+     retransmitterMgr.delete(createSeqMsgId);
+
+     // processing for accept (offer has been sent)
+     Accept accept = createSeqResponsePart.getAccept();
+     if (accept != null) {
+
+         // TODO this should be detected in the Fault manager.
+         if (rmsBean.getOfferedSequence() == null) {
+             String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.accptButNoSequenceOffered);
+             log.debug(message);
+             throw new SandeshaException(message);
+         }
+
+         RMDBean rMDBean = new RMDBean();
+
+         EndpointReference acksToEPR = accept.getAcksTo().getEPR();
+         rMDBean.setAcksToEPR(acksToEPR.getAddress());
+         rMDBean.setSequenceID(rmsBean.getOfferedSequence());
+         rMDBean.setNextMsgNoToProcess(1);
+         rMDBean.setOutboundInternalSequence(rmsBean.getInternalSequenceID());
+
+         // Storing the referenceMessage of the sending side sequence as the reference message
+         // of the receiving side as well.
+         // This can be used when creating new outgoing messages.
+
+         String referenceMsgStoreKey = rmsBean.getReferenceMessageStoreKey();
+         MessageContext referenceMsg = storageManager.retrieveMessageContext(referenceMsgStoreKey, configCtx);
+
+         String newMessageStoreKey = SandeshaUtil.getUUID();
+         storageManager.storeMessageContext(newMessageStoreKey, referenceMsg);
+
+         rMDBean.setReferenceMessageKey(newMessageStoreKey);
+
+         // If this is an offered sequence that needs polling then we need to setup the
+         // rmdBean for polling too, so that it still gets serviced after the outbound
+         // sequence terminates.
+         if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
+             if (rmsBean.isPollingMode()) {
+                 rMDBean.setPollingMode(true);
+             }
+         }
+
+         String rmSpecVersion = createSeqResponseRMMsgCtx.getRMSpecVersion();
+         rMDBean.setRMVersion(rmSpecVersion);
+
+         EndpointReference toEPR = createSeqResponseRMMsgCtx.getTo();
+         if (toEPR == null) {
+             // Most probably this is a sync response message, using the replyTo of the request message
+             OperationContext operationContext = createSeqResponseRMMsgCtx.getMessageContext().getOperationContext();
+             if (operationContext != null) {
+                 MessageContext createSequnceMessage = operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
+                 if (createSequnceMessage != null)
+                     toEPR = createSequnceMessage.getReplyTo();
+             }
+         }
+
+         if (toEPR != null)
+             rMDBean.setToAddress(toEPR.getAddress());
+
+         rMDBean.setServerCompletedMessages(new RangeString());
+         RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+
+         // Store the security token for the offered sequence
+         rMDBean.setSecurityTokenData(rmsBean.getSecurityTokenData());
+
+         rMDBean.setLastActivatedTime(System.currentTimeMillis());
+
+         rmdBeanMgr.insert(rMDBean);
+         SandeshaUtil.startWorkersForSequence(configCtx, rMDBean);
+     }
+
+     rmsBean.setLastActivatedTime(System.currentTimeMillis());
+     rmsBeanMgr.update(rmsBean);
+     SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
+
+     // Locate and update all of the messages for this sequence, now that we know
+     // the sequence id.
+     SenderBean target = new SenderBean();
+     target.setInternalSequenceID(internalSequenceId);
+     target.setSend(false);
+
+     Iterator iterator = retransmitterMgr.find(target).iterator();
+     while (iterator.hasNext()) {
+         SenderBean tempBean = (SenderBean) iterator.next();
+
+         // asking to send the application message
+         tempBean.setSend(true);
+         tempBean.setSequenceID(newOutSequenceId);
+         retransmitterMgr.update(tempBean);
+     }
+
+     // The operation context is treated as optional elsewhere in this method,
+     // so its absence must not fail a response whose state is already stored.
+     OperationContext responseOpCtx = createSeqResponseRMMsgCtx.getMessageContext().getOperationContext();
+     if (responseOpCtx != null)
+         responseOpCtx.setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+
+     createSeqResponseRMMsgCtx.pause();
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage " + Boolean.TRUE);
+     return true;
+ }
+
+ /**
+  * Outbound hook for CreateSequenceResponse messages. Apart from debug
+  * tracing it does nothing.
+  *
+  * @param rmMsgCtx the outgoing RM message context; not inspected
+  * @return always false
+  */
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+     // Entry and exit are traced together under a single debug check.
+     final boolean tracing = log.isDebugEnabled();
+     if (tracing) {
+         log.debug("Enter: CreateSeqResponseMsgProcessor::processOutMessage");
+         log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
+     }
+
+     return false;
+ }
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,192 @@
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ContextFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.SenderWorker;
+import org.apache.sandesha2.wsrm.Address;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.MakeConnection;
+import org.apache.sandesha2.wsrm.MessagePending;
+
+/**
+ * This class is responsible for processing MakeConnection request messages that come to the system.
+ * MakeConnection is only supported by WSRM 1.1
+ * Here a client can ask for reply messages using a polling mechanism, so even clients without real
+ * endpoints can ask for reliable response messages.
+ */
+public class MakeConnectionProcessor implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(MakeConnectionProcessor.class);
+
+ /**
+  * Processes an incoming MakeConnection request message.
+  * <p>
+  * Sender beans that are marked for sending, have no transport available and
+  * match the address and/or identifier of the request are looked up. Beans
+  * that are not re-sendable and have already been sent are dropped, one of
+  * the remaining beans is chosen at random and handed to
+  * {@link #replyToPoll}, which sends it through a SenderWorker.
+  *
+  * @param rmMsgCtx the RM message context wrapping the MakeConnection request
+  * @return always false
+  * @throws AxisFault when the storage lookup or the reply fails
+  */
+ public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+     if (log.isDebugEnabled())
+         log.debug("Enter: MakeConnectionProcessor::processInMessage " + rmMsgCtx.getSOAPEnvelope().getBody());
+
+     MakeConnection makeConnection = (MakeConnection) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.MAKE_CONNECTION);
+     Address address = makeConnection.getAddress();
+     Identifier identifier = makeConnection.getIdentifier();
+
+     ConfigurationContext configurationContext = rmMsgCtx.getConfigurationContext();
+     StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext, configurationContext.getAxisConfiguration());
+     SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+
+     // Search template. The reSend flag is ignored by the lookup, so it is
+     // left unset here and checked by hand afterwards.
+     SenderBean criteria = new SenderBean();
+     criteria.setSend(true);
+     criteria.setTransportAvailable(false);
+     if (address != null) {
+         criteria.setToAddress(address.getAddress());
+     }
+     if (identifier != null) {
+         criteria.setSequenceID(identifier.getIdentifier());
+     }
+
+     Collection candidates = senderBeanMgr.find(criteria);
+
+     // Drop every bean that fails the resend test.
+     Iterator filter = candidates.iterator();
+     while (filter.hasNext()) {
+         SenderBean candidate = (SenderBean) filter.next();
+         if (candidate.isReSend()) {
+             continue;
+         }
+         if (candidate.getSentCount() > 0) {
+             filter.remove();
+         }
+     }
+
+     int candidateCount = candidates.size();
+
+     // With more than one message waiting, the reply carries a MessagePending
+     // header with value true.
+     boolean pending = candidateCount > 1;
+
+     // Selecting a bean to send RANDOMLY. TODO- Should use a better mechanism.
+     SenderBean senderBean = null;
+     if (candidateCount > 0) {
+         int chosenIndex = new Random().nextInt(candidateCount);
+         Iterator picker = candidates.iterator();
+         for (int position = 0; position <= chosenIndex; position++) {
+             senderBean = (SenderBean) picker.next();
+         }
+     }
+
+     if (senderBean == null) {
+         if (log.isDebugEnabled())
+             log.debug("Exit: MakeConnectionProcessor::processInMessage, no matching message found");
+         return false;
+     }
+
+     replyToPoll(rmMsgCtx, senderBean, storageManager, pending, makeConnection.getNamespaceValue());
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: MakeConnectionProcessor::processInMessage");
+     return false;
+ }
+
+ /**
+  * Answers a MakeConnection poll with a stored message.
+  * <p>
+  * The stored message context referenced by the sender bean is retrieved,
+  * given the transport details of the poll, linked to the poll's operation
+  * context (one is created when the poll has none) and sent by running a
+  * SenderWorker on the calling thread. When the stored message can no longer
+  * be found the method returns without sending anything.
+  *
+  * @param pollMessage the MakeConnection request being answered
+  * @param matchingMessage the sender bean selected for this poll
+  * @param storageManager the storage the message context is retrieved from
+  * @param pending whether a MessagePending header should be added to the reply
+  * @param namespace the namespace used for the MessagePending header
+  * @throws AxisFault a SandeshaException when the poll has no transport-out
+  */
+ public static void replyToPoll(RMMsgContext pollMessage,
+         SenderBean matchingMessage,
+         StorageManager storageManager,
+         boolean pending,
+         String namespace)
+ throws AxisFault
+ {
+     if (log.isDebugEnabled())
+         log.debug("Enter: MakeConnectionProcessor::replyToPoll");
+
+     MessageContext pollCtx = pollMessage.getMessageContext();
+
+     // Without a transport-out on the poll there is nowhere to send the reply.
+     if (pollCtx.getTransportOut() == null) {
+         String noTransport = SandeshaMessageHelper.getMessage(
+                 SandeshaMessageKeys.cantSendMakeConnectionNoTransportOut);
+         if (log.isDebugEnabled())
+             log.debug(noTransport);
+         throw new SandeshaException(noTransport);
+     }
+
+     String storedKey = matchingMessage.getMessageContextRefKey();
+     MessageContext returnMessage = storageManager.retrieveMessageContext(storedKey, pollMessage.getConfigurationContext());
+     if (returnMessage == null) {
+         // Someone else has either removed the sender & message, or another make connection got here first.
+         if (log.isDebugEnabled())
+             log.debug("Cannot find the message stored with the key:" + storedKey);
+         return;
+     }
+
+     if (pending) {
+         addMessagePendingHeader(returnMessage, namespace);
+     }
+
+     RMMsgContext returnRMMsg = MsgInitializer.initializeMessage(returnMessage);
+     setTransportProperties(returnMessage, pollMessage);
+
+     // Link the response to the request
+     OperationContext linkCtx = pollCtx.getOperationContext();
+     if (linkCtx == null) {
+         linkCtx = ContextFactory.createOperationContext(returnMessage.getAxisOperation(), returnMessage.getServiceContext());
+         linkCtx.addMessageContext(pollCtx);
+         pollCtx.setOperationContext(linkCtx);
+     }
+     linkCtx.addMessageContext(returnMessage);
+     returnMessage.setOperationContext(linkCtx);
+
+     returnMessage.setProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE, Boolean.TRUE);
+
+     // Running the reply through a SenderWorker lets Sandesha2 treat both of
+     // the following scenarios equally:
+     //   1. A message being sent by the Sender thread.
+     //   2. A message being sent as a reply to a MakeConnection.
+     SenderWorker pollWorker = new SenderWorker(pollMessage.getConfigurationContext(), matchingMessage);
+     pollWorker.setMessage(returnRMMsg);
+     pollWorker.run();
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: MakeConnectionProcessor::replyToPoll");
+ }
+
+ /**
+  * Adds a MessagePending element with value true to the envelope of the
+  * given message.
+  *
+  * @param returnMessage the message whose envelope receives the header
+  * @param namespace the namespace the MessagePending element is created with
+  * @throws SandeshaException when the element cannot be created or written
+  */
+ private static void addMessagePendingHeader (MessageContext returnMessage, String namespace) throws SandeshaException {
+     MessagePending pendingHeader = new MessagePending(namespace);
+     pendingHeader.setPending(true);
+     pendingHeader.toSOAPEnvelope(returnMessage.getEnvelope());
+ }
+
+ /**
+ * Outbound hook for MakeConnection messages. Performs no work.
+ *
+ * @param rmMsgCtx the outgoing RM message context; not inspected
+ * @return always false
+ */
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+ return false;
+ }
+
+ /**
+  * Copies the transport details of a MakeConnection request onto the message
+  * that is sent back in reply: the TRANSPORT_OUT, OUT_TRANSPORT_INFO and
+  * CONTENT_TYPE properties, followed by the transport-out description.
+  *
+  * @param returnMessage the message that receives the transport details
+  * @param makeConnectionMessage the poll request the details are read from
+  */
+ private static void setTransportProperties (MessageContext returnMessage, RMMsgContext makeConnectionMessage) {
+     // Properties are copied one by one, in this order.
+     String[] copiedKeys = {
+         MessageContext.TRANSPORT_OUT,
+         Constants.OUT_TRANSPORT_INFO,
+         Constants.Configuration.CONTENT_TYPE
+     };
+     for (int i = 0; i < copiedKeys.length; i++) {
+         returnMessage.setProperty(copiedKeys[i], makeConnectionMessage.getProperty(copiedKeys[i]));
+     }
+
+     returnMessage.setTransportOut(makeConnectionMessage.getMessageContext().getTransportOut());
+ }
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,44 @@
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.polling.PollingManager;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.SequenceEntry;
+import org.apache.sandesha2.wsrm.MessagePending;
+
+ /**
+  * Handles the MessagePending part of a message: when the part is present and
+  * flagged as pending, and the message carries a MAKECONNECTION_ENTRY property,
+  * a polling request is scheduled for the sequence named by that entry.
+  */
+ public class MessagePendingProcessor {
+
+     private static final Log log = LogFactory.getLog(MessagePendingProcessor.class);
+
+     /**
+      * Inspects the MessagePending part of the given message and schedules a
+      * polling request when one is called for.
+      *
+      * @param message the message to inspect
+      * @throws AxisFault if the storage manager lookup or the scheduling fails
+      */
+     public void processMessagePendingHeaders(RMMsgContext message) throws AxisFault {
+
+         if (log.isDebugEnabled()) {
+             log.debug("Enter: MessagePendingProcessor::processMessagePendingHeaders");
+         }
+
+         MessagePending header = (MessagePending) message.getMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING);
+         if (header != null && header.isPending()) {
+             SequenceEntry pollTarget = (SequenceEntry) message.getProperty(Sandesha2Constants.MessageContextProperties.MAKECONNECTION_ENTRY);
+             if (pollTarget != null) {
+                 schedulePoll(message.getConfigurationContext(), pollTarget);
+             }
+         }
+
+         if (log.isDebugEnabled()) {
+             log.debug("Exit: MessagePendingProcessor::processMessagePendingHeaders");
+         }
+     }
+
+     /**
+      * Asks the storage manager's polling manager, if it has one, to schedule a
+      * polling request for the given sequence entry.
+      */
+     private static void schedulePoll(ConfigurationContext context, SequenceEntry entry) throws AxisFault {
+         StorageManager storage = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+         PollingManager poller = storage.getPollingManager();
+         if (poller == null) {
+             return;
+         }
+         poller.schedulePollingRequest(entry.getSequenceId(), entry.isRmSource());
+     }
+
+ }
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,43 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axis2.AxisFault;
+import org.apache.sandesha2.RMMsgContext;
+
+/**
+ * The message processor interface.
+ */
+
+public interface MsgProcessor {
+
+ /**
+ * @param rmMsgCtx
+ * @return true if the msg context has been paused
+ * @throws AxisFault
+ */
+ public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+
+ /**
+ *
+ * @param rmMsgCtx
+ * @return true if the msg context has been paused
+ * @throws AxisFault
+ */
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+}
\ No newline at end of file
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,52 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+
+/**
+ * Used to get a suitable message processor. Given the message type.
+ */
+
+public class MsgProcessorFactory {
+
+ public static MsgProcessor getMessageProcessor(RMMsgContext rmMessageContext) {
+
+ int messageType = rmMessageContext.getMessageType();
+
+ switch (messageType) {
+ case (Sandesha2Constants.MessageTypes.CREATE_SEQ):
+ return new CreateSeqMsgProcessor();
+ case (Sandesha2Constants.MessageTypes.TERMINATE_SEQ):
+ return new TerminateSeqMsgProcessor();
+ case (Sandesha2Constants.MessageTypes.TERMINATE_SEQ_RESPONSE):
+ return new TerminateSeqResponseMsgProcessor();
+ case (Sandesha2Constants.MessageTypes.APPLICATION):
+ return new ApplicationMsgProcessor();
+ case (Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE):
+ return new CreateSeqResponseMsgProcessor();
+ case (Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE):
+ return new CloseSequenceProcessor();
+ case (Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG):
+ return new MakeConnectionProcessor ();
+ default:
+ return null;
+ }
+ }
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,379 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.engine.Handler.InvocationResponse;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * Responsible for processing the Sequence header (if present) on an incoming
+ * message.
+ */
+
+public class SequenceProcessor {
+
+ private static final Log log = LogFactory.getLog(SequenceProcessor.class);
+
+ /**
+  * Looks for a Sequence part on the message and, when one is present, passes
+  * the message to {@link #processReliableMessage(RMMsgContext)}.
+  *
+  * @param rmMsgCtx the inbound message
+  * @return the result of the reliable-message processing, or CONTINUE when the
+  *         message has no Sequence part
+  * @throws AxisFault if the reliable-message processing fails
+  */
+ public InvocationResponse processSequenceHeader(RMMsgContext rmMsgCtx) throws AxisFault {
+     if (log.isDebugEnabled()) {
+         log.debug("Enter: SequenceProcessor::processSequenceHeader");
+     }
+
+     InvocationResponse outcome = InvocationResponse.CONTINUE;
+     Sequence header = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+
+     if (header == null) {
+         if (log.isDebugEnabled()) {
+             log.debug("Message does not contain a sequence header");
+         }
+     } else {
+         // This is a reliable message, so the main routine takes over.
+         outcome = processReliableMessage(rmMsgCtx);
+     }
+
+     if (log.isDebugEnabled()) {
+         log.debug("Exit: SequenceProcessor::processSequenceHeader " + outcome);
+     }
+     return outcome;
+ }
+
+ /**
+  * Handles an inbound message that carries a Sequence header.
+  * <p>
+  * The method returns CONTINUE straight away when the message is already marked with
+  * APPLICATION_PROCESSING_DONE. Otherwise it checks proof of possession (when the RMD
+  * bean holds security token data), records the inbound sequence details on the
+  * operation context, runs the FaultManager checks, updates the RMD bean, deals with
+  * duplicate messages, generates or schedules an acknowledgement and, when the storage
+  * manager supplies an invoker, stores the message for that invoker.
+  *
+  * @param rmMsgCtx the inbound message; its SEQUENCE part is dereferenced without a
+  *        null check, so callers must only pass messages that have one
+  * @return CONTINUE when processing should carry on; ABORT when a FaultManager check
+  *         fires, the message is a duplicate, the message is a WSRM 1.0 last-message
+  *         marker, or the message was handed to the invoker with a free backchannel;
+  *         SUSPEND when the message was handed to the invoker and the backchannel is
+  *         still needed
+  * @throws AxisFault if the configuration context is not set or a called component fails
+  */
+ public InvocationResponse processReliableMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+     if (log.isDebugEnabled())
+         log.debug("Enter: SequenceProcessor::processReliableMessage");
+
+     InvocationResponse result = InvocationResponse.CONTINUE;
+
+     // Nothing to do when application processing has already happened for this message.
+     if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
+             && rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals("true")) {
+         return result;
+     }
+
+     MessageContext msgCtx = rmMsgCtx.getMessageContext();
+     StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(msgCtx.getConfigurationContext(),msgCtx.getConfigurationContext().getAxisConfiguration());
+     Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+     String sequenceId = sequence.getIdentifier().getIdentifier();
+     long msgNo = sequence.getMessageNumber().getMessageNumber();
+     boolean lastMessage = sequence.getLastMessage() != null;
+
+     // Check that both the Sequence header and message body have been secured properly
+     RMDBeanMgr mgr = storageManager.getRMDBeanMgr();
+     RMDBean bean = mgr.retrieve(sequenceId);
+
+     if(bean != null && bean.getSecurityTokenData() != null) {
+         SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
+
+         QName seqName = new QName(rmMsgCtx.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.SEQUENCE);
+
+         SOAPEnvelope envelope = msgCtx.getEnvelope();
+         OMElement body = envelope.getBody();
+         OMElement seqHeader = envelope.getHeader().getFirstChildWithName(seqName);
+
+         SecurityToken token = secManager.recoverSecurityToken(bean.getSecurityTokenData());
+
+         secManager.checkProofOfPossession(token, seqHeader, msgCtx);
+         secManager.checkProofOfPossession(token, body, msgCtx);
+     }
+
+     // Store the inbound sequence id, number and lastMessage onto the operation context
+     OperationContext opCtx = msgCtx.getOperationContext();
+     if(opCtx != null) {
+         opCtx.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID, sequenceId);
+         opCtx.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_MESSAGE_NUMBER, new Long(msgNo));
+         if(lastMessage) opCtx.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_LAST_MESSAGE, Boolean.TRUE);
+     }
+
+     // setting acked msg no range
+     ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+     if (configCtx == null) {
+         String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
+         log.debug(message);
+         throw new SandeshaException(message);
+     }
+
+     if (FaultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager)) {
+         if (log.isDebugEnabled())
+             log.debug("Exit: SequenceProcessor::processReliableMessage, Unknown sequence");
+         return InvocationResponse.ABORT;
+     }
+
+     // throwing a fault if the sequence is terminated
+     if (FaultManager.checkForSequenceTerminated(rmMsgCtx, sequenceId, bean)) {
+         if (log.isDebugEnabled())
+             log.debug("Exit: SequenceProcessor::processReliableMessage, Sequence terminated");
+         return InvocationResponse.ABORT;
+     }
+
+     // throwing a fault if the sequence is closed.
+     if (FaultManager.checkForSequenceClosed(rmMsgCtx, sequenceId, bean)) {
+         if (log.isDebugEnabled())
+             log.debug("Exit: SequenceProcessor::processReliableMessage, Sequence closed");
+         return InvocationResponse.ABORT;
+     }
+     FaultManager.checkForLastMsgNumberExceeded(rmMsgCtx, storageManager);
+
+     if (FaultManager.checkForMessageRolledOver(rmMsgCtx, sequenceId, msgNo)) {
+
+         if (log.isDebugEnabled())
+             log.debug("Exit: SequenceProcessor::processReliableMessage, Message rolled over " + msgNo);
+
+         return InvocationResponse.ABORT;
+     }
+
+     // Pause the messages bean if not the right message to invoke.
+
+     // updating the last activated time of the sequence.
+     bean.setLastActivatedTime(System.currentTimeMillis());
+
+     if (lastMessage) {
+         //setting this as the LastMessage number
+         bean.setLastInMessageId(msgCtx.getMessageID());
+     }
+
+     EndpointReference replyTo = rmMsgCtx.getReplyTo();
+     String key = SandeshaUtil.getUUID(); // key to store the message.
+     // updating the Highest_In_Msg_No property which gives the highest
+     // message number retrieved from this sequence.
+     long highestInMsgNo = bean.getHighestInMessageNumber();
+
+     if (msgNo > highestInMsgNo) {
+         // If WS-Addressing is turned off there may not be a message id written into the SOAP
+         // headers, but we can still fake one up to help us match up requests and replies within
+         // this end of the connection.
+         String messageId = msgCtx.getMessageID();
+         if(messageId == null) {
+             messageId = SandeshaUtil.getUUID();
+             msgCtx.setMessageID(messageId);
+         }
+
+         bean.setHighestInMessageId(messageId);
+         bean.setHighestInMessageNumber(msgNo);
+     }
+
+     String specVersion = rmMsgCtx.getRMSpecVersion();
+     if (rmMsgCtx.getMessageContext().getAxisOperation().getName().getLocalPart().equals(Sandesha2Constants.RM_DUPLICATE_OPERATION.getLocalPart())
+             && (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
+         // this is a duplicate message and the invocation type is EXACTLY_ONCE. We try to return
+         // ack messages at this point, as if someone is sending duplicates then they may have
+         // missed earlier acks. We also have special processing for sync 2-way with RM 1.0
+         if((replyTo==null || replyTo.hasAnonymousAddress()) &&
+                 (specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0))) {
+
+             SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+             SenderBean findSenderBean = new SenderBean ();
+             findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+             findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
+             findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
+             findSenderBean.setSend(true);
+
+             SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
+
+             // this is effectively a poll for the replyMessage, so re-use the logic in the MakeConnection
+             // processor. This will use this thread to re-send the reply, writing it into the transport.
+             // As the reply is now written we do not want to continue processing, or suspend, so we abort.
+             if(replyMessageBean != null) {
+                 if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
+                 MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null);
+                 result = InvocationResponse.ABORT;
+                 if (log.isDebugEnabled())
+                     log.debug("Exit: SequenceProcessor::processReliableMessage, replayed message: " + result);
+                 return result;
+             }
+         }
+
+         EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
+
+         // Send an Ack if needed.
+         // We are not sending acks for duplicate messages in the RM 1.0 anon InOut case.
+         // If a standalone ack gets sent before the actual message (i.e. before the original msg gets
+         // replied), the client may take this as an InOnly message and may avoid looking for the
+         // application response.
+         // A missing ReplyTo is treated like an anonymous one, matching the check at the top of this
+         // branch, so it must not be dereferenced without a null guard.
+         if (!(Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmMsgCtx.getRMSpecVersion()) &&
+                 (replyTo == null || replyTo.hasAnonymousAddress()))) {
+             sendAckIfNeeded(bean, sequenceId, rmMsgCtx, storageManager, true, acksTo.hasAnonymousAddress());
+         }
+
+         result = InvocationResponse.ABORT;
+         if (log.isDebugEnabled())
+             log.debug("Exit: SequenceProcessor::processReliableMessage, dropping duplicate: " + result);
+         return result;
+     }
+
+     // If the message is a reply to an outbound message then we can update the RMSBean that
+     // matches.
+     String outboundSequence = bean.getOutboundInternalSequence();
+     if(outboundSequence != null) {
+         RMSBean outBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, outboundSequence);
+         if(outBean != null && outBean.getExpectedReplies() > 0 ) {
+             outBean.setExpectedReplies(outBean.getExpectedReplies() - 1);
+             RMSBeanMgr outMgr = storageManager.getRMSBeanMgr();
+             outMgr.update(outBean);
+         }
+     }
+
+     // Set the last activated time
+     bean.setLastActivatedTime(System.currentTimeMillis());
+
+     // Update the RMD bean
+     mgr.update(bean);
+
+     // If we are doing sync 2-way over WSRM 1.0, then we may just have received one of
+     // the reply messages that we were looking for. If so we can remove the matching sender bean.
+     int mep = msgCtx.getAxisOperation().getAxisSpecifMEPConstant();
+     if(specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0) &&
+             mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
+         RelatesTo relatesTo = msgCtx.getRelatesTo();
+         if(relatesTo != null) {
+             String messageId = relatesTo.getValue();
+             SenderBean matcher = new SenderBean();
+             matcher.setMessageID(messageId);
+             SenderBean sender = storageManager.getSenderBeanMgr().findUnique(matcher);
+             if(sender != null) {
+                 if(log.isDebugEnabled()) log.debug("Deleting sender for sync-2-way message");
+                 storageManager.removeMessageContext(sender.getMessageContextRefKey());
+                 storageManager.getSenderBeanMgr().delete(messageId);
+
+                 // Try and terminate the corresponding outbound sequence
+                 RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sender.getSequenceID());
+                 TerminateManager.checkAndTerminate(rmMsgCtx.getConfigurationContext(), storageManager, rmsBean);
+             }
+         }
+     }
+
+     //setting properties for the messageContext
+     rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
+     rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new Long (msgNo));
+
+     // We only create an ack message if:
+     // - We have anonymous acks, and the backchannel is free
+     // - We have async acks
+     boolean backchannelFree = (replyTo != null && !replyTo.hasAnonymousAddress()) ||
+             WSDLConstants.MEP_CONSTANT_IN_ONLY == mep;
+     EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
+     if (acksTo.hasAnonymousAddress() && backchannelFree) {
+         // Only write the ack onto the backchannel if no response has been written there yet.
+         Object responseWritten = msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
+         if (responseWritten==null || !Constants.VALUE_TRUE.equals(responseWritten)) {
+             RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
+             msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+             AcknowledgementManager.sendAckNow(ackRMMsgContext);
+         }
+     } else if (!acksTo.hasAnonymousAddress()) {
+         // Async acks: queue the ack to go out once the configured acknowledgement interval has passed.
+         SandeshaPolicyBean policyBean = SandeshaUtil.getPropertyBean (msgCtx.getAxisOperation());
+         long ackInterval = policyBean.getAcknowledgementInterval();
+         long timeToSend = System.currentTimeMillis() + ackInterval;
+
+         RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
+
+         AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, sequenceId, timeToSend, storageManager);
+     }
+
+     // If this message matches the WSRM 1.0 pattern for an empty last message (e.g.
+     // the sender wanted to signal the last message, but didn't have an application
+     // message to send) then we do not need to send the message on to the application.
+     if(Sandesha2Constants.SPEC_2005_02.Actions.ACTION_LAST_MESSAGE.equals(msgCtx.getWSAAction()) ||
+             Sandesha2Constants.SPEC_2005_02.Actions.SOAP_ACTION_LAST_MESSAGE.equals(msgCtx.getSoapAction())) {
+         if (log.isDebugEnabled())
+             log.debug("Exit: SequenceProcessor::processReliableMessage, got WSRM 1.0 lastmessage, aborting");
+         return InvocationResponse.ABORT;
+     }
+
+     // If the storage manager has an invoker, then they may be implementing inOrder, or
+     // transactional delivery. Either way, if they have one we should use it.
+     SandeshaThread invoker = storageManager.getInvoker();
+     if (invoker != null) {
+         // Whatever the MEP, we stop processing here and the invoker will do the real work. We only
+         // SUSPEND if we need to keep the backchannel open for the response... we may as well ABORT
+         // to let other cases end more quickly.
+         if(backchannelFree) {
+             result = InvocationResponse.ABORT;
+         } else {
+             result = InvocationResponse.SUSPEND;
+         }
+         InvokerBeanMgr storageMapMgr = storageManager.getInvokerBeanMgr();
+
+         storageManager.storeMessageContext(key, rmMsgCtx.getMessageContext());
+         storageMapMgr.insert(new InvokerBean(key, msgNo, sequenceId));
+
+         // This will avoid performing application processing more than once.
+         rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: SequenceProcessor::processReliableMessage " + result);
+
+     return result;
+ }
+
+
+ /**
+  * Generates an acknowledgement message for the given sequence and then either
+  * sends it at once or queues it for later.
+  * <p>
+  * With an anonymous acksTo the operation context is marked RESPONSE_WRITTEN and
+  * the ack is sent immediately. Otherwise an ack bean entry is added whose send
+  * time is the current time plus the acknowledgement interval of the property
+  * bean looked up for the message's Axis service.
+  *
+  * @param rmdBean bean of the sequence being acknowledged
+  * @param sequenceId identifier of that sequence
+  * @param rmMsgCtx the message that triggered the acknowledgement
+  * @param storageManager storage manager passed on to the AcknowledgementManager
+  * @param serverSide passed through to the ack generation
+  * @param anonymousAcksTo whether the acksTo endpoint has an anonymous address
+  * @throws AxisFault if generating, sending or queueing the ack fails
+  */
+ private void sendAckIfNeeded(RMDBean rmdBean, String sequenceId, RMMsgContext rmMsgCtx,
+         StorageManager storageManager, boolean serverSide, boolean anonymousAcksTo)
+         throws AxisFault {
+
+     if (log.isDebugEnabled()) {
+         log.debug("Enter: SequenceProcessor::sendAckIfNeeded " + sequenceId);
+     }
+
+     RMMsgContext ack = AcknowledgementManager.generateAckMessage(
+             rmMsgCtx, rmdBean, sequenceId, storageManager, serverSide);
+
+     if (!anonymousAcksTo) {
+         // Addressable acksTo: defer the ack by the configured interval.
+         MessageContext trigger = rmMsgCtx.getMessageContext();
+         long interval = SandeshaUtil.getPropertyBean(trigger.getAxisService()).getAcknowledgementInterval();
+         long dueAt = System.currentTimeMillis() + interval;
+         AcknowledgementManager.addAckBeanEntry(ack, sequenceId, dueAt, storageManager);
+     } else {
+         // Anonymous acksTo: mark the response as written, then send the ack right away.
+         OperationContext operation = rmMsgCtx.getMessageContext().getOperationContext();
+         operation.setProperty(Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+         AcknowledgementManager.sendAckNow(ack);
+     }
+
+     if (log.isDebugEnabled()) {
+         log.debug("Exit: SequenceProcessor::sendAckIfNeeded");
+     }
+ }
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,413 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ContextFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.http.server.AxisHttpResponseImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.util.WSRMMessageSender;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+
+/**
+ * Responsible for processing an incoming Terminate Sequence message.
+ */
+
+public class TerminateSeqMsgProcessor extends WSRMMessageSender implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
+
+ /**
+  * Processes an incoming TerminateSequence message.
+  * <p>
+  * The method validates the TerminateSequence part and its sequence id, checks proof of
+  * possession when the RMD bean holds security token data, cleans up the receiving side,
+  * marks the RMD bean as terminated and then either sends a TerminateSequenceResponse or,
+  * when none is required, tries to return a stored outgoing TerminateSequence addressed to
+  * an anonymous endpoint as the response. Finally the incoming message context is paused.
+  *
+  * @param terminateSeqRMMsg the incoming TerminateSequence message
+  * @return {@code false} when the FaultManager reports an unknown sequence; otherwise
+  *         {@code true}, after the message context has been paused
+  * @throws AxisFault if the TerminateSequence part is missing, the sequence id is null or
+  *         empty, a response cannot be sent, or a called component fails
+  */
+ public boolean processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
+
+ MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
+
+ // Processing the terminate message
+ // TODO Add terminate sequence message logic.
+ TerminateSequence terminateSequence = (TerminateSequence) terminateSeqRMMsg
+ .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ if (terminateSequence == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noTerminateSeqPart);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String sequenceId = terminateSequence.getIdentifier().getIdentifier();
+ if (sequenceId == null || "".equals(sequenceId)) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidSequenceID, null);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ ConfigurationContext context = terminateSeqMsg.getConfigurationContext();
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
+
+ // Check that the sender of this TerminateSequence holds the correct token
+ RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+ if(rmdBean != null && rmdBean.getSecurityTokenData() != null) {
+ SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
+ OMElement body = terminateSeqRMMsg.getSOAPEnvelope().getBody();
+ SecurityToken token = secManager.recoverSecurityToken(rmdBean.getSecurityTokenData());
+ secManager.checkProofOfPossession(token, body, terminateSeqRMMsg.getMessageContext());
+ }
+
+ // Stop here, without pausing the message, when the sequence is not known.
+ if (FaultManager.checkForUnknownSequence(terminateSeqRMMsg, sequenceId, storageManager)) {
+ if (log.isDebugEnabled())
+ log.debug("Exit: TerminateSeqMsgProcessor::processInMessage, unknown sequence");
+ return false;
+ }
+
+ // add the terminate sequence response if required.
+ RMMsgContext terminateSequenceResponse = null;
+ if (SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion()))
+ terminateSequenceResponse = getTerminateSequenceResponse(terminateSeqRMMsg, rmdBean, sequenceId, storageManager);
+
+ setUpHighestMsgNumbers(context, storageManager, sequenceId, terminateSeqRMMsg);
+
+
+
+ boolean inOrderInvocation = SandeshaUtil.getDefaultPropertyBean(context.getAxisConfiguration()).isInOrder();
+
+
+ // If the invocation is inOrder and this is RM 1.1, there is a possibility that all the messages
+ // have already been invoked. In this case we should do the full termination.
+
+ boolean doFullTermination = false;
+
+ if (inOrderInvocation) {
+
+ long highestMsgNo = rmdBean.getHighestInMessageNumber();
+ long nextMsgToProcess = rmdBean.getNextMsgNoToProcess();
+
+ if (nextMsgToProcess>highestMsgNo) {
+ // all the messages have been invoked, so we can do the full termination
+ doFullTermination = true;
+ }
+ } else {
+ // for the not-inOrder case, always do the full termination.
+ doFullTermination = true;
+ }
+
+ // Both branches run cleanReceivingSideOnTerminateMessage; a full termination
+ // additionally runs cleanReceivingSideAfterInvocation first.
+ if (doFullTermination) {
+ TerminateManager.cleanReceivingSideAfterInvocation(sequenceId, storageManager);
+ TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId, storageManager);
+ } else
+ TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId, storageManager);
+
+ // Mark the sequence as terminated and persist the updated bean.
+ rmdBean.setTerminated(true);
+ rmdBean.setLastActivatedTime(System.currentTimeMillis());
+ storageManager.getRMDBeanMgr().update(rmdBean);
+
+
+ //sending the terminate sequence response
+ if (terminateSequenceResponse != null) {
+
+ MessageContext outMessage = terminateSequenceResponse.getMessageContext();
+ EndpointReference toEPR = outMessage.getTo();
+
+ AxisEngine engine = new AxisEngine(terminateSeqMsg
+ .getConfigurationContext());
+
+ outMessage.setServerSide(true);
+
+ try {
+ engine.send(outMessage);
+ } catch (AxisFault e) {
+ if (log.isDebugEnabled())
+ log.debug("Unable to send terminate sequence response", e);
+
+ throw new SandeshaException(
+ SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse), e);
+ }
+
+ // RESPONSE_WRITTEN is set to "true" only when the response went to an anonymous address.
+ if (toEPR.hasAnonymousAddress()) {
+ terminateSeqMsg.getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+ } else {
+ terminateSeqMsg.getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+ }
+
+ } else {
+ // In the RM 1.0 anonymous scenario we try to attach the TerminateSequence of the response side
+ // as the response message.
+
+ // Look up a stored outgoing TerminateSequence for the matching outgoing-side sequence.
+ String outgoingSideInternalSeqId = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
+ SenderBean senderFindBean = new SenderBean ();
+ senderFindBean.setInternalSequenceID(outgoingSideInternalSeqId);
+ senderFindBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
+ senderFindBean.setSend(true);
+ senderFindBean.setReSend(false);
+
+ SenderBean outgoingSideTerminateBean = storageManager.getSenderBeanMgr().findUnique(senderFindBean);
+ if (outgoingSideTerminateBean!=null) {
+
+ EndpointReference toEPR = new EndpointReference (outgoingSideTerminateBean.getToAddress());
+ if (toEPR.hasAnonymousAddress()) {
+ String messageKey = outgoingSideTerminateBean
+ .getMessageContextRefKey();
+ MessageContext message = storageManager
+ .retrieveMessageContext(messageKey, context);
+
+ RMMsgContext rmMessage = MsgInitializer.initializeMessage(message);
+
+ // attaching this outgoing terminate message as the
+ // response to the incoming terminate message.
+ message.setTransportOut(terminateSeqMsg.getTransportOut());
+ message.setProperty(MessageContext.TRANSPORT_OUT,terminateSeqMsg.getProperty(MessageContext.TRANSPORT_OUT));
+ message.setProperty(Constants.OUT_TRANSPORT_INFO, terminateSeqMsg.getProperty(Constants.OUT_TRANSPORT_INFO));
+
+ terminateSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+
+ AxisEngine engine = new AxisEngine(context);
+ try {
+ engine.send(message);
+ } catch (AxisFault e) {
+ if (log.isDebugEnabled())
+ log.debug("Unable to send terminate sequence response", e);
+
+ throw new SandeshaException(
+ SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse), e);
+ }
+
+ MessageRetransmissionAdjuster.adjustRetransmittion(rmMessage, outgoingSideTerminateBean, context, storageManager);
+ }
+
+ }
+
+ }
+
+ // The incoming terminate message is paused once its handling is complete.
+ terminateSeqMsg.pause();
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: TerminateSeqMsgProcessor::processInMessage " + Boolean.TRUE);
+ return true;
+ }
+
+ /**
+  * Treats the highest message received on the terminated inbound sequence as its last
+  * message and, if the reply to that message has already gone out and every outbound
+  * message up to it has been acknowledged, adds a TerminateSequence message for the
+  * response-side (outbound) sequence.
+  *
+  * @param configCtx      configuration context used to load the stored reference message
+  * @param storageManager storage manager giving access to the RMD/RMS beans and message store
+  * @param sequenceId     identifier of the inbound sequence that is being terminated
+  * @param terminateRMMsg the incoming TerminateSequence message, passed on to
+  *                       TerminateManager.addTerminateSequenceMessage
+  * @throws SandeshaException if the outbound sequence has no reference message key, the
+  *                           reference message cannot be retrieved, or an AxisFault is
+  *                           raised while doing the work
+  */
+ private void setUpHighestMsgNumbers(ConfigurationContext configCtx, StorageManager storageManager,
+         String sequenceId, RMMsgContext terminateRMMsg) throws SandeshaException {
+
+     if (log.isDebugEnabled()) {
+         log.debug("Enter: TerminateSeqMsgProcessor::setUpHighestMsgNumbers, " + sequenceId);
+     }
+
+     RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+     RMDBean rmdBean = rmdBeanMgr.retrieve(sequenceId);
+     long highestInbound = rmdBean.getHighestInMessageNumber();
+
+     // The derived internal sequence ID is only valid on the server side.
+     String responseInternalSeqId = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
+
+     long highestOutbound = 0;
+     boolean replyAlreadySent = false;
+     try {
+         if (highestInbound != 0) {
+             // Mark the highest inbound message as if it had carried the last-message flag.
+             String lastInboundMsgId = rmdBean.getHighestInMessageId();
+             rmdBean.setLastInMessageId(lastInboundMsgId);
+             storageManager.getRMDBeanMgr().update(rmdBean);
+
+             // If an outbound message relating to that inbound message has already gone
+             // out, the response side can be terminated as well.
+             RMSBean responseBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
+                     responseInternalSeqId);
+             if (responseBean != null) {
+                 String relatesTo = responseBean.getHighestOutRelatesTo();
+                 if (relatesTo != null && relatesTo.equals(lastInboundMsgId)) {
+                     highestOutbound = responseBean.getHighestOutMessageNumber();
+                     replyAlreadySent = true;
+
+                     // The reply may be sent but not yet acked: remember its number as the
+                     // last outbound message so the sequence is terminated when the ack arrives.
+                     responseBean.setLastOutMessage(highestOutbound);
+                     storageManager.getRMSBeanMgr().update(responseBean);
+                 }
+             }
+         }
+
+         String outSequenceId = SandeshaUtil.getSequenceIDFromInternalSequenceID(responseInternalSeqId,
+                 storageManager);
+
+         // The acknowledgement lookup is evaluated last, and only when all cheaper
+         // preconditions already hold.
+         boolean terminateResponseSide = replyAlreadySent
+                 && highestOutbound > 0
+                 && responseInternalSeqId != null
+                 && outSequenceId != null
+                 && SandeshaUtil.isAllMsgsAckedUpto(highestOutbound, responseInternalSeqId, storageManager);
+
+         if (terminateResponseSide) {
+             RMSBean outBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
+             if (!outBean.isTerminateAdded()) {
+                 TerminateManager.addTerminateSequenceMessage(terminateRMMsg, outBean.getInternalSequenceID(),
+                         outSequenceId, storageManager);
+
+                 String referenceKey = outBean.getReferenceMessageStoreKey();
+                 if (referenceKey == null) {
+                     throw new SandeshaException(SandeshaMessageHelper.getMessage(
+                             SandeshaMessageKeys.referenceMessageNotSetForSequence, outBean.getSequenceID()));
+                 }
+
+                 MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceKey, configCtx);
+                 if (referenceMessage == null) {
+                     throw new SandeshaException(SandeshaMessageHelper.getMessage(
+                             SandeshaMessageKeys.referencedMessageNotFound, outBean.getSequenceID()));
+                 }
+
+                 // The initialised context itself is not used afterwards; the call is kept
+                 // so that any failure it raises still surfaces here.
+                 MsgInitializer.initializeMessage(referenceMessage);
+             }
+         }
+     } catch (AxisFault e) {
+         throw new SandeshaException(e);
+     }
+
+     if (log.isDebugEnabled()) {
+         log.debug("Exit: TerminateSeqMsgProcessor::setUpHighestMsgNumbers");
+     }
+ }
+
+ /**
+  * Builds the TerminateSequenceResponse message for an incoming TerminateSequence and
+  * copies onto it every sequence acknowledgement part of a freshly generated ack message.
+  *
+  * @param terminateSeqRMMsg the incoming TerminateSequence message
+  * @param rmdBean           receiving-side bean of the sequence being terminated
+  * @param sequenceId        identifier of the sequence being terminated
+  * @param storageManager    storage manager handed to the acknowledgement generator
+  * @return the response message, with its SOAP envelope added and marked as an out-flow
+  *         message whose application processing is done
+  * @throws AxisFault if creating the response or the acknowledgement fails
+  */
+ private RMMsgContext getTerminateSequenceResponse(RMMsgContext terminateSeqRMMsg, RMDBean rmdBean, String sequenceId,
+         StorageManager storageManager) throws AxisFault {
+
+     // The trace messages name this method; they previously referred to a
+     // non-existent "addTerminateSequenceResponse".
+     if (log.isDebugEnabled())
+         log.debug("Enter: TerminateSeqMsgProcessor::getTerminateSequenceResponse, " + sequenceId);
+
+     RMMsgContext terminateSeqResponseRMMsg = RMMsgCreator.createTerminateSeqResponseMsg(terminateSeqRMMsg, rmdBean);
+     MessageContext outMessage = terminateSeqResponseRMMsg.getMessageContext();
+
+     RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, rmdBean,
+             sequenceId, storageManager, true);
+
+     // Copy over the ack parts so the response also carries the acknowledgement.
+     Iterator iter = ackRMMessage.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+     while (iter.hasNext()) {
+         SequenceAcknowledgement seqAck = (SequenceAcknowledgement) iter.next();
+         terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, seqAck);
+     }
+
+     terminateSeqResponseRMMsg.addSOAPEnvelope();
+
+     terminateSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
+     terminateSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+     outMessage.setResponseWritten(true);
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: TerminateSeqMsgProcessor::getTerminateSequenceResponse");
+
+     return terminateSeqResponseRMMsg;
+ }
+
+ /**
+  * Prepares an outgoing TerminateSequence message and passes it to the sender.
+  * If a terminate has already been added for the sending sequence the message is
+  * left alone.
+  *
+  * @param rmMsgCtx the outgoing TerminateSequence message
+  * @return always <code>true</code>
+  * @throws AxisFault if setting up or sending the message fails
+  */
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+
+     if (log.isDebugEnabled()) {
+         log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
+     }
+
+     // Let the parent processor set up the out message first.
+     setupOutMessage(rmMsgCtx);
+
+     RMSBean sendingBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(getStorageManager(),
+             getInternalSequenceID());
+
+     // A terminate was already added for this internal sequence: nothing more to do.
+     if (sendingBean.isTerminateAdded()) {
+         log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously));
+         if (log.isDebugEnabled()) {
+             log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage, sequence previously terminated");
+         }
+         return true;
+     }
+
+     // Run the message under the WS-RM terminate operation, in an operation context of its own.
+     MessageContext outMsgContext = getMsgContext();
+     AxisOperation terminateOperation = SpecSpecificConstants.getWSRMOperation(
+             Sandesha2Constants.MessageTypes.TERMINATE_SEQ,
+             rmMsgCtx.getRMSpecVersion(),
+             outMsgContext.getAxisService());
+
+     OperationContext operationContext = ContextFactory.createOperationContext(terminateOperation,
+             outMsgContext.getServiceContext());
+     operationContext.setParent(outMsgContext.getServiceContext());
+     getConfigurationContext().registerOperationContext(rmMsgCtx.getMessageId(), operationContext);
+
+     outMsgContext.setOperationContext(operationContext);
+     outMsgContext.setAxisOperation(terminateOperation);
+
+     // Stamp the outbound sequence ID onto the TerminateSequence element.
+     TerminateSequence terminatePart = (TerminateSequence) rmMsgCtx
+             .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+     terminatePart.getIdentifier().setIndentifer(getOutSequenceID());
+
+     String wsaAction = SpecSpecificConstants.getTerminateSequenceAction(getRMVersion());
+     rmMsgCtx.setWSAAction(wsaAction);
+     String soapAction = SpecSpecificConstants.getTerminateSequenceSOAPAction(getRMVersion());
+     rmMsgCtx.setSOAPAction(soapAction);
+
+     // Persist the terminate-added flag on the RMSBean.
+     sendingBean.setTerminateAdded(true);
+     getStorageManager().getRMSBeanMgr().update(sendingBean);
+
+     // The terminate is sent with a delay; otherwise it would go out before the
+     // current request (ack) has returned.
+     // TODO: refine the terminate delay.
+     sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.TERMINATE_SEQ,
+             Sandesha2Constants.TERMINATE_DELAY);
+
+     // Pause the message context.
+     rmMsgCtx.pause();
+
+     if (log.isDebugEnabled()) {
+         log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage " + Boolean.TRUE);
+     }
+     return true;
+ }
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,96 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.polling.PollingManager;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.TerminateSequenceResponse;
+
+/**
+ * To process terminate sequence response messages.
+ */
+public class TerminateSeqResponseMsgProcessor implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
+
+ public boolean processInMessage(RMMsgContext terminateResRMMsg)
+ throws AxisFault {
+ if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage");
+
+ MessageContext msgContext = terminateResRMMsg.getMessageContext();
+ ConfigurationContext context = terminateResRMMsg.getConfigurationContext();
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
+
+ TerminateSequenceResponse tsResponse = (TerminateSequenceResponse)
+ terminateResRMMsg.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE);
+
+ String sequenceId = tsResponse.getIdentifier().getIdentifier();
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceId);
+
+ // Check that the sender of this TerminateSequence holds the correct token
+ if(rmsBean != null && rmsBean.getSecurityTokenData() != null) {
+ SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
+ OMElement body = terminateResRMMsg.getSOAPEnvelope().getBody();
+ SecurityToken token = secManager.recoverSecurityToken(rmsBean.getSecurityTokenData());
+ secManager.checkProofOfPossession(token, body, msgContext);
+ }
+
+ msgContext.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,rmsBean.getInternalSequenceID());
+
+ //shedulling a polling request for the response side.
+ if (rmsBean.getOfferedSequence()!=null) {
+ RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
+ RMDBean rMDBean = rMDBeanMgr.retrieve(sequenceId);
+
+ if (rMDBean!=null && rMDBean.isPollingMode()) {
+ PollingManager manager = storageManager.getPollingManager();
+ if(manager != null) manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+ }
+ }
+
+ TerminateManager.terminateSendingSide (rmsBean, storageManager);
+
+ // Stop this message travelling further through the Axis runtime
+ terminateResRMMsg.pause();
+
+ if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processInMessage " + Boolean.TRUE);
+ return true;
+ }
+
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+ if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processOutMessage");
+ if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
+ return false;
+ }
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,111 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.msgreceivers;
+
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.receivers.AbstractMessageReceiver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * Message receiver for RM control messages.
+ * All the necessary RM logic happens in the message processors; this receiver looks up
+ * the processor for the message and runs it inside a storage transaction, which also
+ * ensures that the default message receiver does not get called for RM control messages.
+ */
+public class RMMessageReceiver extends AbstractMessageReceiver {
+
+    private static final Log log = LogFactory.getLog(RMMessageReceiver.class);
+
+    /**
+     * Runs the message processor registered for the message, if there is one, inside a
+     * storage transaction. The transaction is committed when processing succeeds and
+     * rolled back when it fails.
+     *
+     * @param msgCtx the incoming message
+     * @throws AxisFault the fault raised by the processor, or a new fault wrapping any
+     *                   other exception it threw
+     */
+    public final void receive(MessageContext msgCtx) throws AxisFault {
+        if (log.isDebugEnabled()) {
+            log.debug("Entry: RMMessageReceiver::receive");
+        }
+
+        // Reuse the RM context stored on the message if there is one, else build it.
+        RMMsgContext rmMsgCtx = null;
+        if (msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT) != null) {
+            rmMsgCtx = (RMMsgContext) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT);
+        } else {
+            rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("MsgReceiver got type: " + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
+        }
+
+        // Note that some messages (such as stand-alone acks) will be routed here, but
+        // the headers will already have been processed. Therefore we should not assume
+        // that we will have a MsgProcessor every time.
+        MsgProcessor msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+        if (msgProcessor != null) {
+            Transaction transaction = null;
+
+            try {
+                ConfigurationContext context = msgCtx.getConfigurationContext();
+                StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+                transaction = storageManager.getTransaction();
+
+                msgProcessor.processInMessage(rmMsgCtx);
+
+            } catch (Exception e) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Exception caught during processInMessage", e);
+                }
+                // message should not be sent in a exception situation.
+                msgCtx.pause();
+
+                if (transaction != null) {
+                    // Clear the reference before attempting the rollback, so the finally
+                    // block can never commit the work of a failed message, even when the
+                    // rollback itself throws.
+                    Transaction failedTransaction = transaction;
+                    transaction = null;
+                    try {
+                        failedTransaction.rollback();
+                    } catch (Exception e1) {
+                        String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+                        // Log the rollback failure itself; the processing exception was
+                        // already logged above and is rethrown below.
+                        log.debug(message, e1);
+                    }
+                }
+
+                if (!(e instanceof AxisFault)) {
+                    String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.inMsgError, e.toString());
+                    throw new AxisFault(message, e);
+                }
+
+                throw (AxisFault) e;
+            } finally {
+                // Still non-null only when processing completed without an exception.
+                if (transaction != null) {
+                    try {
+                        transaction.commit();
+                    } catch (Exception e) {
+                        String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
+                        log.debug(message, e);
+                    }
+                }
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: RMMessageReceiver::receive");
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org