You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/10/30 16:43:25 UTC

svn commit: r469167 [4/7] - in /webservices/sandesha/trunk/java: config/ interop/conf/ src/org/apache/sandesha2/ src/org/apache/sandesha2/client/ src/org/apache/sandesha2/handlers/ src/org/apache/sandesha2/i18n/ src/org/apache/sandesha2/msgprocessors/ ...

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Mon Oct 30 07:43:24 2006
@@ -17,7 +17,6 @@
 
 package org.apache.sandesha2.msgprocessors;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 
@@ -35,9 +34,7 @@
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisOperationFactory;
 import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
@@ -68,11 +65,11 @@
 
 	private static final Log log = LogFactory.getLog(AckRequestedProcessor.class);
 
-	public boolean processAckRequestedHeaders(MessageContext message) throws AxisFault {
+	public boolean processAckRequestedHeaders(RMMsgContext message) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: AckRequestedProcessor::processAckRequestHeaders");
 
-		SOAPEnvelope envelope = message.getEnvelope();
+		SOAPEnvelope envelope = message.getMessageContext().getEnvelope();
 		SOAPHeader header = envelope.getHeader();
 		boolean msgCtxPaused = false;
 		if(header!=null)
@@ -107,18 +104,13 @@
 	 * @return true if the msg context was paused
 	 * @throws AxisFault
 	 */
-	public boolean processAckRequestedHeader(MessageContext msgContext, OMElement soapHeader, AckRequested ackRequested) throws AxisFault {
+	public boolean processAckRequestedHeader(RMMsgContext rmMsgCtx, OMElement soapHeader, AckRequested ackRequested) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: AckRequestedProcessor::processAckRequestedHeader " + soapHeader);
 
-		// TODO: Note that this RMMessageContext is not really any use - but we need to create it
-		// so that it can be passed to the fault handling chain. It's really no more than a
-		// container for the correct addressing and RM spec levels, so we'd be better off passing
-		// them in directly. Unfortunately that change ripples through the codebase...
-		RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgContext);
-
 		String sequenceId = ackRequested.getIdentifier().getIdentifier();
 
+		MessageContext msgContext = rmMsgCtx.getMessageContext();
 		ConfigurationContext configurationContext = msgContext.getConfigurationContext();
 
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
@@ -142,6 +134,8 @@
 		// Setting the ack depending on AcksTo.
 		SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequencePropertyKey,
 				Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+		SequencePropertyBean versionBean = seqPropMgr.retrieve(sequencePropertyKey,
+				Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
 
 		EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
 		String acksToStr = acksTo.getAddress();
@@ -149,24 +143,10 @@
 		if (acksToStr == null)
 			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
 
-		AxisOperation ackOperation = null;
-
-		try {
-			ackOperation = AxisOperationFactory.getOperationDescription(WSDL20_2004Constants.MEP_URI_IN_ONLY);
-		} catch (AxisFault e) {
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.axisOperationError, e
-					.toString()));
-		}
-
-		AxisOperation rmMsgOperation = rmMsgCtx.getMessageContext().getAxisOperation();
-		if (rmMsgOperation != null) {
-			ArrayList outFlow = rmMsgOperation.getPhasesOutFlow();
-			if (outFlow != null) {
-				ackOperation.setPhasesOutFlow(outFlow);
-				ackOperation.setPhasesOutFaultFlow(outFlow);
-			}
-		}
-
+		AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
+				Sandesha2Constants.MessageTypes.ACK,
+				versionBean.getValue(),
+				msgContext.getAxisService());
 		MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation);
 
 		ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
@@ -210,14 +190,8 @@
 			if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
 				// operation context will be null when doing in a GLOBAL
 				// handler.
-				try {
-					AxisOperation op = AxisOperationFactory.getAxisOperation(WSDL20_2004Constants.MEP_CONSTANT_IN_OUT);
-					OperationContext opCtx = new OperationContext(op);
-					rmMsgCtx.getMessageContext().setAxisOperation(op);
-					rmMsgCtx.getMessageContext().setOperationContext(opCtx);
-				} catch (AxisFault e2) {
-					throw new SandeshaException(e2.getMessage());
-				}
+				OperationContext opCtx = new OperationContext(ackOperation);
+				rmMsgCtx.getMessageContext().setOperationContext(opCtx);
 			}
 
 			rmMsgCtx.getMessageContext().getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Oct 30 07:43:24 2006
@@ -45,10 +45,8 @@
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
-import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
-import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
 import org.apache.sandesha2.wsrm.AcknowledgementRange;
 import org.apache.sandesha2.wsrm.Nack;
@@ -64,16 +62,14 @@
 
 	/**
 	 * @param message
-	 * @return true if the msg context was paused
 	 * @throws AxisFault
 	 */
-	public boolean processAckHeaders(MessageContext message) throws AxisFault {
+	public void processAckHeaders(RMMsgContext message) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: AcknowledgementProcessor::processAckHeaders");
 
-		SOAPEnvelope envelope = message.getEnvelope();
+		SOAPEnvelope envelope = message.getMessageContext().getEnvelope();
 		SOAPHeader header = envelope.getHeader();
-		boolean returnValue = false;
 		if(header!=null)
 		{
 			for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
@@ -84,41 +80,28 @@
 					OMElement ack = (OMElement) acks.next();
 					SequenceAcknowledgement seqAck = new SequenceAcknowledgement(headerName.getNamespaceURI());
 					seqAck.fromOMElement(ack);
-					boolean ackPaused = processAckHeader(message, ack, seqAck);
-					//if not already paused we might be now
-					if(!returnValue){
-						returnValue = ackPaused;
-					}
+					processAckHeader(message, ack, seqAck);
 				}
 			}			
 		}
 
 
 		if (log.isDebugEnabled())
-			log.debug("Exit: AcknowledgementProcessor::processAckHeaders " + returnValue);
-		return returnValue;
+			log.debug("Exit: AcknowledgementProcessor::processAckHeaders");
 	}
 	
 	/**
-	 * @param msgCtx
+	 * @param rmMsgCtx
 	 * @param soapHeader
 	 * @param sequenceAck
-	 * @return true if the msg context was paused
 	 * @throws AxisFault
 	 */
-	private boolean processAckHeader(MessageContext msgCtx, OMElement soapHeader, SequenceAcknowledgement sequenceAck)
+	private void processAckHeader(RMMsgContext rmMsgCtx, OMElement soapHeader, SequenceAcknowledgement sequenceAck)
 		throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: AcknowledgementProcessor::processAckHeader " + soapHeader);
 		
-		boolean returnValue = false;
-		
-		// TODO: Note that this RMMessageContext is not really any use - but we need to create it
-		// so that it can be passed to the fault handling chain. It's really no more than a
-		// container for the correct addressing and RM spec levels, so we'd be better off passing
-		// them in directly. Unfortunately that change ripples through the codebase...
-		RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
-
+		MessageContext msgCtx = rmMsgCtx.getMessageContext();
 		ConfigurationContext configCtx = msgCtx.getConfigurationContext();
 
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
@@ -260,16 +243,8 @@
 			}
 		}
 
-
-		String action = msgCtx.getOptions().getAction();
-		if (action!=null && action.equals(SpecSpecificConstants.getAckRequestAction(rmMsgCtx.getRMSpecVersion()))) {
-			returnValue = true;
-			rmMsgCtx.pause();
-		}
-
 		if (log.isDebugEnabled())
