You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/10/30 16:43:25 UTC
svn commit: r469167 [4/7] - in /webservices/sandesha/trunk/java: config/
interop/conf/ src/org/apache/sandesha2/ src/org/apache/sandesha2/client/
src/org/apache/sandesha2/handlers/ src/org/apache/sandesha2/i18n/
src/org/apache/sandesha2/msgprocessors/ ...
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Mon Oct 30 07:43:24 2006
@@ -17,7 +17,6 @@
package org.apache.sandesha2.msgprocessors;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -35,9 +34,7 @@
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisOperationFactory;
import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
@@ -68,11 +65,11 @@
private static final Log log = LogFactory.getLog(AckRequestedProcessor.class);
- public boolean processAckRequestedHeaders(MessageContext message) throws AxisFault {
+ public boolean processAckRequestedHeaders(RMMsgContext message) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: AckRequestedProcessor::processAckRequestHeaders");
- SOAPEnvelope envelope = message.getEnvelope();
+ SOAPEnvelope envelope = message.getMessageContext().getEnvelope();
SOAPHeader header = envelope.getHeader();
boolean msgCtxPaused = false;
if(header!=null)
@@ -107,18 +104,13 @@
* @return true if the msg context was paused
* @throws AxisFault
*/
- public boolean processAckRequestedHeader(MessageContext msgContext, OMElement soapHeader, AckRequested ackRequested) throws AxisFault {
+ public boolean processAckRequestedHeader(RMMsgContext rmMsgCtx, OMElement soapHeader, AckRequested ackRequested) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: AckRequestedProcessor::processAckRequestedHeader " + soapHeader);
- // TODO: Note that this RMMessageContext is not really any use - but we need to create it
- // so that it can be passed to the fault handling chain. It's really no more than a
- // container for the correct addressing and RM spec levels, so we'd be better off passing
- // them in directly. Unfortunately that change ripples through the codebase...
- RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgContext);
-
String sequenceId = ackRequested.getIdentifier().getIdentifier();
+ MessageContext msgContext = rmMsgCtx.getMessageContext();
ConfigurationContext configurationContext = msgContext.getConfigurationContext();
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
@@ -142,6 +134,8 @@
// Setting the ack depending on AcksTo.
SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequencePropertyKey,
Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+ SequencePropertyBean versionBean = seqPropMgr.retrieve(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
String acksToStr = acksTo.getAddress();
@@ -149,24 +143,10 @@
if (acksToStr == null)
throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
- AxisOperation ackOperation = null;
-
- try {
- ackOperation = AxisOperationFactory.getOperationDescription(WSDL20_2004Constants.MEP_URI_IN_ONLY);
- } catch (AxisFault e) {
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.axisOperationError, e
- .toString()));
- }
-
- AxisOperation rmMsgOperation = rmMsgCtx.getMessageContext().getAxisOperation();
- if (rmMsgOperation != null) {
- ArrayList outFlow = rmMsgOperation.getPhasesOutFlow();
- if (outFlow != null) {
- ackOperation.setPhasesOutFlow(outFlow);
- ackOperation.setPhasesOutFaultFlow(outFlow);
- }
- }
-
+ AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
+ Sandesha2Constants.MessageTypes.ACK,
+ versionBean.getValue(),
+ msgContext.getAxisService());
MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation);
ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
@@ -210,14 +190,8 @@
if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
// operation context will be null when doing in a GLOBAL
// handler.
- try {
- AxisOperation op = AxisOperationFactory.getAxisOperation(WSDL20_2004Constants.MEP_CONSTANT_IN_OUT);
- OperationContext opCtx = new OperationContext(op);
- rmMsgCtx.getMessageContext().setAxisOperation(op);
- rmMsgCtx.getMessageContext().setOperationContext(opCtx);
- } catch (AxisFault e2) {
- throw new SandeshaException(e2.getMessage());
- }
+ OperationContext opCtx = new OperationContext(ackOperation);
+ rmMsgCtx.getMessageContext().setOperationContext(opCtx);
}
rmMsgCtx.getMessageContext().getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Oct 30 07:43:24 2006
@@ -45,10 +45,8 @@
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.FaultManager;
-import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
-import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.wsrm.AcknowledgementRange;
import org.apache.sandesha2.wsrm.Nack;
@@ -64,16 +62,14 @@
/**
* @param message
- * @return true if the msg context was paused
* @throws AxisFault
*/
- public boolean processAckHeaders(MessageContext message) throws AxisFault {
+ public void processAckHeaders(RMMsgContext message) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: AcknowledgementProcessor::processAckHeaders");
- SOAPEnvelope envelope = message.getEnvelope();
+ SOAPEnvelope envelope = message.getMessageContext().getEnvelope();
SOAPHeader header = envelope.getHeader();
- boolean returnValue = false;
if(header!=null)
{
for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
@@ -84,41 +80,28 @@
OMElement ack = (OMElement) acks.next();
SequenceAcknowledgement seqAck = new SequenceAcknowledgement(headerName.getNamespaceURI());
seqAck.fromOMElement(ack);
- boolean ackPaused = processAckHeader(message, ack, seqAck);
- //if not already paused we might be now
- if(!returnValue){
- returnValue = ackPaused;
- }
+ processAckHeader(message, ack, seqAck);
}
}
}
if (log.isDebugEnabled())
- log.debug("Exit: AcknowledgementProcessor::processAckHeaders " + returnValue);
- return returnValue;
+ log.debug("Exit: AcknowledgementProcessor::processAckHeaders");
}
/**
- * @param msgCtx
+ * @param rmMsgCtx
* @param soapHeader
* @param sequenceAck
- * @return true if the msg context was paused
* @throws AxisFault
*/
- private boolean processAckHeader(MessageContext msgCtx, OMElement soapHeader, SequenceAcknowledgement sequenceAck)
+ private void processAckHeader(RMMsgContext rmMsgCtx, OMElement soapHeader, SequenceAcknowledgement sequenceAck)
throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: AcknowledgementProcessor::processAckHeader " + soapHeader);
- boolean returnValue = false;
-
- // TODO: Note that this RMMessageContext is not really any use - but we need to create it
- // so that it can be passed to the fault handling chain. It's really no more than a
- // container for the correct addressing and RM spec levels, so we'd be better off passing
- // them in directly. Unfortunately that change ripples through the codebase...
- RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
-
+ MessageContext msgCtx = rmMsgCtx.getMessageContext();
ConfigurationContext configCtx = msgCtx.getConfigurationContext();
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
@@ -260,16 +243,8 @@
}
}
-
- String action = msgCtx.getOptions().getAction();
- if (action!=null && action.equals(SpecSpecificConstants.getAckRequestAction(rmMsgCtx.getRMSpecVersion()))) {
- returnValue = true;
- rmMsgCtx.pause();
- }
-
if (log.isDebugEnabled())
- log.debug("Exit: AcknowledgementProcessor::processAckHeader " + returnValue);
- return returnValue;
+ log.debug("Exit: AcknowledgementProcessor::processAckHeader");
}
private SenderBean getRetransmitterEntry(Collection collection, long msgNo) {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Oct 30 07:43:24 2006
@@ -17,28 +17,16 @@
package org.apache.sandesha2.msgprocessors;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMElement;
import org.apache.axiom.soap.SOAPBody;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.context.OperationContextFactory;
import org.apache.axis2.context.ServiceContext;
-import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.AxisOperationFactory;
-import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
@@ -52,17 +40,11 @@
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.CreateSeqBean;
-import org.apache.sandesha2.storage.beans.InvokerBean;
-import org.apache.sandesha2.storage.beans.NextMsgBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.util.AcknowledgementManager;
-import org.apache.sandesha2.util.FaultManager;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SOAPAbstractFactory;
@@ -88,385 +70,13 @@
public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Enter: ApplicationMsgProcessor::processInMessage");
-
- boolean msgCtxPaused = false;
-
- // Processing the application message.
- MessageContext msgCtx = rmMsgCtx.getMessageContext();
- if (msgCtx == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgContextNotSetInbound);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
- && rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals("true")) {
- return msgCtxPaused;
- }
-
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(msgCtx.getConfigurationContext(),msgCtx.getConfigurationContext().getAxisConfiguration());
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
- Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- String sequenceId = sequence.getIdentifier().getIdentifier();
-
- String propertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-
- // Check that both the Sequence header and message body have been secured properly
- SequencePropertyBean tokenBean = seqPropMgr.retrieve(propertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
- if(tokenBean != null) {
- SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
-
- QName seqName = new QName(rmMsgCtx.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.SEQUENCE);
-
- SOAPEnvelope envelope = msgCtx.getEnvelope();
- OMElement body = envelope.getBody();
- OMElement seqHeader = envelope.getHeader().getFirstChildWithName(seqName);
-
- SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
-
- secManager.checkProofOfPossession(token, seqHeader, msgCtx);
- secManager.checkProofOfPossession(token, body, msgCtx);
- }
-
- //RM will not send sync responses. If sync acks are there this will be
- // made true again later.
- if (rmMsgCtx.getMessageContext().getOperationContext() != null) {
- rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN,
- Constants.VALUE_FALSE);
+ log.debug("Exit: ApplicationMsgProcessor::processInMessage");
}
-
- // setting acked msg no range
- ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
- if (configCtx == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- FaultManager faultManager = new FaultManager();
- SandeshaException fault = faultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager);
- if (fault != null) {
- throw fault;
- }
-
- // setting mustUnderstand to false.
- sequence.setMustUnderstand(false);
- rmMsgCtx.addSOAPEnvelope();
-
- // throwing a fault if the sequence is closed.
- fault = faultManager.checkForSequenceClosed(rmMsgCtx, sequenceId, storageManager);
- if (fault != null) {
- throw fault;
- }
-
- fault = faultManager.checkForLastMsgNumberExceeded(rmMsgCtx, storageManager);
- if (fault != null) {
- throw fault;
- }
-
- // updating the last activated time of the sequence.
- SequenceManager.updateLastActivatedTime(propertyKey, storageManager);
-
- SequencePropertyBean msgsBean = seqPropMgr.retrieve(propertyKey,
- Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
-
- long msgNo = sequence.getMessageNumber().getMessageNumber();
- if (msgNo == 0) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
- .toString(msgNo));
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String key = SandeshaUtil.getUUID(); // key to store the message.
-
- // updating the Highest_In_Msg_No property which gives the highest
- // message number retrieved from this sequence.
- String highetsInMsgNoStr = SandeshaUtil.getSequenceProperty(propertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, storageManager);
- String highetsInMsgKey = SandeshaUtil.getSequenceProperty(propertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, storageManager);
- if (highetsInMsgKey == null)
- highetsInMsgKey = SandeshaUtil.getUUID();
-
- long highestInMsgNo = 0;
- if (highetsInMsgNoStr != null) {
- highestInMsgNo = Long.parseLong(highetsInMsgNoStr);
- }
-
- if (msgNo > highestInMsgNo) {
- highestInMsgNo = msgNo;
-
- String str = new Long(msgNo).toString();
- SequencePropertyBean highestMsgNoBean = new SequencePropertyBean(propertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, str);
- SequencePropertyBean highestMsgKeyBean = new SequencePropertyBean(propertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, highetsInMsgKey);
-
- // storing the new message as the highest in message.
- storageManager.removeMessageContext(highetsInMsgKey);
- storageManager.storeMessageContext(highetsInMsgKey, msgCtx);
-
- if (highetsInMsgNoStr != null) {
- seqPropMgr.update(highestMsgNoBean);
- seqPropMgr.update(highestMsgKeyBean);
- } else {
- seqPropMgr.insert(highestMsgNoBean);
- seqPropMgr.insert(highestMsgKeyBean);
- }
- }
-
- String messagesStr = "";
- if (msgsBean != null)
- messagesStr = msgsBean.getValue();
- else {
- msgsBean = new SequencePropertyBean();
- msgsBean.setSequencePropertyKey(propertyKey);
- msgsBean.setName(Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
- msgsBean.setValue(messagesStr);
- }
-
- boolean msgNoPresentInList = msgNoPresentInList(messagesStr, msgNo);
-
- if (msgNoPresentInList
- && (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
- // this is a duplicate message and the invocation type is
- // EXACTLY_ONCE.
- rmMsgCtx.pause();
- msgCtxPaused = true;
- }
-
- if (!msgNoPresentInList)
- {
- if (messagesStr != null && !"".equals(messagesStr))
- messagesStr = messagesStr + "," + Long.toString(msgNo);
- else
- messagesStr = Long.toString(msgNo);
-
- msgsBean.setValue(messagesStr);
- seqPropMgr.update(msgsBean);
- }
-
- // Pause the messages bean if not the right message to invoke.
- NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
- NextMsgBean bean = mgr.retrieve(sequenceId);
-
- if (bean == null) {
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotFindSequence,
- sequenceId));
- }
-
- InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
-
- // inorder invocation is still a global property
- boolean inOrderInvocation = SandeshaUtil.getPropertyBean(
- msgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
-
-
- //setting properties for the messageContext
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new Long (msgNo));
-
- if (inOrderInvocation && !msgNoPresentInList) {
-
- SequencePropertyBean incomingSequenceListBean = seqPropMgr.retrieve(
- Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
- Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-
- if (incomingSequenceListBean == null) {
- ArrayList incomingSequenceList = new ArrayList();
- incomingSequenceListBean = new SequencePropertyBean();
- incomingSequenceListBean.setSequencePropertyKey(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
- incomingSequenceListBean.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
- incomingSequenceListBean.setValue(incomingSequenceList.toString());
-
- // this get inserted before
- seqPropMgr.insert(incomingSequenceListBean);
- }
-
- ArrayList incomingSequenceList = SandeshaUtil.getArrayListFromString(incomingSequenceListBean.getValue());
-
- // Adding current sequence to the incoming sequence List.
- if (!incomingSequenceList.contains(sequenceId)) {
- incomingSequenceList.add(sequenceId);
-
- // saving the property.
- incomingSequenceListBean.setValue(incomingSequenceList.toString());
- seqPropMgr.update(incomingSequenceListBean);
- }
-
- // saving the message.
- try {
- storageManager.storeMessageContext(key, rmMsgCtx.getMessageContext());
- storageMapMgr.insert(new InvokerBean(key, msgNo, sequenceId));
-
- // This will avoid performing application processing more
- // than
- // once.
- rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
- } catch (Exception ex) {
- throw new SandeshaException(ex.getMessage(), ex);
- }
-
- // pause the message
- rmMsgCtx.pause();
- msgCtxPaused = true;
-
- // Starting the invoker if stopped.
- SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(), sequenceId);
-
- }
-
- // Sending acknowledgements
- sendAckIfNeeded(rmMsgCtx, messagesStr, storageManager);
-
- if (log.isDebugEnabled())
- log.debug("Exit: ApplicationMsgProcessor::processInMessage " + msgCtxPaused);
- return msgCtxPaused;
- }
-
- // TODO convert following from INT to LONG
- private boolean msgNoPresentInList(String list, long no) {
- String[] msgStrs = list.split(",");
-
- int l = msgStrs.length;
-
- for (int i = 0; i < l; i++) {
- if (msgStrs[i].equals(Long.toString(no)))
- return true;
- }
-
return false;
}
- public void sendAckIfNeeded(RMMsgContext rmMsgCtx, String messagesStr, StorageManager storageManager)
- throws AxisFault {
-
- if (log.isDebugEnabled())
- log.debug("Enter: ApplicationMsgProcessor::sendAckIfNeeded");
-
- String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-
- Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- String sequenceId = sequence.getIdentifier().getIdentifier();
- ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
- if (configCtx == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet));
-
- RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey ,sequenceId, storageManager);
- MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
-
- EndpointReference acksTo = ackRMMsgCtx.getTo();
-
- if (SandeshaUtil.isAnonymousURI (acksTo.getAddress())) {
-
- // AxisEngine engine = new
- // AxisEngine(ackRMMsgCtx.getMessageContext()
- // .getConfigurationContext());
-
- // setting CONTEXT_WRITTEN since acksto is anonymous
- if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
- // operation context will be null when doing in a GLOBAL
- // handler.
-
- AxisOperation op = AxisOperationFactory.getAxisOperation(WSDL20_2004Constants.MEP_CONSTANT_IN_OUT);
- OperationContext opCtx = new OperationContext(op);
- rmMsgCtx.getMessageContext().setAxisOperation(op);
- rmMsgCtx.getMessageContext().setOperationContext(opCtx);
- }
-
- rmMsgCtx.getMessageContext().getOperationContext().setProperty(
- org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
-
- rmMsgCtx.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN, "true");
-
- ackRMMsgCtx.getMessageContext().setServerSide(true);
-
- AxisEngine engine = new AxisEngine(configCtx);
- engine.send(ackRMMsgCtx.getMessageContext());
-
- } else {
-
- // / Transaction asyncAckTransaction =
- // storageManager.getTransaction();
-
- SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
-
- String key = SandeshaUtil.getUUID();
-
- SenderBean ackBean = new SenderBean();
- ackBean.setMessageContextRefKey(key);
- ackBean.setMessageID(ackMsgCtx.getMessageID());
- ackBean.setReSend(false);
- ackBean.setSequenceID(sequencePropertyKey);
- EndpointReference to = ackMsgCtx.getTo();
- if (to!=null)
- ackBean.setToAddress(to.getAddress());
-
- // this will be set to true in the sender.
- ackBean.setSend(true);
-
- ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
- ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
- long ackInterval = SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext().getAxisOperation())
- .getAcknowledgementInterval();
-
- // Ack will be sent as stand alone, only after the retransmitter
- // interval.
- long timeToSend = System.currentTimeMillis() + ackInterval;
-
- // removing old acks.
- SenderBean findBean = new SenderBean();
- findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-
- // this will be set to true in the sandesha2TransportSender.
- findBean.setSend(true);
- findBean.setReSend(false);
- Collection coll = retransmitterBeanMgr.find(findBean);
- Iterator it = coll.iterator();
-
- if (it.hasNext()) {
- SenderBean oldAckBean = (SenderBean) it.next();
- timeToSend = oldAckBean.getTimeToSend(); // If there is an
- // old ack. This ack
- // will be sent in
- // the old
- // timeToSend.
-
- // removing the retransmitted entry for the oldAck
- retransmitterBeanMgr.delete(oldAckBean.getMessageID());
-
- // removing the message store entry for the old ack
- storageManager.removeMessageContext(oldAckBean.getMessageContextRefKey());
- }
-
- ackBean.setTimeToSend(timeToSend);
- storageManager.storeMessageContext(key, ackMsgCtx);
-
- ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
- // inserting the new ack.
- retransmitterBeanMgr.insert(ackBean);
- // / asyncAckTransaction.commit();
-
- // passing the message through sandesha2sender
- ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
- ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
-
- SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
-
- SandeshaUtil.startSenderForTheSequence(ackRMMsgCtx.getConfigurationContext(), sequenceId);
- }
-
-
- if (log.isDebugEnabled())
- log.debug("Exit: ApplicationMsgProcessor::sendAckIfNeeded");
- }
-
public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
@@ -1227,11 +837,4 @@
log.debug("Exit: ApplicationMsgProcessor::setNextMsgNo");
}
- private boolean isWSAAnonymous (String address) {
- if (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) ||
- AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address))
- return true;
-
- return false;
- }
}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=auto&rev=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Mon Oct 30 07:43:24 2006
@@ -0,0 +1,456 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * Responsible for processing the Sequence header (if present) on an incoming
+ * message.
+ */
+
+public class SequenceProcessor {
+
+ private static final Log log = LogFactory.getLog(SequenceProcessor.class);
+
+ public boolean processSequenceHeader(RMMsgContext rmMsgCtx) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SequenceProcessor::processSequenceHeader");
+ boolean result = false;
+ Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if(sequence != null) {
+ // This is a reliable message, so hand it on to the main routine
+ result = processReliableMessage(rmMsgCtx);
+ } else {
+ if (log.isDebugEnabled())
+ log.debug("Message does not contain a sequence header");
+ }
+ if (log.isDebugEnabled())
+ log.debug("Exit: SequenceProcessor::processSequenceHeader " + result);
+ return result;
+ }
+
+ public boolean processReliableMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SequenceProcessor::processReliableMessage");
+
+ boolean msgCtxPaused = false;
+
+ if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
+ && rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals("true")) {
+ return msgCtxPaused;
+ }
+
+ MessageContext msgCtx = rmMsgCtx.getMessageContext();
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(msgCtx.getConfigurationContext(),msgCtx.getConfigurationContext().getAxisConfiguration());
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
+ Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String sequenceId = sequence.getIdentifier().getIdentifier();
+
+ String propertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
+
+ // Check that both the Sequence header and message body have been secured properly
+ SequencePropertyBean tokenBean = seqPropMgr.retrieve(propertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
+ if(tokenBean != null) {
+ SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
+
+ QName seqName = new QName(rmMsgCtx.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.SEQUENCE);
+
+ SOAPEnvelope envelope = msgCtx.getEnvelope();
+ OMElement body = envelope.getBody();
+ OMElement seqHeader = envelope.getHeader().getFirstChildWithName(seqName);
+
+ SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
+
+ secManager.checkProofOfPossession(token, seqHeader, msgCtx);
+ secManager.checkProofOfPossession(token, body, msgCtx);
+ }
+
+ //RM will not send sync responses. If sync acks are there this will be
+ // made true again later.
+ if (rmMsgCtx.getMessageContext().getOperationContext() != null) {
+ rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN,
+ Constants.VALUE_FALSE);
+ }
+
+ // setting acked msg no range
+ ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+ if (configCtx == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ FaultManager faultManager = new FaultManager();
+ SandeshaException fault = faultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager);
+ if (fault != null) {
+ throw fault;
+ }
+
+ // setting mustUnderstand to false.
+ sequence.setMustUnderstand(false);
+ rmMsgCtx.addSOAPEnvelope();
+
+ // throwing a fault if the sequence is closed.
+ fault = faultManager.checkForSequenceClosed(rmMsgCtx, sequenceId, storageManager);
+ if (fault != null) {
+ throw fault;
+ }
+
+ fault = faultManager.checkForLastMsgNumberExceeded(rmMsgCtx, storageManager);
+ if (fault != null) {
+ throw fault;
+ }
+
+ // updating the last activated time of the sequence.
+ SequenceManager.updateLastActivatedTime(propertyKey, storageManager);
+
+ SequencePropertyBean msgsBean = seqPropMgr.retrieve(propertyKey,
+ Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+
+ long msgNo = sequence.getMessageNumber().getMessageNumber();
+ if (msgNo == 0) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
+ .toString(msgNo));
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String key = SandeshaUtil.getUUID(); // key to store the message.
+
+ // updating the Highest_In_Msg_No property which gives the highest
+ // message number retrieved from this sequence.
+ String highetsInMsgNoStr = SandeshaUtil.getSequenceProperty(propertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, storageManager);
+ String highetsInMsgKey = SandeshaUtil.getSequenceProperty(propertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, storageManager);
+ if (highetsInMsgKey == null)
+ highetsInMsgKey = SandeshaUtil.getUUID();
+
+ long highestInMsgNo = 0;
+ if (highetsInMsgNoStr != null) {
+ highestInMsgNo = Long.parseLong(highetsInMsgNoStr);
+ }
+
+ if (msgNo > highestInMsgNo) {
+ highestInMsgNo = msgNo;
+
+ String str = new Long(msgNo).toString();
+ SequencePropertyBean highestMsgNoBean = new SequencePropertyBean(propertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, str);
+ SequencePropertyBean highestMsgKeyBean = new SequencePropertyBean(propertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, highetsInMsgKey);
+
+ // storing the new message as the highest in message.
+ storageManager.removeMessageContext(highetsInMsgKey);
+ storageManager.storeMessageContext(highetsInMsgKey, msgCtx);
+
+ if (highetsInMsgNoStr != null) {
+ seqPropMgr.update(highestMsgNoBean);
+ seqPropMgr.update(highestMsgKeyBean);
+ } else {
+ seqPropMgr.insert(highestMsgNoBean);
+ seqPropMgr.insert(highestMsgKeyBean);
+ }
+ }
+
+ String messagesStr = "";
+ if (msgsBean != null)
+ messagesStr = msgsBean.getValue();
+ else {
+ msgsBean = new SequencePropertyBean();
+ msgsBean.setSequencePropertyKey(propertyKey);
+ msgsBean.setName(Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+ msgsBean.setValue(messagesStr);
+ }
+
+ boolean msgNoPresentInList = msgNoPresentInList(messagesStr, msgNo);
+
+ if (msgNoPresentInList
+ && (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
+ // this is a duplicate message and the invocation type is
+ // EXACTLY_ONCE.
+ rmMsgCtx.pause();
+ msgCtxPaused = true;
+ }
+
+ if (!msgNoPresentInList)
+ {
+ if (messagesStr != null && !"".equals(messagesStr))
+ messagesStr = messagesStr + "," + Long.toString(msgNo);
+ else
+ messagesStr = Long.toString(msgNo);
+
+ msgsBean.setValue(messagesStr);
+ seqPropMgr.update(msgsBean);
+ }
+
+ // Pause the messages bean if not the right message to invoke.
+ NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
+ NextMsgBean bean = mgr.retrieve(sequenceId);
+
+ if (bean == null) {
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotFindSequence,
+ sequenceId));
+ }
+
+ InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
+
+ // inorder invocation is still a global property
+ boolean inOrderInvocation = SandeshaUtil.getPropertyBean(
+ msgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
+
+
+ //setting properties for the messageContext
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new Long (msgNo));
+
+ if (inOrderInvocation && !msgNoPresentInList) {
+
+ SequencePropertyBean incomingSequenceListBean = seqPropMgr.retrieve(
+ Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+ Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+ if (incomingSequenceListBean == null) {
+ ArrayList incomingSequenceList = new ArrayList();
+ incomingSequenceListBean = new SequencePropertyBean();
+ incomingSequenceListBean.setSequencePropertyKey(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
+ incomingSequenceListBean.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+ incomingSequenceListBean.setValue(incomingSequenceList.toString());
+
+ // this get inserted before
+ seqPropMgr.insert(incomingSequenceListBean);
+ }
+
+ ArrayList incomingSequenceList = SandeshaUtil.getArrayListFromString(incomingSequenceListBean.getValue());
+
+ // Adding current sequence to the incoming sequence List.
+ if (!incomingSequenceList.contains(sequenceId)) {
+ incomingSequenceList.add(sequenceId);
+
+ // saving the property.
+ incomingSequenceListBean.setValue(incomingSequenceList.toString());
+ seqPropMgr.update(incomingSequenceListBean);
+ }
+
+ // saving the message.
+ try {
+ storageManager.storeMessageContext(key, rmMsgCtx.getMessageContext());
+ storageMapMgr.insert(new InvokerBean(key, msgNo, sequenceId));
+
+ // This will avoid performing application processing more
+ // than
+ // once.
+ rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+ } catch (Exception ex) {
+ throw new SandeshaException(ex.getMessage(), ex);
+ }
+
+ // pause the message
+ rmMsgCtx.pause();
+ msgCtxPaused = true;
+
+ // Starting the invoker if stopped.
+ SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(), sequenceId);
+
+ }
+
+ // Sending acknowledgements
+ sendAckIfNeeded(rmMsgCtx, messagesStr, storageManager);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: SequenceProcessor::processInMessage " + msgCtxPaused);
+ return msgCtxPaused;
+ }
+
+ // TODO convert following from INT to LONG
+ private boolean msgNoPresentInList(String list, long no) {
+ String[] msgStrs = list.split(",");
+
+ int l = msgStrs.length;
+
+ for (int i = 0; i < l; i++) {
+ if (msgStrs[i].equals(Long.toString(no)))
+ return true;
+ }
+
+ return false;
+ }
+
+ public void sendAckIfNeeded(RMMsgContext rmMsgCtx, String messagesStr, StorageManager storageManager)
+ throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: SequenceProcessor::sendAckIfNeeded");
+
+ String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
+
+ Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String sequenceId = sequence.getIdentifier().getIdentifier();
+ ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+ if (configCtx == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
+ if(log.isDebugEnabled()) log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey, sequenceId, storageManager);
+ MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
+
+ EndpointReference acksTo = ackRMMsgCtx.getTo();
+
+ if (SandeshaUtil.isAnonymousURI (acksTo.getAddress())) {
+
+ // setting CONTEXT_WRITTEN since acksto is anonymous
+ if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
+ // operation context will be null when doing in a GLOBAL
+ // handler.
+ AxisOperation op = ackMsgCtx.getAxisOperation();
+ OperationContext opCtx = new OperationContext(op);
+ rmMsgCtx.getMessageContext().setOperationContext(opCtx);
+ }
+
+ rmMsgCtx.getMessageContext().getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+
+ rmMsgCtx.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN, "true");
+
+ ackRMMsgCtx.getMessageContext().setServerSide(true);
+
+ AxisEngine engine = new AxisEngine(configCtx);
+ engine.send(ackRMMsgCtx.getMessageContext());
+
+ } else {
+
+ // / Transaction asyncAckTransaction =
+ // storageManager.getTransaction();
+
+ SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
+
+ String key = SandeshaUtil.getUUID();
+
+ SenderBean ackBean = new SenderBean();
+ ackBean.setMessageContextRefKey(key);
+ ackBean.setMessageID(ackMsgCtx.getMessageID());
+ ackBean.setReSend(false);
+ ackBean.setSequenceID(sequencePropertyKey);
+ EndpointReference to = ackMsgCtx.getTo();
+ if (to!=null)
+ ackBean.setToAddress(to.getAddress());
+
+ // this will be set to true in the sender.
+ ackBean.setSend(true);
+
+ ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+ ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+ long ackInterval = SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext().getAxisOperation())
+ .getAcknowledgementInterval();
+
+ // Ack will be sent as stand alone, only after the retransmitter
+ // interval.
+ long timeToSend = System.currentTimeMillis() + ackInterval;
+
+ // removing old acks.
+ SenderBean findBean = new SenderBean();
+ findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+ // this will be set to true in the sandesha2TransportSender.
+ findBean.setSend(true);
+ findBean.setReSend(false);
+ Collection coll = retransmitterBeanMgr.find(findBean);
+ Iterator it = coll.iterator();
+
+ if (it.hasNext()) {
+ SenderBean oldAckBean = (SenderBean) it.next();
+ timeToSend = oldAckBean.getTimeToSend(); // If there is an
+ // old ack. This ack
+ // will be sent in
+ // the old
+ // timeToSend.
+
+ // removing the retransmitted entry for the oldAck
+ retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+
+ // removing the message store entry for the old ack
+ storageManager.removeMessageContext(oldAckBean.getMessageContextRefKey());
+ }
+
+ ackBean.setTimeToSend(timeToSend);
+ storageManager.storeMessageContext(key, ackMsgCtx);
+
+ ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+ // inserting the new ack.
+ retransmitterBeanMgr.insert(ackBean);
+ // / asyncAckTransaction.commit();
+
+ // passing the message through sandesha2sender
+ ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+ ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+
+ SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+
+ SandeshaUtil.startSenderForTheSequence(ackRMMsgCtx.getConfigurationContext(), sequenceId);
+ }
+
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: SequenceProcessor::sendAckIfNeeded");
+ }
+}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon Oct 30 07:43:24 2006
@@ -19,8 +19,6 @@
import java.util.Iterator;
-import javax.xml.namespace.QName;
-
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
@@ -31,7 +29,6 @@
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.context.OperationContextFactory;
import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.OutInAxisOperation;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.util.Utils;
import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
@@ -358,38 +355,18 @@
String terminated = SandeshaUtil.getSequenceProperty(outSequenceID,
Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, storageManager);
- // registring an InOutOperationContext for this.
- // since the serviceContext.fireAndForget only sets a inOnly One
- // this does not work when there is a terminateSequnceResponse
- // TODO do processing of terminateMessagesCorrectly., create a new
- // message instead of sendign the one given by the serviceClient
- // TODO important
-
- AxisOperation outInAxisOp = new OutInAxisOperation(new QName("temp"));
-
- AxisOperation referenceInOutOperation = msgContext.getAxisService()
- .getOperation(
- new QName(Sandesha2Constants.RM_IN_OUT_OPERATION_NAME));
- if (referenceInOutOperation == null) {
- String messge = "Cant find the recerence RM InOut operation";
- throw new SandeshaException(messge);
- }
-
- outInAxisOp.setParent(msgContext.getAxisService());
- // setting flows
- // outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation.getRemainingPhasesInFlow());
- outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation
- .getRemainingPhasesInFlow());
-
+ AxisOperation terminateOp = SpecSpecificConstants.getWSRMOperation(
+ Sandesha2Constants.MessageTypes.TERMINATE_SEQ,
+ rmMsgCtx.getRMSpecVersion(),
+ msgContext.getAxisService());
OperationContext opcontext = OperationContextFactory
.createOperationContext(
- WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, outInAxisOp);
+ WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, terminateOp);
opcontext.setParent(msgContext.getServiceContext());
- configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),
- opcontext);
+ configurationContext.registerOperationContext(rmMsgCtx.getMessageId(), opcontext);
msgContext.setOperationContext(opcontext);
- msgContext.setAxisOperation(outInAxisOp);
+ msgContext.setAxisOperation(terminateOp);
if (terminated != null && "true".equals(terminated)) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java Mon Oct 30 07:43:24 2006
@@ -1,48 +1,110 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.sandesha2.msgreceivers;
-
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.receivers.AbstractMessageReceiver;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.util.MsgInitializer;
-import org.apache.sandesha2.util.SandeshaUtil;
-
-/**
-*Currently this is a dummy Msg Receiver.
-*All the necessary RM logic happens at MessageProcessors.
-*This only ensures that the defaults Messsage Receiver does not get called for RM control messages.
-*/
-
-
-public class RMMessageReceiver extends AbstractMessageReceiver {
-
- private static final Log log = LogFactory.getLog(RMMessageReceiver.class.getName());
-
- public final void receive(MessageContext messgeCtx) throws AxisFault {
- log.debug("RM MESSSAGE RECEIVER WAS CALLED");
-
- RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(messgeCtx);
- log.debug("MsgReceiver got type:" + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
- }
-
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.msgreceivers;
+
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.receivers.AbstractMessageReceiver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+*Currently this is a dummy Msg Receiver.
+*All the necessary RM logic happens at MessageProcessors.
+*This only ensures that the defaults Messsage Receiver does not get called for RM control messages.
+*/
+
+
+public class RMMessageReceiver extends AbstractMessageReceiver {
+
+ private static final Log log = LogFactory.getLog(RMMessageReceiver.class.getName());
+
+ public final void receive(MessageContext msgCtx) throws AxisFault {
+ if(log.isDebugEnabled()) log.debug("Entry: RMMessageReceiver::receive");
+
+ RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+ if(log.isDebugEnabled()) log.debug("MsgReceiver got type:" + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
+
+ // Note that some messages (such as stand-alone acks) will be routed here, but
+ // the headers will already have been processed. Therefore we should not assume
+ // that we will have a MsgProcessor every time.
+ MsgProcessor msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+ if(msgProcessor != null) {
+ boolean withinTransaction = false;
+ String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+ withinTransaction = true;
+ }
+
+ Transaction transaction = null;
+ if (!withinTransaction) {
+ ConfigurationContext context = msgCtx.getConfigurationContext();
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration()); transaction = storageManager.getTransaction();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+ }
+
+ boolean rolledBack = false;
+
+ try {
+
+ msgProcessor.processInMessage(rmMsgCtx);
+
+ } catch (AxisFault e) {
+ // message should not be sent in a exception situation.
+ msgCtx.pause();
+
+ if (!withinTransaction) {
+ try {
+ transaction.rollback();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolledBack = true;
+ } catch (Exception e1) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+ log.debug(message, e);
+ }
+ }
+
+ throw e;
+ } finally {
+ if (!withinTransaction && !rolledBack) {
+ try {
+ transaction.commit();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ } catch (Exception e) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
+ log.debug(message, e);
+ }
+ }
+ }
+ }
+
+ if(log.isDebugEnabled()) log.debug("Exit: RMMessageReceiver::receive");
+ }
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java Mon Oct 30 07:43:24 2006
@@ -1,327 +1,304 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.sandesha2.util;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPFactory;
-import org.apache.axiom.soap.SOAPHeader;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.AddressingConstants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisOperationFactory;
-import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
-import org.apache.sandesha2.wsrm.AcknowledgementRange;
-import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
-
-/**
- * Contains logic for managing acknowledgements.
- */
-
-public class AcknowledgementManager {
-
- private static Log log = LogFactory.getLog(AcknowledgementManager.class);
-
- /**
- * Piggybacks any available acks of the same sequence to the given
- * application message.
- *
- * @param applicationRMMsgContext
- * @throws SandeshaException
- */
- public static void piggybackAcksIfPresent(RMMsgContext rmMessageContext, StorageManager storageManager)
- throws SandeshaException {
- if (log.isDebugEnabled())
- log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent");
-
- ConfigurationContext configurationContext = rmMessageContext.getConfigurationContext();
-
- SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
-
- SenderBean findBean = new SenderBean();
-
- findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
- findBean.setSend(true);
- findBean.setReSend(false);
-
- String carrietTo = rmMessageContext.getTo().getAddress();
-
- Collection collection = retransmitterBeanMgr.find(findBean);
-
- Iterator it = collection.iterator();
-
- piggybackLoop: while (it.hasNext()) {
- SenderBean ackBean = (SenderBean) it.next();
-
- long timeNow = System.currentTimeMillis();
- if (ackBean.getTimeToSend() > timeNow) {
- // //Piggybacking will happen only if the end of ack interval
- // (timeToSend) is not reached.
-
- MessageContext ackMsgContext = storageManager.retrieveMessageContext(ackBean.getMessageContextRefKey(),
- configurationContext);
-
- // wsa:To has to match for piggybacking.
- String to = ackMsgContext.getTo().getAddress();
- if (!carrietTo.equals(to)) {
- continue piggybackLoop;
- }
-
- if (log.isDebugEnabled()) log.debug("Adding ack headers");
-
- // deleting the ack entry.
- retransmitterBeanMgr.delete(ackBean.getMessageID());
-
- // Adding the ack(s) to the application message
- boolean acks = false;
- SOAPHeader appMsgHeaders = rmMessageContext.getMessageContext().getEnvelope().getHeader();
-
- SOAPHeader headers = ackMsgContext.getEnvelope().getHeader();
- if(headers != null) {
- for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
-
- QName name = new QName(Sandesha2Constants.SPEC_NS_URIS[i], Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK);
- Iterator iter = headers.getChildrenWithName(name);
- while(iter.hasNext()) {
- OMElement ackElement = (OMElement) iter.next();
-
- SequenceAcknowledgement sequenceAcknowledgement = new SequenceAcknowledgement (Sandesha2Constants.SPEC_NS_URIS[i]);
- sequenceAcknowledgement.fromOMElement(ackElement);
-
- sequenceAcknowledgement.toOMElement(appMsgHeaders);
- acks = true;
- }
- }
- }
-
- if (!acks) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidAckMessageEntry,
- ackMsgContext.getEnvelope().toString());
- log.debug(message);
- throw new SandeshaException(message);
- }
- }
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent");
- }
-
- /**
- * this is used to get the acked messages of a sequence. If this is an
- * outgoing message the sequenceIdentifier should be the internal
- * sequenceID.
- *
- * @param sequenceIdentifier
- * @param outGoingMessage
- * @return
- */
- public static ArrayList getClientCompletedMessagesList(String sequenceID, SequencePropertyBeanMgr seqPropMgr)
- throws SandeshaException {
- if (log.isDebugEnabled())
- log.debug("Enter: AcknowledgementManager::getClientCompletedMessagesList");
-
- // first trying to get it from the internal sequence id.
- SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(sequenceID,
- Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
- String internalSequenceID = null;
- if (internalSequenceBean != null)
- internalSequenceID = internalSequenceBean.getValue();
-
- SequencePropertyBean completedMessagesBean = null;
- if (internalSequenceID != null)
- completedMessagesBean = seqPropMgr.retrieve(internalSequenceID,
- Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
-
- if (completedMessagesBean == null)
- completedMessagesBean = seqPropMgr.retrieve(sequenceID,
- Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
-
- ArrayList completedMsgList = null;
- if (completedMessagesBean != null) {
- completedMsgList = SandeshaUtil.getArrayListFromString(completedMessagesBean.getValue());
- } else {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.completedMsgBeanIsNull, sequenceID);
- SandeshaException e = new SandeshaException(message);
- if(log.isDebugEnabled()) log.debug("Throwing exception", e);
- throw e;
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: AcknowledgementManager::getClientCompletedMessagesList");
- return completedMsgList;
- }
-
- public static ArrayList getServerCompletedMessagesList(String sequenceID, SequencePropertyBeanMgr seqPropMgr)
- throws SandeshaException {
- if (log.isDebugEnabled())
- log.debug("Enter: AcknowledgementManager::getServerCompletedMessagesList");
-
- SequencePropertyBean completedMessagesBean = null;
-
- completedMessagesBean = seqPropMgr.retrieve(sequenceID,
- Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
-
- ArrayList completedMsgList = null;
- if (completedMessagesBean != null) {
- completedMsgList = SandeshaUtil.getArrayListFromMsgsString(completedMessagesBean.getValue());
- } else {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.completedMsgBeanIsNull, sequenceID);
- SandeshaException e = new SandeshaException(message);
- if(log.isDebugEnabled()) log.debug("Throwing exception", e);
- throw e;
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: AcknowledgementManager::getServerCompletedMessagesList");
- return completedMsgList;
- }
-
- public static RMMsgContext generateAckMessage(RMMsgContext referenceRMMessage, String sequencePropertyKey ,String sequenceId,
- StorageManager storageManager) throws AxisFault {
- if (log.isDebugEnabled())
- log.debug("Enter: AcknowledgementManager::generateAckMessage");
-
- MessageContext referenceMsg = referenceRMMessage.getMessageContext();
-
- ConfigurationContext configurationContext = referenceRMMessage.getMessageContext().getConfigurationContext();
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-
- // Setting the ack depending on AcksTo.
- SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
-
- EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
- String acksToStr = acksTo.getAddress();
-
- if (acksToStr == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
-
- AxisOperation ackOperation = null;
-
- ackOperation = AxisOperationFactory.getOperationDescription(WSDL20_2004Constants.MEP_URI_IN_ONLY);
-
- AxisOperation rmMsgOperation = referenceRMMessage.getMessageContext().getAxisOperation();
- if (rmMsgOperation != null) {
- ArrayList outFlow = rmMsgOperation.getPhasesOutFlow();
- if (outFlow != null) {
- ackOperation.setPhasesOutFlow(outFlow);
- ackOperation.setPhasesOutFaultFlow(outFlow);
- }
- }
-
- MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);
- ackMsgCtx.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, referenceMsg
- .getProperty(AddressingConstants.WS_ADDRESSING_VERSION)); // TODO
- // do
- // this
- // in
- // the
- // RMMsgCreator
-
- ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
- RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
- ackRMMsgCtx.setRMNamespaceValue(referenceRMMessage.getRMNamespaceValue());
-
- ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
-
- SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
- .getSOAPVersion(referenceMsg.getEnvelope()));
-
- // Setting new envelope
- SOAPEnvelope envelope = factory.getDefaultEnvelope();
-
- ackMsgCtx.setEnvelope(envelope);
-
- ackMsgCtx.setTo(acksTo);
-
- // adding the SequenceAcknowledgement part.
- RMMsgCreator.addAckMessage(ackRMMsgCtx, sequencePropertyKey ,sequenceId, storageManager);
-
- ackMsgCtx.setProperty(MessageContext.TRANSPORT_IN, null);
-
- String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, storageManager);
-
- ackMsgCtx.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, addressingNamespaceURI);
-
- if (log.isDebugEnabled())
- log.debug("Exit: AcknowledgementManager::generateAckMessage");
- return ackRMMsgCtx;
- }
-
- public static boolean verifySequenceCompletion(Iterator ackRangesIterator, long lastMessageNo) {
- if (log.isDebugEnabled())
- log.debug("Enter: AcknowledgementManager::verifySequenceCompletion");
-
- HashMap startMap = new HashMap();
-
- while (ackRangesIterator.hasNext()) {
- AcknowledgementRange temp = (AcknowledgementRange) ackRangesIterator.next();
- startMap.put(new Long(temp.getLowerValue()), temp);
- }
-
- long start = 1;
- boolean result = false;
- while (!result) {
- AcknowledgementRange temp = (AcknowledgementRange) startMap.get(new Long(start));
- if (temp == null) {
- break;
- }
-
- if (temp.getUpperValue() >= lastMessageNo)
- result = true;
-
- start = temp.getUpperValue() + 1;
- }
-
- if (log.isDebugEnabled())
- log.debug("Enter: AcknowledgementManager::verifySequenceCompletion " + result);
- return result;
- }
-
- public static void addFinalAcknowledgement () {
-
- }
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.wsrm.AcknowledgementRange;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+
+/**
+ * Contains logic for managing acknowledgements.
+ */
+
+public class AcknowledgementManager {
+
+ private static Log log = LogFactory.getLog(AcknowledgementManager.class);
+
+ /**
+ * Piggybacks any available acks of the same sequence to the given
+ * application message.
+ *
+ * @param applicationRMMsgContext
+ * @throws SandeshaException
+ */
+ public static void piggybackAcksIfPresent(RMMsgContext rmMessageContext, StorageManager storageManager)
+ throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent");
+
+ ConfigurationContext configurationContext = rmMessageContext.getConfigurationContext();
+
+ SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
+
+ SenderBean findBean = new SenderBean();
+
+ findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+ findBean.setSend(true);
+ findBean.setReSend(false);
+
+ String carrietTo = rmMessageContext.getTo().getAddress();
+
+ Collection collection = retransmitterBeanMgr.find(findBean);
+
+ Iterator it = collection.iterator();
+
+ piggybackLoop: while (it.hasNext()) {
+ SenderBean ackBean = (SenderBean) it.next();
+
+ long timeNow = System.currentTimeMillis();
+ if (ackBean.getTimeToSend() > timeNow) {
+ // //Piggybacking will happen only if the end of ack interval
+ // (timeToSend) is not reached.
+
+ MessageContext ackMsgContext = storageManager.retrieveMessageContext(ackBean.getMessageContextRefKey(),
+ configurationContext);
+
+ // wsa:To has to match for piggybacking.
+ String to = ackMsgContext.getTo().getAddress();
+ if (!carrietTo.equals(to)) {
+ continue piggybackLoop;
+ }
+
+ if (log.isDebugEnabled()) log.debug("Adding ack headers");
+
+ // deleting the ack entry.
+ retransmitterBeanMgr.delete(ackBean.getMessageID());
+
+ // Adding the ack(s) to the application message
+ boolean acks = false;
+ SOAPHeader appMsgHeaders = rmMessageContext.getMessageContext().getEnvelope().getHeader();
+
+ SOAPHeader headers = ackMsgContext.getEnvelope().getHeader();
+ if(headers != null) {
+ for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
+
+ QName name = new QName(Sandesha2Constants.SPEC_NS_URIS[i], Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK);
+ Iterator iter = headers.getChildrenWithName(name);
+ while(iter.hasNext()) {
+ OMElement ackElement = (OMElement) iter.next();
+
+ SequenceAcknowledgement sequenceAcknowledgement = new SequenceAcknowledgement (Sandesha2Constants.SPEC_NS_URIS[i]);
+ sequenceAcknowledgement.fromOMElement(ackElement);
+
+ sequenceAcknowledgement.toOMElement(appMsgHeaders);
+ acks = true;
+ }
+ }
+ }
+
+ if (!acks) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidAckMessageEntry,
+ ackMsgContext.getEnvelope().toString());
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent");
+ }
+
+ /**
+ * this is used to get the acked messages of a sequence. If this is an
+ * outgoing message the sequenceIdentifier should be the internal
+ * sequenceID.
+ *
+ * @param sequenceIdentifier
+ * @param outGoingMessage
+ * @return
+ */
+ public static ArrayList getClientCompletedMessagesList(String sequenceID, SequencePropertyBeanMgr seqPropMgr)
+ throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: AcknowledgementManager::getClientCompletedMessagesList");
+
+ // first trying to get it from the internal sequence id.
+ SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(sequenceID,
+ Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+ String internalSequenceID = null;
+ if (internalSequenceBean != null)
+ internalSequenceID = internalSequenceBean.getValue();
+
+ SequencePropertyBean completedMessagesBean = null;
+ if (internalSequenceID != null)
+ completedMessagesBean = seqPropMgr.retrieve(internalSequenceID,
+ Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+
+ if (completedMessagesBean == null)
+ completedMessagesBean = seqPropMgr.retrieve(sequenceID,
+ Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+
+ ArrayList completedMsgList = null;
+ if (completedMessagesBean != null) {
+ completedMsgList = SandeshaUtil.getArrayListFromString(completedMessagesBean.getValue());
+ } else {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.completedMsgBeanIsNull, sequenceID);
+ SandeshaException e = new SandeshaException(message);
+ if(log.isDebugEnabled()) log.debug("Throwing exception", e);
+ throw e;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::getClientCompletedMessagesList");
+ return completedMsgList;
+ }
+
+ public static ArrayList getServerCompletedMessagesList(String sequenceID, SequencePropertyBeanMgr seqPropMgr)
+ throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: AcknowledgementManager::getServerCompletedMessagesList");
+
+ SequencePropertyBean completedMessagesBean = null;
+
+ completedMessagesBean = seqPropMgr.retrieve(sequenceID,
+ Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+
+ ArrayList completedMsgList = null;
+ if (completedMessagesBean != null) {
+ completedMsgList = SandeshaUtil.getArrayListFromMsgsString(completedMessagesBean.getValue());
+ } else {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.completedMsgBeanIsNull, sequenceID);
+ SandeshaException e = new SandeshaException(message);
+ if(log.isDebugEnabled()) log.debug("Throwing exception", e);
+ throw e;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::getServerCompletedMessagesList");
+ return completedMsgList;
+ }
+
+ public static RMMsgContext generateAckMessage(RMMsgContext referenceRMMessage, String sequencePropertyKey ,String sequenceId,
+ StorageManager storageManager) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: AcknowledgementManager::generateAckMessage");
+
+ MessageContext referenceMsg = referenceRMMessage.getMessageContext();
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
+
+ // Setting the ack depending on AcksTo.
+ SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+
+ EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
+ String acksToStr = acksTo.getAddress();
+
+ if (acksToStr == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
+
+ AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
+ Sandesha2Constants.MessageTypes.ACK,
+ referenceRMMessage.getRMSpecVersion(),
+ referenceMsg.getAxisService());
+
+ MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);
+ ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+ RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+ ackRMMsgCtx.setRMNamespaceValue(referenceRMMessage.getRMNamespaceValue());
+
+ ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+ .getSOAPVersion(referenceMsg.getEnvelope()));
+
+ // Setting new envelope
+ SOAPEnvelope envelope = factory.getDefaultEnvelope();
+
+ ackMsgCtx.setEnvelope(envelope);
+
+ ackMsgCtx.setTo(acksTo);
+
+ // adding the SequenceAcknowledgement part.
+ RMMsgCreator.addAckMessage(ackRMMsgCtx, sequencePropertyKey ,sequenceId, storageManager);
+
+ String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, storageManager);
+
+ ackMsgCtx.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, addressingNamespaceURI);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::generateAckMessage");
+ return ackRMMsgCtx;
+ }
+
+ public static boolean verifySequenceCompletion(Iterator ackRangesIterator, long lastMessageNo) {
+ if (log.isDebugEnabled())
+ log.debug("Enter: AcknowledgementManager::verifySequenceCompletion");
+
+ HashMap startMap = new HashMap();
+
+ while (ackRangesIterator.hasNext()) {
+ AcknowledgementRange temp = (AcknowledgementRange) ackRangesIterator.next();
+ startMap.put(new Long(temp.getLowerValue()), temp);
+ }
+
+ long start = 1;
+ boolean result = false;
+ while (!result) {
+ AcknowledgementRange temp = (AcknowledgementRange) startMap.get(new Long(start));
+ if (temp == null) {
+ break;
+ }
+
+ if (temp.getUpperValue() >= lastMessageNo)
+ result = true;
+
+ start = temp.getUpperValue() + 1;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: AcknowledgementManager::verifySequenceCompletion " + result);
+ return result;
+ }
+
+ public static void addFinalAcknowledgement () {
+
+ }
+}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java Mon Oct 30 07:43:24 2006
@@ -33,18 +33,8 @@
import org.apache.axiom.soap.SOAPFaultSubCode;
import org.apache.axiom.soap.SOAPFaultText;
import org.apache.axiom.soap.SOAPFaultValue;
-import org.apache.axiom.soap.impl.dom.SOAPTextImpl;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.ServiceContext;
-import org.apache.axis2.context.ServiceGroupContext;
-import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisOperationFactory;
-import org.apache.axis2.util.Utils;
-import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.FaultData;
@@ -97,7 +87,6 @@
if (createSequence == null)
throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqParts));
- ConfigurationContext context = createSequenceMessage.getConfigurationContext();
if (storageManager == null)
throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotGetStorageManager));
@@ -146,7 +135,6 @@
long messageNumber = sequence.getMessageNumber().getMessageNumber();
String sequenceID = sequence.getIdentifier().getIdentifier();
- ConfigurationContext configCtx = applicationRMMessage.getMessageContext().getConfigurationContext();
SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
boolean lastMessageNumberExceeded = false;
@@ -214,7 +202,6 @@
MessageContext messageContext = rmMessageContext.getMessageContext();
CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
- int type = rmMessageContext.getMessageType();
boolean validSequence = false;
@@ -367,7 +354,6 @@
log.debug("Enter: FaultManager::checkForSequenceClosed, " + sequenceID);
MessageContext referenceMessage = referenceRMMessage.getMessageContext();
- ConfigurationContext configCtx = referenceMessage.getConfigurationContext();
SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org