-			log.debug("Exit: AcknowledgementProcessor::processAckHeader " + returnValue);
-		return returnValue;
+			log.debug("Exit: AcknowledgementProcessor::processAckHeader");
 	}
 
 	private SenderBean getRetransmitterEntry(Collection collection, long msgNo) {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Oct 30 07:43:24 2006
@@ -17,28 +17,16 @@
 
 package org.apache.sandesha2.msgprocessors;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMElement;
 import org.apache.axiom.soap.SOAPBody;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.context.OperationContextFactory;
 import org.apache.axis2.context.ServiceContext;
-import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.AxisOperationFactory;
-import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
@@ -52,17 +40,11 @@
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.CreateSeqBean;
-import org.apache.sandesha2.storage.beans.InvokerBean;
-import org.apache.sandesha2.storage.beans.NextMsgBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.util.AcknowledgementManager;
-import org.apache.sandesha2.util.FaultManager;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SOAPAbstractFactory;
@@ -88,385 +70,13 @@
 
 	
 	public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
-		if (log.isDebugEnabled())
+		if (log.isDebugEnabled()) {
 			log.debug("Enter: ApplicationMsgProcessor::processInMessage");
-
-		boolean msgCtxPaused = false;
-		
-		// Processing the application message.
-		MessageContext msgCtx = rmMsgCtx.getMessageContext();
-		if (msgCtx == null) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgContextNotSetInbound);
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
-				&& rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals("true")) {
-			return msgCtxPaused;
-		}
-
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(msgCtx.getConfigurationContext(),msgCtx.getConfigurationContext().getAxisConfiguration());
-		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-		Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-		String sequenceId = sequence.getIdentifier().getIdentifier();
-		
-		String propertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-		
-		// Check that both the Sequence header and message body have been secured properly
-		SequencePropertyBean tokenBean = seqPropMgr.retrieve(propertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
-		if(tokenBean != null) {
-			SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
-			
-			QName seqName = new QName(rmMsgCtx.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.SEQUENCE);
-			
-			SOAPEnvelope envelope = msgCtx.getEnvelope();
-			OMElement body = envelope.getBody();
-			OMElement seqHeader = envelope.getHeader().getFirstChildWithName(seqName);
-			
-			SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
-			
-			secManager.checkProofOfPossession(token, seqHeader, msgCtx);
-			secManager.checkProofOfPossession(token, body, msgCtx);
-		}
-		
-		//RM will not send sync responses. If sync acks are there this will be
-		// made true again later.
-		if (rmMsgCtx.getMessageContext().getOperationContext() != null) {
-			rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN,
-					Constants.VALUE_FALSE);
+			log.debug("Exit: ApplicationMsgProcessor::processInMessage");
 		}
-		
-		// setting acked msg no range
-		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
-		if (configCtx == null) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		FaultManager faultManager = new FaultManager();
-		SandeshaException fault = faultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager);
-		if (fault != null) {
-			throw fault;
-		}
-
-		// setting mustUnderstand to false.
-		sequence.setMustUnderstand(false);
-		rmMsgCtx.addSOAPEnvelope();
-
-		// throwing a fault if the sequence is closed.
-		fault = faultManager.checkForSequenceClosed(rmMsgCtx, sequenceId, storageManager);
-		if (fault != null) {
-			throw fault;
-		}
-
-		fault = faultManager.checkForLastMsgNumberExceeded(rmMsgCtx, storageManager);
-		if (fault != null) {
-			throw fault;
-		}
-
-		// updating the last activated time of the sequence.
-		SequenceManager.updateLastActivatedTime(propertyKey, storageManager);
-
-		SequencePropertyBean msgsBean = seqPropMgr.retrieve(propertyKey,
-				Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
-
-		long msgNo = sequence.getMessageNumber().getMessageNumber();
-		if (msgNo == 0) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
-					.toString(msgNo));
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		String key = SandeshaUtil.getUUID(); // key to store the message.
-
-		// updating the Highest_In_Msg_No property which gives the highest
-		// message number retrieved from this sequence.
-		String highetsInMsgNoStr = SandeshaUtil.getSequenceProperty(propertyKey,
-				Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, storageManager);
-		String highetsInMsgKey = SandeshaUtil.getSequenceProperty(propertyKey,
-				Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, storageManager);
-		if (highetsInMsgKey == null)
-			highetsInMsgKey = SandeshaUtil.getUUID();
-
-		long highestInMsgNo = 0;
-		if (highetsInMsgNoStr != null) {
-			highestInMsgNo = Long.parseLong(highetsInMsgNoStr);
-		}
-
-		if (msgNo > highestInMsgNo) {
-			highestInMsgNo = msgNo;
-
-			String str = new Long(msgNo).toString();
-			SequencePropertyBean highestMsgNoBean = new SequencePropertyBean(propertyKey,
-					Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, str);
-			SequencePropertyBean highestMsgKeyBean = new SequencePropertyBean(propertyKey,
-					Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, highetsInMsgKey);
-
-			// storing the new message as the highest in message.
-			storageManager.removeMessageContext(highetsInMsgKey);
-			storageManager.storeMessageContext(highetsInMsgKey, msgCtx);
-
-			if (highetsInMsgNoStr != null) {
-				seqPropMgr.update(highestMsgNoBean);
-				seqPropMgr.update(highestMsgKeyBean);
-			} else {
-				seqPropMgr.insert(highestMsgNoBean);
-				seqPropMgr.insert(highestMsgKeyBean);
-			}
-		}
-
-		String messagesStr = "";
-		if (msgsBean != null)
-			messagesStr = msgsBean.getValue();
-		else {
-			msgsBean = new SequencePropertyBean();
-			msgsBean.setSequencePropertyKey(propertyKey);
-			msgsBean.setName(Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
-			msgsBean.setValue(messagesStr);
-		}
-
-    boolean msgNoPresentInList = msgNoPresentInList(messagesStr, msgNo);
-		
-		if (msgNoPresentInList
-				&& (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
-			// this is a duplicate message and the invocation type is
-			// EXACTLY_ONCE.
-			rmMsgCtx.pause();
-			msgCtxPaused = true;
-		}
-
-		if (!msgNoPresentInList)
-		{
-			if (messagesStr != null && !"".equals(messagesStr))
-				messagesStr = messagesStr + "," + Long.toString(msgNo);
-			else
-				messagesStr = Long.toString(msgNo);
-	
-			msgsBean.setValue(messagesStr);
-			seqPropMgr.update(msgsBean);
-		}
-
-		// Pause the messages bean if not the right message to invoke.
-		NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
-		NextMsgBean bean = mgr.retrieve(sequenceId);
-
-		if (bean == null) {
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotFindSequence,
-					sequenceId));
-		}
-
-		InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
-
-		// inorder invocation is still a global property
-		boolean inOrderInvocation = SandeshaUtil.getPropertyBean(
-				msgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
-
-
-		//setting properties for the messageContext
-		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
-		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new Long (msgNo));
-		
-		if (inOrderInvocation && !msgNoPresentInList) {
-
-			SequencePropertyBean incomingSequenceListBean = seqPropMgr.retrieve(
-					Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
-					Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-
-			if (incomingSequenceListBean == null) {
-				ArrayList incomingSequenceList = new ArrayList();
-				incomingSequenceListBean = new SequencePropertyBean();
-				incomingSequenceListBean.setSequencePropertyKey(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
-				incomingSequenceListBean.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-				incomingSequenceListBean.setValue(incomingSequenceList.toString());
-
-				// this get inserted before
-				seqPropMgr.insert(incomingSequenceListBean);
-			}
-
-			ArrayList incomingSequenceList = SandeshaUtil.getArrayListFromString(incomingSequenceListBean.getValue());
-
-			// Adding current sequence to the incoming sequence List.
-			if (!incomingSequenceList.contains(sequenceId)) {
-				incomingSequenceList.add(sequenceId);
-
-				// saving the property.
-				incomingSequenceListBean.setValue(incomingSequenceList.toString());
-				seqPropMgr.update(incomingSequenceListBean);
-			}
-
-			// saving the message.
-			try {
-				storageManager.storeMessageContext(key, rmMsgCtx.getMessageContext());
-				storageMapMgr.insert(new InvokerBean(key, msgNo, sequenceId));
-
-				// This will avoid performing application processing more
-				// than
-				// once.
-				rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-			} catch (Exception ex) {
-				throw new SandeshaException(ex.getMessage(), ex);
-			}
-
-			// pause the message
-			rmMsgCtx.pause();
-			msgCtxPaused = true;
-
-			// Starting the invoker if stopped.
-			SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(), sequenceId);
-
-		}
-
-		// Sending acknowledgements
-		sendAckIfNeeded(rmMsgCtx, messagesStr, storageManager);
-
-		if (log.isDebugEnabled())
-			log.debug("Exit: ApplicationMsgProcessor::processInMessage " + msgCtxPaused);
-		return msgCtxPaused;
-	}
-
-	// TODO convert following from INT to LONG
-	private boolean msgNoPresentInList(String list, long no) {
-		String[] msgStrs = list.split(",");
-
-		int l = msgStrs.length;
-
-		for (int i = 0; i < l; i++) {
-			if (msgStrs[i].equals(Long.toString(no)))
-				return true;
-		}
-
 		return false;
 	}
 
-	public void sendAckIfNeeded(RMMsgContext rmMsgCtx, String messagesStr, StorageManager storageManager)
-			throws AxisFault {
-
-		if (log.isDebugEnabled())
-			log.debug("Enter: ApplicationMsgProcessor::sendAckIfNeeded");
-
-		String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-		
-		Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-		String sequenceId = sequence.getIdentifier().getIdentifier();
-		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
-		if (configCtx == null)
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet));
-
-		RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey ,sequenceId, storageManager);
-		MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
-		
-		EndpointReference acksTo = ackRMMsgCtx.getTo();
-		
-		if (SandeshaUtil.isAnonymousURI (acksTo.getAddress())) {
-
-			// AxisEngine engine = new
-			// AxisEngine(ackRMMsgCtx.getMessageContext()
-			// .getConfigurationContext());
-
-			// setting CONTEXT_WRITTEN since acksto is anonymous
-			if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
-				// operation context will be null when doing in a GLOBAL
-				// handler.
-
-				AxisOperation op = AxisOperationFactory.getAxisOperation(WSDL20_2004Constants.MEP_CONSTANT_IN_OUT);
-				OperationContext opCtx = new OperationContext(op);
-				rmMsgCtx.getMessageContext().setAxisOperation(op);
-				rmMsgCtx.getMessageContext().setOperationContext(opCtx);
-			}
-
-			rmMsgCtx.getMessageContext().getOperationContext().setProperty(
-					org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
-
-			rmMsgCtx.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN, "true");
-
-			ackRMMsgCtx.getMessageContext().setServerSide(true);
-			
-			AxisEngine engine = new AxisEngine(configCtx);
-			engine.send(ackRMMsgCtx.getMessageContext());
-
-		} else {
-
-			// / Transaction asyncAckTransaction =
-			// storageManager.getTransaction();
-
-			SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
-
-			String key = SandeshaUtil.getUUID();
-
-			SenderBean ackBean = new SenderBean();
-			ackBean.setMessageContextRefKey(key);
-			ackBean.setMessageID(ackMsgCtx.getMessageID());
-			ackBean.setReSend(false);
-			ackBean.setSequenceID(sequencePropertyKey);
-			EndpointReference to = ackMsgCtx.getTo();
-			if (to!=null)
-				ackBean.setToAddress(to.getAddress());
-
-			// this will be set to true in the sender.
-			ackBean.setSend(true);
-
-			ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
-			ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-			long ackInterval = SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext().getAxisOperation())
-					.getAcknowledgementInterval();
-
-			// Ack will be sent as stand alone, only after the retransmitter
-			// interval.
-			long timeToSend = System.currentTimeMillis() + ackInterval;
-
-			// removing old acks.
-			SenderBean findBean = new SenderBean();
-			findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-
-			// this will be set to true in the sandesha2TransportSender.
-			findBean.setSend(true);
-			findBean.setReSend(false);
-			Collection coll = retransmitterBeanMgr.find(findBean);
-			Iterator it = coll.iterator();
-
-			if (it.hasNext()) {
-				SenderBean oldAckBean = (SenderBean) it.next();
-				timeToSend = oldAckBean.getTimeToSend(); // If there is an
-															// old ack. This ack
-															// will be sent in
-															// the old
-															// timeToSend.
-
-				// removing the retransmitted entry for the oldAck
-				retransmitterBeanMgr.delete(oldAckBean.getMessageID());
-
-				// removing the message store entry for the old ack
-				storageManager.removeMessageContext(oldAckBean.getMessageContextRefKey());
-			}
-
-			ackBean.setTimeToSend(timeToSend);
-			storageManager.storeMessageContext(key, ackMsgCtx);
-
-			ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-			
-			// inserting the new ack.
-			retransmitterBeanMgr.insert(ackBean);
-			// / asyncAckTransaction.commit();
-
-			// passing the message through sandesha2sender
-			ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
-			ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
-			
-			SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
-			
-			SandeshaUtil.startSenderForTheSequence(ackRMMsgCtx.getConfigurationContext(), sequenceId);
-		}
-		
-		
-		if (log.isDebugEnabled())
-			log.debug("Exit: ApplicationMsgProcessor::sendAckIfNeeded");
-	}
-
 	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
@@ -1227,11 +837,4 @@
 			log.debug("Exit: ApplicationMsgProcessor::setNextMsgNo");
 	}
 	
-	private boolean isWSAAnonymous (String address) {
-		if (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) ||
-				AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address))
-			return true;
-		
-		return false;
-	}
 }

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=auto&rev=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Mon Oct 30 07:43:24 2006
@@ -0,0 +1,456 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * Responsible for processing the Sequence header (if present) on an incoming
+ * message.
+ */
+
+public class SequenceProcessor {
+
+	private static final Log log = LogFactory.getLog(SequenceProcessor.class);
+
+	public boolean processSequenceHeader(RMMsgContext rmMsgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: SequenceProcessor::processSequenceHeader");
+		boolean result = false;
+		Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+		if(sequence != null) {
+			// This is a reliable message, so hand it on to the main routine
+			result = processReliableMessage(rmMsgCtx);
+		} else {
+			if (log.isDebugEnabled())
+				log.debug("Message does not contain a sequence header");
+		}
+		if (log.isDebugEnabled())
+			log.debug("Exit: SequenceProcessor::processSequenceHeader " + result);
+		return result;
+	}
+	
+	public boolean processReliableMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: SequenceProcessor::processReliableMessage");
+
+		boolean msgCtxPaused = false;
+		
+		if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
+				&& rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals("true")) {
+			return msgCtxPaused;
+		}
+
+		MessageContext msgCtx = rmMsgCtx.getMessageContext();
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(msgCtx.getConfigurationContext(),msgCtx.getConfigurationContext().getAxisConfiguration());
+		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
+		Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+		String sequenceId = sequence.getIdentifier().getIdentifier();
+		
+		String propertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
+		
+		// Check that both the Sequence header and message body have been secured properly
+		SequencePropertyBean tokenBean = seqPropMgr.retrieve(propertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
+		if(tokenBean != null) {
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
+			
+			QName seqName = new QName(rmMsgCtx.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.SEQUENCE);
+			
+			SOAPEnvelope envelope = msgCtx.getEnvelope();
+			OMElement body = envelope.getBody();
+			OMElement seqHeader = envelope.getHeader().getFirstChildWithName(seqName);
+			
+			SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
+			
+			secManager.checkProofOfPossession(token, seqHeader, msgCtx);
+			secManager.checkProofOfPossession(token, body, msgCtx);
+		}
+		
+		//RM will not send sync responses. If sync acks are there this will be
+		// made true again later.
+		if (rmMsgCtx.getMessageContext().getOperationContext() != null) {
+			rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN,
+					Constants.VALUE_FALSE);
+		}
+		
+		// setting acked msg no range
+		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+		if (configCtx == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		FaultManager faultManager = new FaultManager();
+		SandeshaException fault = faultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager);
+		if (fault != null) {
+			throw fault;
+		}
+
+		// setting mustUnderstand to false.
+		sequence.setMustUnderstand(false);
+		rmMsgCtx.addSOAPEnvelope();
+
+		// throwing a fault if the sequence is closed.
+		fault = faultManager.checkForSequenceClosed(rmMsgCtx, sequenceId, storageManager);
+		if (fault != null) {
+			throw fault;
+		}
+
+		fault = faultManager.checkForLastMsgNumberExceeded(rmMsgCtx, storageManager);
+		if (fault != null) {
+			throw fault;
+		}
+
+		// updating the last activated time of the sequence.
+		SequenceManager.updateLastActivatedTime(propertyKey, storageManager);
+
+		SequencePropertyBean msgsBean = seqPropMgr.retrieve(propertyKey,
+				Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+
+		long msgNo = sequence.getMessageNumber().getMessageNumber();
+		if (msgNo == 0) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
+					.toString(msgNo));
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		String key = SandeshaUtil.getUUID(); // key to store the message.
+
+		// updating the Highest_In_Msg_No property which gives the highest
+		// message number retrieved from this sequence.
+		String highetsInMsgNoStr = SandeshaUtil.getSequenceProperty(propertyKey,
+				Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, storageManager);
+		String highetsInMsgKey = SandeshaUtil.getSequenceProperty(propertyKey,
+				Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, storageManager);
+		if (highetsInMsgKey == null)
+			highetsInMsgKey = SandeshaUtil.getUUID();
+
+		long highestInMsgNo = 0;
+		if (highetsInMsgNoStr != null) {
+			highestInMsgNo = Long.parseLong(highetsInMsgNoStr);
+		}
+
+		if (msgNo > highestInMsgNo) {
+			highestInMsgNo = msgNo;
+
+			String str = new Long(msgNo).toString();
+			SequencePropertyBean highestMsgNoBean = new SequencePropertyBean(propertyKey,
+					Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, str);
+			SequencePropertyBean highestMsgKeyBean = new SequencePropertyBean(propertyKey,
+					Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, highetsInMsgKey);
+
+			// storing the new message as the highest in message.
+			storageManager.removeMessageContext(highetsInMsgKey);
+			storageManager.storeMessageContext(highetsInMsgKey, msgCtx);
+
+			if (highetsInMsgNoStr != null) {
+				seqPropMgr.update(highestMsgNoBean);
+				seqPropMgr.update(highestMsgKeyBean);
+			} else {
+				seqPropMgr.insert(highestMsgNoBean);
+				seqPropMgr.insert(highestMsgKeyBean);
+			}
+		}
+
+		String messagesStr = "";
+		if (msgsBean != null)
+			messagesStr = msgsBean.getValue();
+		else {
+			msgsBean = new SequencePropertyBean();
+			msgsBean.setSequencePropertyKey(propertyKey);
+			msgsBean.setName(Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+			msgsBean.setValue(messagesStr);
+		}
+
+		boolean msgNoPresentInList = msgNoPresentInList(messagesStr, msgNo);
+		
+		if (msgNoPresentInList
+				&& (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
+			// this is a duplicate message and the invocation type is
+			// EXACTLY_ONCE.
+			rmMsgCtx.pause();
+			msgCtxPaused = true;
+		}
+
+		if (!msgNoPresentInList)
+		{
+			if (messagesStr != null && !"".equals(messagesStr))
+				messagesStr = messagesStr + "," + Long.toString(msgNo);
+			else
+				messagesStr = Long.toString(msgNo);
+	
+			msgsBean.setValue(messagesStr);
+			seqPropMgr.update(msgsBean);
+		}
+
+		// Pause the messages bean if not the right message to invoke.
+		NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
+		NextMsgBean bean = mgr.retrieve(sequenceId);
+
+		if (bean == null) {
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotFindSequence,
+					sequenceId));
+		}
+
+		InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr();
+
+		// inorder invocation is still a global property
+		boolean inOrderInvocation = SandeshaUtil.getPropertyBean(
+				msgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
+
+
+		//setting properties for the messageContext
+		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
+		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new Long (msgNo));
+		
+		if (inOrderInvocation && !msgNoPresentInList) {
+
+			SequencePropertyBean incomingSequenceListBean = seqPropMgr.retrieve(
+					Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+					Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+			if (incomingSequenceListBean == null) {
+				ArrayList incomingSequenceList = new ArrayList();
+				incomingSequenceListBean = new SequencePropertyBean();
+				incomingSequenceListBean.setSequencePropertyKey(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
+				incomingSequenceListBean.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+				incomingSequenceListBean.setValue(incomingSequenceList.toString());
+
+				// this get inserted before
+				seqPropMgr.insert(incomingSequenceListBean);
+			}
+
+			ArrayList incomingSequenceList = SandeshaUtil.getArrayListFromString(incomingSequenceListBean.getValue());
+
+			// Adding current sequence to the incoming sequence List.
+			if (!incomingSequenceList.contains(sequenceId)) {
+				incomingSequenceList.add(sequenceId);
+
+				// saving the property.
+				incomingSequenceListBean.setValue(incomingSequenceList.toString());
+				seqPropMgr.update(incomingSequenceListBean);
+			}
+
+			// saving the message.
+			try {
+				storageManager.storeMessageContext(key, rmMsgCtx.getMessageContext());
+				storageMapMgr.insert(new InvokerBean(key, msgNo, sequenceId));
+
+				// This will avoid performing application processing more
+				// than
+				// once.
+				rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+			} catch (Exception ex) {
+				throw new SandeshaException(ex.getMessage(), ex);
+			}
+
+			// pause the message
+			rmMsgCtx.pause();
+			msgCtxPaused = true;
+
+			// Starting the invoker if stopped.
+			SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(), sequenceId);
+
+		}
+
+		// Sending acknowledgements
+		sendAckIfNeeded(rmMsgCtx, messagesStr, storageManager);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: SequenceProcessor::processInMessage " + msgCtxPaused);
+		return msgCtxPaused;
+	}
+
+	// TODO convert following from INT to LONG
+	private boolean msgNoPresentInList(String list, long no) {
+		String[] msgStrs = list.split(",");
+
+		int l = msgStrs.length;
+
+		for (int i = 0; i < l; i++) {
+			if (msgStrs[i].equals(Long.toString(no)))
+				return true;
+		}
+
+		return false;
+	}
+
+	public void sendAckIfNeeded(RMMsgContext rmMsgCtx, String messagesStr, StorageManager storageManager)
+			throws AxisFault {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: SequenceProcessor::sendAckIfNeeded");
+
+		String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
+		
+		Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+		String sequenceId = sequence.getIdentifier().getIdentifier();
+		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+		if (configCtx == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
+			if(log.isDebugEnabled()) log.debug(message);
+			throw new SandeshaException(message);
+		}
+		
+		RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey, sequenceId, storageManager);
+		MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
+		
+		EndpointReference acksTo = ackRMMsgCtx.getTo();
+		
+		if (SandeshaUtil.isAnonymousURI (acksTo.getAddress())) {
+
+			// setting CONTEXT_WRITTEN since acksto is anonymous
+			if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
+				// operation context will be null when doing in a GLOBAL
+				// handler.
+				AxisOperation op = ackMsgCtx.getAxisOperation();
+				OperationContext opCtx = new OperationContext(op);
+				rmMsgCtx.getMessageContext().setOperationContext(opCtx);
+			}
+
+			rmMsgCtx.getMessageContext().getOperationContext().setProperty(
+					org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+
+			rmMsgCtx.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN, "true");
+
+			ackRMMsgCtx.getMessageContext().setServerSide(true);
+			
+			AxisEngine engine = new AxisEngine(configCtx);
+			engine.send(ackRMMsgCtx.getMessageContext());
+
+		} else {
+
+			// / Transaction asyncAckTransaction =
+			// storageManager.getTransaction();
+
+			SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
+
+			String key = SandeshaUtil.getUUID();
+
+			SenderBean ackBean = new SenderBean();
+			ackBean.setMessageContextRefKey(key);
+			ackBean.setMessageID(ackMsgCtx.getMessageID());
+			ackBean.setReSend(false);
+			ackBean.setSequenceID(sequencePropertyKey);
+			EndpointReference to = ackMsgCtx.getTo();
+			if (to!=null)
+				ackBean.setToAddress(to.getAddress());
+
+			// this will be set to true in the sender.
+			ackBean.setSend(true);
+
+			ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+			ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+			long ackInterval = SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext().getAxisOperation())
+					.getAcknowledgementInterval();
+
+			// Ack will be sent as stand alone, only after the retransmitter
+			// interval.
+			long timeToSend = System.currentTimeMillis() + ackInterval;
+
+			// removing old acks.
+			SenderBean findBean = new SenderBean();
+			findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+			// this will be set to true in the sandesha2TransportSender.
+			findBean.setSend(true);
+			findBean.setReSend(false);
+			Collection coll = retransmitterBeanMgr.find(findBean);
+			Iterator it = coll.iterator();
+
+			if (it.hasNext()) {
+				SenderBean oldAckBean = (SenderBean) it.next();
+				timeToSend = oldAckBean.getTimeToSend(); // If there is an
+															// old ack. This ack
+															// will be sent in
+															// the old
+															// timeToSend.
+
+				// removing the retransmitted entry for the oldAck
+				retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+
+				// removing the message store entry for the old ack
+				storageManager.removeMessageContext(oldAckBean.getMessageContextRefKey());
+			}
+
+			ackBean.setTimeToSend(timeToSend);
+			storageManager.storeMessageContext(key, ackMsgCtx);
+
+			ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+			
+			// inserting the new ack.
+			retransmitterBeanMgr.insert(ackBean);
+			// / asyncAckTransaction.commit();
+
+			// passing the message through sandesha2sender
+			ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+			ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+			
+			SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+
+			SandeshaUtil.startSenderForTheSequence(ackRMMsgCtx.getConfigurationContext(), sequenceId);
+		}
+		
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: SequenceProcessor::sendAckIfNeeded");
+	}
+}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon Oct 30 07:43:24 2006
@@ -19,8 +19,6 @@
 
 import java.util.Iterator;
 
-import javax.xml.namespace.QName;
-
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
@@ -31,7 +29,6 @@
 import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.context.OperationContextFactory;
 import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.OutInAxisOperation;
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.util.Utils;
 import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
@@ -358,38 +355,18 @@
 		String terminated = SandeshaUtil.getSequenceProperty(outSequenceID,
 				Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, storageManager);
 
-		// registring an InOutOperationContext for this.
-		// since the serviceContext.fireAndForget only sets a inOnly One
-		// this does not work when there is a terminateSequnceResponse
-		// TODO do processing of terminateMessagesCorrectly., create a new
-		// message instead of sendign the one given by the serviceClient
-		// TODO important
-
-		AxisOperation outInAxisOp = new OutInAxisOperation(new QName("temp"));
-
-		AxisOperation referenceInOutOperation = msgContext.getAxisService()
-				.getOperation(
-						new QName(Sandesha2Constants.RM_IN_OUT_OPERATION_NAME));
-		if (referenceInOutOperation == null) {
-			String messge = "Cant find the recerence RM InOut operation";
-			throw new SandeshaException(messge);
-		}
-
-		outInAxisOp.setParent(msgContext.getAxisService());
-		// setting flows
-		// outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation.getRemainingPhasesInFlow());
-		outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation
-				.getRemainingPhasesInFlow());
-
+		AxisOperation terminateOp = SpecSpecificConstants.getWSRMOperation(
+				Sandesha2Constants.MessageTypes.TERMINATE_SEQ,
+				rmMsgCtx.getRMSpecVersion(),
+				msgContext.getAxisService());
 		OperationContext opcontext = OperationContextFactory
 				.createOperationContext(
-						WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, outInAxisOp);
+						WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, terminateOp);
 		opcontext.setParent(msgContext.getServiceContext());
-		configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),
-				opcontext);
+		configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),	opcontext);
 
 		msgContext.setOperationContext(opcontext);
-		msgContext.setAxisOperation(outInAxisOp);
+		msgContext.setAxisOperation(terminateOp);
 		
 		if (terminated != null && "true".equals(terminated)) {
 			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java Mon Oct 30 07:43:24 2006
@@ -1,48 +1,110 @@
-/*
- * Copyright  1999-2004 The Apache Software Foundation.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.sandesha2.msgreceivers;
-
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.receivers.AbstractMessageReceiver;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.util.MsgInitializer;
-import org.apache.sandesha2.util.SandeshaUtil;
-
-/**
-*Currently this is a dummy Msg Receiver.
-*All the necessary RM logic happens at MessageProcessors.
-*This only ensures that the defaults Messsage Receiver does not get called for RM control messages.
-*/
-
-
-public class RMMessageReceiver extends AbstractMessageReceiver {
-
-	private static final Log log = LogFactory.getLog(RMMessageReceiver.class.getName());
-	
-	public final void receive(MessageContext messgeCtx) throws AxisFault {
-		log.debug("RM MESSSAGE RECEIVER WAS CALLED");
-		
-		RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(messgeCtx);
-		log.debug("MsgReceiver got type:" + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));	
-	}
-	
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.msgreceivers;
+
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.receivers.AbstractMessageReceiver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+*Currently this is a dummy Msg Receiver.
+*All the necessary RM logic happens at MessageProcessors.
+*This only ensures that the defaults Messsage Receiver does not get called for RM control messages.
+*/
+
+
+public class RMMessageReceiver extends AbstractMessageReceiver {
+
+	private static final Log log = LogFactory.getLog(RMMessageReceiver.class.getName());
+	
+	public final void receive(MessageContext msgCtx) throws AxisFault {
+		if(log.isDebugEnabled()) log.debug("Entry: RMMessageReceiver::receive");
+		
+		RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+		if(log.isDebugEnabled()) log.debug("MsgReceiver got type:" + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));	
+
+		// Note that some messages (such as stand-alone acks) will be routed here, but
+		// the headers will already have been processed. Therefore we should not assume
+		// that we will have a MsgProcessor every time.
+		MsgProcessor msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+		if(msgProcessor != null) {
+			boolean withinTransaction = false;
+			String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+			if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+				withinTransaction = true;
+			}
+
+			Transaction transaction = null;
+			if (!withinTransaction) {
+				ConfigurationContext context = msgCtx.getConfigurationContext();
+				StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());				transaction = storageManager.getTransaction();
+				msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+			}
+			
+			boolean rolledBack = false;
+			
+			try {
+
+				msgProcessor.processInMessage(rmMsgCtx);
+
+			} catch (AxisFault e) {
+				// message should not be sent in a exception situation.
+				msgCtx.pause();
+	
+				if (!withinTransaction) {
+					try {
+						transaction.rollback();
+						msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+						rolledBack = true;
+					} catch (Exception e1) {
+						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+						log.debug(message, e);
+					}
+				}
+	
+				throw e;
+			} finally {
+				if (!withinTransaction && !rolledBack) {
+					try {
+						transaction.commit();
+						msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+					} catch (Exception e) {
+						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
+						log.debug(message, e);
+					}
+				}
+			}
+		}	
+
+		if(log.isDebugEnabled()) log.debug("Exit: RMMessageReceiver::receive");
+	}
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java Mon Oct 30 07:43:24 2006
@@ -1,327 +1,304 @@
-/*
- * Copyright  1999-2004 The Apache Software Foundation.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.sandesha2.util;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPFactory;
-import org.apache.axiom.soap.SOAPHeader;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.AddressingConstants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisOperationFactory;
-import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
-import org.apache.sandesha2.wsrm.AcknowledgementRange;
-import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
-
-/**
- * Contains logic for managing acknowledgements.
- */
-
-public class AcknowledgementManager {
-
-	private static Log log = LogFactory.getLog(AcknowledgementManager.class);
-
-	/**
-	 * Piggybacks any available acks of the same sequence to the given
-	 * application message.
-	 * 
-	 * @param applicationRMMsgContext
-	 * @throws SandeshaException
-	 */
-	public static void piggybackAcksIfPresent(RMMsgContext rmMessageContext, StorageManager storageManager)
-			throws SandeshaException {
-		if (log.isDebugEnabled())
-			log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent");
-
-		ConfigurationContext configurationContext = rmMessageContext.getConfigurationContext();
-
-		SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
-
-		SenderBean findBean = new SenderBean();
-
-		findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-		findBean.setSend(true);
-		findBean.setReSend(false);
-
-		String carrietTo = rmMessageContext.getTo().getAddress();
-
-		Collection collection = retransmitterBeanMgr.find(findBean);
-
-		Iterator it = collection.iterator();
-
-		piggybackLoop: while (it.hasNext()) {
-			SenderBean ackBean = (SenderBean) it.next();
-
-			long timeNow = System.currentTimeMillis();
-			if (ackBean.getTimeToSend() > timeNow) {
-				// //Piggybacking will happen only if the end of ack interval
-				// (timeToSend) is not reached.
-
-				MessageContext ackMsgContext = storageManager.retrieveMessageContext(ackBean.getMessageContextRefKey(),
-						configurationContext);
-
-				// wsa:To has to match for piggybacking.
-				String to = ackMsgContext.getTo().getAddress();
-				if (!carrietTo.equals(to)) {
-					continue piggybackLoop;
-				}
-
-				if (log.isDebugEnabled()) log.debug("Adding ack headers");
-
-				// deleting the ack entry.
-				retransmitterBeanMgr.delete(ackBean.getMessageID());
-
-				// Adding the ack(s) to the application message
-				boolean acks = false;
-				SOAPHeader appMsgHeaders = rmMessageContext.getMessageContext().getEnvelope().getHeader();
-				
-				SOAPHeader headers = ackMsgContext.getEnvelope().getHeader();
-				if(headers != null) {
-					for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
-
-						QName name = new QName(Sandesha2Constants.SPEC_NS_URIS[i], Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK);
-						Iterator iter = headers.getChildrenWithName(name);
-						while(iter.hasNext()) {
-							OMElement ackElement = (OMElement) iter.next();
-
-							SequenceAcknowledgement sequenceAcknowledgement = new SequenceAcknowledgement (Sandesha2Constants.SPEC_NS_URIS[i]);
-							sequenceAcknowledgement.fromOMElement(ackElement);
-							
-							sequenceAcknowledgement.toOMElement(appMsgHeaders);
-							acks = true;
-						}
-					}
-				}
-				
-				if (!acks) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidAckMessageEntry,
-							ackMsgContext.getEnvelope().toString());
-					log.debug(message);
-					throw new SandeshaException(message);
-				}
-			}
-		}
-
-		if (log.isDebugEnabled())
-			log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent");
-	}
-
-	/**
-	 * this is used to get the acked messages of a sequence. If this is an
-	 * outgoing message the sequenceIdentifier should be the internal
-	 * sequenceID.
-	 * 
-	 * @param sequenceIdentifier
-	 * @param outGoingMessage
-	 * @return
-	 */
-	public static ArrayList getClientCompletedMessagesList(String sequenceID, SequencePropertyBeanMgr seqPropMgr)
-			throws SandeshaException {
-		if (log.isDebugEnabled())
-			log.debug("Enter: AcknowledgementManager::getClientCompletedMessagesList");
-
-		// first trying to get it from the internal sequence id.
-		SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(sequenceID,
-				Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
-		String internalSequenceID = null;
-		if (internalSequenceBean != null)
-			internalSequenceID = internalSequenceBean.getValue();
-
-		SequencePropertyBean completedMessagesBean = null;
-		if (internalSequenceID != null)
-			completedMessagesBean = seqPropMgr.retrieve(internalSequenceID,
-					Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
-
-		if (completedMessagesBean == null)
-			completedMessagesBean = seqPropMgr.retrieve(sequenceID,
-					Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
-
-		ArrayList completedMsgList = null;
-		if (completedMessagesBean != null) {
-			completedMsgList = SandeshaUtil.getArrayListFromString(completedMessagesBean.getValue());
-		} else {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.completedMsgBeanIsNull, sequenceID);
-			SandeshaException e = new SandeshaException(message);
-			if(log.isDebugEnabled()) log.debug("Throwing exception", e);
-			throw e;
-		}
-
-		if (log.isDebugEnabled())
-			log.debug("Exit: AcknowledgementManager::getClientCompletedMessagesList");
-		return completedMsgList;
-	}
-
-	public static ArrayList getServerCompletedMessagesList(String sequenceID, SequencePropertyBeanMgr seqPropMgr)
-			throws SandeshaException {
-		if (log.isDebugEnabled())
-			log.debug("Enter: AcknowledgementManager::getServerCompletedMessagesList");
-
-		SequencePropertyBean completedMessagesBean = null;
-
-		completedMessagesBean = seqPropMgr.retrieve(sequenceID,
-				Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
-
-		ArrayList completedMsgList = null;
-		if (completedMessagesBean != null) {
-			completedMsgList = SandeshaUtil.getArrayListFromMsgsString(completedMessagesBean.getValue());
-		} else {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.completedMsgBeanIsNull, sequenceID);
-			SandeshaException e = new SandeshaException(message);
-			if(log.isDebugEnabled()) log.debug("Throwing exception", e);
-			throw e;
-		}
-
-		if (log.isDebugEnabled())
-			log.debug("Exit: AcknowledgementManager::getServerCompletedMessagesList");
-		return completedMsgList;
-	}
-
-	public static RMMsgContext generateAckMessage(RMMsgContext referenceRMMessage, String sequencePropertyKey ,String sequenceId,
-			StorageManager storageManager) throws AxisFault {
-		if (log.isDebugEnabled())
-			log.debug("Enter: AcknowledgementManager::generateAckMessage");
-
-		MessageContext referenceMsg = referenceRMMessage.getMessageContext();
-
-		ConfigurationContext configurationContext = referenceRMMessage.getMessageContext().getConfigurationContext();
-		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-
-		// Setting the ack depending on AcksTo.
-		SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequencePropertyKey,
-				Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
-
-		EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
-		String acksToStr = acksTo.getAddress();
-
-		if (acksToStr == null)
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
-
-		AxisOperation ackOperation = null;
-
-		ackOperation = AxisOperationFactory.getOperationDescription(WSDL20_2004Constants.MEP_URI_IN_ONLY);
-
-		AxisOperation rmMsgOperation = referenceRMMessage.getMessageContext().getAxisOperation();
-		if (rmMsgOperation != null) {
-			ArrayList outFlow = rmMsgOperation.getPhasesOutFlow();
-			if (outFlow != null) {
-				ackOperation.setPhasesOutFlow(outFlow);
-				ackOperation.setPhasesOutFaultFlow(outFlow);
-			}
-		}
-
-		MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);
-		ackMsgCtx.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, referenceMsg
-				.getProperty(AddressingConstants.WS_ADDRESSING_VERSION)); // TODO
-																			// do
-																			// this
-																			// in
-																			// the
-																			// RMMsgCreator
-
-		ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-		RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
-		ackRMMsgCtx.setRMNamespaceValue(referenceRMMessage.getRMNamespaceValue());
-
-		ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
-
-		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
-				.getSOAPVersion(referenceMsg.getEnvelope()));
-
-		// Setting new envelope
-		SOAPEnvelope envelope = factory.getDefaultEnvelope();
-
-		ackMsgCtx.setEnvelope(envelope);
-
-		ackMsgCtx.setTo(acksTo);
-
-		// adding the SequenceAcknowledgement part.
-		RMMsgCreator.addAckMessage(ackRMMsgCtx, sequencePropertyKey ,sequenceId, storageManager);
-
-		ackMsgCtx.setProperty(MessageContext.TRANSPORT_IN, null);
-
-		String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
-				Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, storageManager);
-
-		ackMsgCtx.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, addressingNamespaceURI);
-
-		if (log.isDebugEnabled())
-			log.debug("Exit: AcknowledgementManager::generateAckMessage");
-		return ackRMMsgCtx;
-	}
-
-	public static boolean verifySequenceCompletion(Iterator ackRangesIterator, long lastMessageNo) {
-		if (log.isDebugEnabled())
-			log.debug("Enter: AcknowledgementManager::verifySequenceCompletion");
-
-		HashMap startMap = new HashMap();
-
-		while (ackRangesIterator.hasNext()) {
-			AcknowledgementRange temp = (AcknowledgementRange) ackRangesIterator.next();
-			startMap.put(new Long(temp.getLowerValue()), temp);
-		}
-
-		long start = 1;
-		boolean result = false;
-		while (!result) {
-			AcknowledgementRange temp = (AcknowledgementRange) startMap.get(new Long(start));
-			if (temp == null) {
-				break;
-			}
-
-			if (temp.getUpperValue() >= lastMessageNo)
-				result = true;
-
-			start = temp.getUpperValue() + 1;
-		}
-
-		if (log.isDebugEnabled())
-			log.debug("Enter: AcknowledgementManager::verifySequenceCompletion " + result);
-		return result;
-	}
-	
-	public static void addFinalAcknowledgement () {
-		
-	}
-}
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.wsrm.AcknowledgementRange;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+
+/**
+ * Contains logic for managing acknowledgements.
+ */
+
+public class AcknowledgementManager {
+
+	private static Log log = LogFactory.getLog(AcknowledgementManager.class);
+
+	/**
+	 * Piggybacks any available acks of the same sequence to the given
+	 * application message.
+	 * 
+	 * @param applicationRMMsgContext
+	 * @throws SandeshaException
+	 */
+	public static void piggybackAcksIfPresent(RMMsgContext rmMessageContext, StorageManager storageManager)
+			throws SandeshaException {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent");
+
+		ConfigurationContext configurationContext = rmMessageContext.getConfigurationContext();
+
+		SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
+
+		SenderBean findBean = new SenderBean();
+
+		findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+		findBean.setSend(true);
+		findBean.setReSend(false);
+
+		String carrietTo = rmMessageContext.getTo().getAddress();
+
+		Collection collection = retransmitterBeanMgr.find(findBean);
+
+		Iterator it = collection.iterator();
+
+		piggybackLoop: while (it.hasNext()) {
+			SenderBean ackBean = (SenderBean) it.next();
+
+			long timeNow = System.currentTimeMillis();
+			if (ackBean.getTimeToSend() > timeNow) {
+				// //Piggybacking will happen only if the end of ack interval
+				// (timeToSend) is not reached.
+
+				MessageContext ackMsgContext = storageManager.retrieveMessageContext(ackBean.getMessageContextRefKey(),
+						configurationContext);
+
+				// wsa:To has to match for piggybacking.
+				String to = ackMsgContext.getTo().getAddress();
+				if (!carrietTo.equals(to)) {
+					continue piggybackLoop;
+				}
+
+				if (log.isDebugEnabled()) log.debug("Adding ack headers");
+
+				// deleting the ack entry.
+				retransmitterBeanMgr.delete(ackBean.getMessageID());
+
+				// Adding the ack(s) to the application message
+				boolean acks = false;
+				SOAPHeader appMsgHeaders = rmMessageContext.getMessageContext().getEnvelope().getHeader();
+				
+				SOAPHeader headers = ackMsgContext.getEnvelope().getHeader();
+				if(headers != null) {
+					for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
+
+						QName name = new QName(Sandesha2Constants.SPEC_NS_URIS[i], Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK);
+						Iterator iter = headers.getChildrenWithName(name);
+						while(iter.hasNext()) {
+							OMElement ackElement = (OMElement) iter.next();
+
+							SequenceAcknowledgement sequenceAcknowledgement = new SequenceAcknowledgement (Sandesha2Constants.SPEC_NS_URIS[i]);
+							sequenceAcknowledgement.fromOMElement(ackElement);
+							
+							sequenceAcknowledgement.toOMElement(appMsgHeaders);
+							acks = true;
+						}
+					}
+				}
+				
+				if (!acks) {
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidAckMessageEntry,
+							ackMsgContext.getEnvelope().toString());
+					log.debug(message);
+					throw new SandeshaException(message);
+				}
+			}
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent");
+	}
+
+	/**
+	 * this is used to get the acked messages of a sequence. If this is an
+	 * outgoing message the sequenceIdentifier should be the internal
+	 * sequenceID.
+	 * 
+	 * @param sequenceIdentifier
+	 * @param outGoingMessage
+	 * @return
+	 */
+	public static ArrayList getClientCompletedMessagesList(String sequenceID, SequencePropertyBeanMgr seqPropMgr)
+			throws SandeshaException {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementManager::getClientCompletedMessagesList");
+
+		// first trying to get it from the internal sequence id.
+		SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(sequenceID,
+				Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+		String internalSequenceID = null;
+		if (internalSequenceBean != null)
+			internalSequenceID = internalSequenceBean.getValue();
+
+		SequencePropertyBean completedMessagesBean = null;
+		if (internalSequenceID != null)
+			completedMessagesBean = seqPropMgr.retrieve(internalSequenceID,
+					Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+
+		if (completedMessagesBean == null)
+			completedMessagesBean = seqPropMgr.retrieve(sequenceID,
+					Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+
+		ArrayList completedMsgList = null;
+		if (completedMessagesBean != null) {
+			completedMsgList = SandeshaUtil.getArrayListFromString(completedMessagesBean.getValue());
+		} else {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.completedMsgBeanIsNull, sequenceID);
+			SandeshaException e = new SandeshaException(message);
+			if(log.isDebugEnabled()) log.debug("Throwing exception", e);
+			throw e;
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: AcknowledgementManager::getClientCompletedMessagesList");
+		return completedMsgList;
+	}
+
+	public static ArrayList getServerCompletedMessagesList(String sequenceID, SequencePropertyBeanMgr seqPropMgr)
+			throws SandeshaException {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementManager::getServerCompletedMessagesList");
+
+		SequencePropertyBean completedMessagesBean = null;
+
+		completedMessagesBean = seqPropMgr.retrieve(sequenceID,
+				Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
+
+		ArrayList completedMsgList = null;
+		if (completedMessagesBean != null) {
+			completedMsgList = SandeshaUtil.getArrayListFromMsgsString(completedMessagesBean.getValue());
+		} else {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.completedMsgBeanIsNull, sequenceID);
+			SandeshaException e = new SandeshaException(message);
+			if(log.isDebugEnabled()) log.debug("Throwing exception", e);
+			throw e;
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: AcknowledgementManager::getServerCompletedMessagesList");
+		return completedMsgList;
+	}
+
+	public static RMMsgContext generateAckMessage(RMMsgContext referenceRMMessage, String sequencePropertyKey ,String sequenceId,
+			StorageManager storageManager) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementManager::generateAckMessage");
+
+		MessageContext referenceMsg = referenceRMMessage.getMessageContext();
+
+		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
+
+		// Setting the ack depending on AcksTo.
+		SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequencePropertyKey,
+				Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+
+		EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
+		String acksToStr = acksTo.getAddress();
+
+		if (acksToStr == null)
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
+
+		AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
+				Sandesha2Constants.MessageTypes.ACK,
+				referenceRMMessage.getRMSpecVersion(),
+				referenceMsg.getAxisService());
+
+		MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);
+		ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+		ackRMMsgCtx.setRMNamespaceValue(referenceRMMessage.getRMNamespaceValue());
+
+		ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
+		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+				.getSOAPVersion(referenceMsg.getEnvelope()));
+
+		// Setting new envelope
+		SOAPEnvelope envelope = factory.getDefaultEnvelope();
+
+		ackMsgCtx.setEnvelope(envelope);
+
+		ackMsgCtx.setTo(acksTo);
+
+		// adding the SequenceAcknowledgement part.
+		RMMsgCreator.addAckMessage(ackRMMsgCtx, sequencePropertyKey ,sequenceId, storageManager);
+
+		String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
+				Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, storageManager);
+
+		ackMsgCtx.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, addressingNamespaceURI);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: AcknowledgementManager::generateAckMessage");
+		return ackRMMsgCtx;
+	}
+
+	public static boolean verifySequenceCompletion(Iterator ackRangesIterator, long lastMessageNo) {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementManager::verifySequenceCompletion");
+
+		HashMap startMap = new HashMap();
+
+		while (ackRangesIterator.hasNext()) {
+			AcknowledgementRange temp = (AcknowledgementRange) ackRangesIterator.next();
+			startMap.put(new Long(temp.getLowerValue()), temp);
+		}
+
+		long start = 1;
+		boolean result = false;
+		while (!result) {
+			AcknowledgementRange temp = (AcknowledgementRange) startMap.get(new Long(start));
+			if (temp == null) {
+				break;
+			}
+
+			if (temp.getUpperValue() >= lastMessageNo)
+				result = true;
+
+			start = temp.getUpperValue() + 1;
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementManager::verifySequenceCompletion " + result);
+		return result;
+	}
+	
+	public static void addFinalAcknowledgement () {
+		
+	}
+}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java?view=diff&rev=469167&r1=469166&r2=469167
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java Mon Oct 30 07:43:24 2006
@@ -33,18 +33,8 @@
 import org.apache.axiom.soap.SOAPFaultSubCode;
 import org.apache.axiom.soap.SOAPFaultText;
 import org.apache.axiom.soap.SOAPFaultValue;
-import org.apache.axiom.soap.impl.dom.SOAPTextImpl;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.ServiceContext;
-import org.apache.axis2.context.ServiceGroupContext;
-import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisOperationFactory;
-import org.apache.axis2.util.Utils;
-import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.FaultData;
@@ -97,7 +87,6 @@
 		if (createSequence == null)
 			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqParts));
 
-		ConfigurationContext context = createSequenceMessage.getConfigurationContext();
 		if (storageManager == null)
 			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotGetStorageManager));
 
@@ -146,7 +135,6 @@
 		long messageNumber = sequence.getMessageNumber().getMessageNumber();
 		String sequenceID = sequence.getIdentifier().getIdentifier();
 
-		ConfigurationContext configCtx = applicationRMMessage.getMessageContext().getConfigurationContext();
 		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
 
 		boolean lastMessageNumberExceeded = false;
@@ -214,7 +202,6 @@
 		MessageContext messageContext = rmMessageContext.getMessageContext();
 
 		CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
-		int type = rmMessageContext.getMessageType();
 
 		boolean validSequence = false;
 
@@ -367,7 +354,6 @@
 			log.debug("Enter: FaultManager::checkForSequenceClosed, " + sequenceID);
 
 		MessageContext referenceMessage = referenceRMMessage.getMessageContext();
-		ConfigurationContext configCtx = referenceMessage.getConfigurationContext();
 
 		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org