You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2007/04/23 11:55:16 UTC

svn commit: r531400 [10/18] - in /webservices/sandesha/trunk/java/modules: client/ core/ core/src/ core/src/main/ core/src/main/java/ core/src/main/java/org/ core/src/main/java/org/apache/ core/src/main/java/org/apache/sandesha2/ core/src/main/java/org...

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,380 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+/**
+ * Contains logic to remove all the stored data of a sequence. The methods of this
+ * class are called by the sending side and the receiving side when appropriate.
+ */
+
+public class TerminateManager {
+
+	/** Logger for this class. */
+	private static final Log log = LogFactory.getLog(TerminateManager.class);
+
+	/** Marker recorded once the terminate-message cleanup step has run for a sequence. */
+	private static final String CLEANED_ON_TERMINATE_MSG = "CleanedOnTerminateMsg";
+
+	/** Marker recorded once the after-invocation cleanup step has run for a sequence. */
+	private static final String CLEANED_AFTER_INVOCATION = "CleanedAfterInvocation";
+
+	/**
+	 * Maps a sequence id to one of the two markers above, i.e. to the receiving-side
+	 * cleanup step that has already run for it. The entry is removed again when the
+	 * other step runs.
+	 * NOTE(review): this is a plain HashMap and no synchronization is visible in this
+	 * class - confirm how callers on different threads are expected to use it.
+	 */
+	public static HashMap receivingSideCleanMap = new HashMap();
+
+	/**
+	 * Decides whether an outbound sequence has finished and, if it has, queues the
+	 * TerminateSequence message for it.
+	 * <p>
+	 * Nothing is done unless the bean records a last outgoing message number and no
+	 * terminate has been added yet. Even when every message up to the last one has been
+	 * completed, termination is held back in two situations:
+	 * <ul>
+	 * <li>RM 1.1 with a WS-RM anonymous replyTo, while no polling-mode RMDBean exists for
+	 * that replyTo address. In that case the RMSBean is flagged with
+	 * setTerminationPauserForCS(true) and stored.</li>
+	 * <li>RM 1.0 with a missing or anonymous replyTo, while application-message sender
+	 * beans for the sequence are still stored.</li>
+	 * </ul>
+	 *
+	 * @param configurationContext used to load the stored reference message
+	 * @param storageManager source of the bean managers and of the reference message
+	 * @param rmsBean the sending-side sequence to examine
+	 * @throws SandeshaStorageException declared for the storage operations used here
+	 * @throws AxisFault if the reference message key is unset or the referenced message
+	 *         cannot be retrieved (raised as a SandeshaException), or if a helper fails
+	 */
+	public static void checkAndTerminate(ConfigurationContext configurationContext, StorageManager storageManager, RMSBean rmsBean)
+	throws SandeshaStorageException, AxisFault {
+		if(log.isDebugEnabled()) log.debug("Enter: TerminateManager::checkAndTerminate " +rmsBean);
+
+		final long lastOutMessage = rmsBean.getLastOutMessage ();
+		final boolean terminateCandidate = lastOutMessage > 0 && !rmsBean.isTerminateAdded();
+
+		if (terminateCandidate) {
+
+			boolean readyToTerminate = AcknowledgementManager.verifySequenceCompletion(
+					rmsBean.getClientCompletedMessages(), lastOutMessage);
+
+			final String specVersion = rmsBean.getRMVersion();
+			final String replyToAddress = rmsBean.getReplyToEPR();
+
+			// RM 1.1 + RMAnonURI: wait until the response-side create sequence has arrived
+			// through polling (i.e. an RMDBean exists). The create sequence response
+			// processor will then take care of the termination.
+			if (readyToTerminate
+					&& Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)
+					&& SandeshaUtil.isWSRMAnonymous(replyToAddress)) {
+				RMDBean pollingSelector = new RMDBean ();
+				pollingSelector.setPollingMode(true);
+				pollingSelector.setToAddress(replyToAddress);
+
+				List pollingBeans = storageManager.getRMDBeanMgr().find(pollingSelector);
+				if (pollingBeans.isEmpty()) {
+					rmsBean.setTerminationPauserForCS(true);
+					storageManager.getRMSBeanMgr().update(rmsBean);
+					readyToTerminate = false;
+				}
+			}
+
+			// Sync 2-way over WSRM 1.0: messages may still have to be sent, so only
+			// terminate once every application sender bean has gone.
+			EndpointReference replyTo = new EndpointReference (replyToAddress);
+			if (readyToTerminate
+					&& Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)
+					&& (replyToAddress==null || replyTo.hasAnonymousAddress())) {
+				SenderBean appMessageSelector = new SenderBean();
+				appMessageSelector.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+				appMessageSelector.setSequenceID(rmsBean.getSequenceID());
+
+				List outstanding = storageManager.getSenderBeanMgr().find(appMessageSelector);
+				if (!outstanding.isEmpty()) {
+					readyToTerminate = false;
+				}
+			}
+
+			if (readyToTerminate) {
+
+				final String referenceKey = rmsBean.getReferenceMessageStoreKey();
+				if (referenceKey==null) {
+					throw new SandeshaException (SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.referenceMessageNotSetForSequence,rmsBean.getSequenceID()));
+				}
+
+				MessageContext storedReference = storageManager.retrieveMessageContext(referenceKey, configurationContext);
+				if (storedReference==null) {
+					throw new SandeshaException (SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.referencedMessageNotFound, rmsBean.getSequenceID()));
+				}
+
+				RMMsgContext referenceRMMsg = MsgInitializer.initializeMessage(storedReference);
+				addTerminateSequenceMessage(referenceRMMsg, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
+			}
+		}
+
+		if(log.isDebugEnabled()) log.debug("Exit: TerminateManager::checkAndTerminate");
+	}
+	
+	
+	/**
+	 * Called by the receiving side to remove data related to a sequence. e.g.
+	 * After sending the TerminateSequence message. Calling this methods will
+	 * complete all the data if InOrder invocation is not sequired.
+	 * 
+	 * @param configContext
+	 * @param sequenceID
+	 * @throws SandeshaException
+	 */
+	public static void cleanReceivingSideOnTerminateMessage(ConfigurationContext configContext, String sequenceId,
+			StorageManager storageManager) throws SandeshaException {
+
+		// clean senderMap
+
+		//removing any un-sent ack messages.
+		SenderBean findAckBean = new SenderBean ();
+		findAckBean.setSequenceID(sequenceId);
+		findAckBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+		
+		SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+		Iterator ackBeans = senderBeanMgr.find(findAckBean).iterator();
+		while (ackBeans.hasNext()) {
+			SenderBean ackBean = (SenderBean) ackBeans.next();
+			senderBeanMgr.delete(ackBean.getMessageID());
+			
+			storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
+		}
+		
+		// Currently in-order invocation is done for default values.
+		boolean inOrderInvocation = SandeshaUtil.getDefaultPropertyBean(configContext.getAxisConfiguration())
+				.isInOrder();
+
+		if (!inOrderInvocation) {
+			// there is no invoking by Sandesha2. So clean invocations storages.
+			
+			receivingSideCleanMap.put(sequenceId, CLEANED_ON_TERMINATE_MSG);
+			cleanReceivingSideAfterInvocation(sequenceId, storageManager);
+		} else {
+
+			String cleanStatus = (String) receivingSideCleanMap.get(sequenceId);
+			if (cleanStatus != null
+					&& CLEANED_AFTER_INVOCATION.equals(cleanStatus))
+				// Remove the sequence from the map
+				receivingSideCleanMap.remove(sequenceId);
+				//completeTerminationOfReceivingSide(configContext,
+				//		sequenceId, storageManager);
+			else
+				receivingSideCleanMap.put(sequenceId, CLEANED_ON_TERMINATE_MSG);
+		}
+	}
+
+	/**
+	 * When InOrder invocation is anabled this had to be called to clean the
+	 * data left by the above method. This had to be called after the Invocation
+	 * of the Last Message.
+	 * 
+	 * @param sequenceID
+	 * @throws SandeshaException
+	 */
+	public static void cleanReceivingSideAfterInvocation(String sequenceId,
+			StorageManager storageManager) throws SandeshaException {
+		if(log.isDebugEnabled()) log.debug("Enter: TerminateManager::cleanReceivingSideAfterInvocation " +sequenceId);
+		
+		InvokerBeanMgr invokerBeanMgr = storageManager.getInvokerBeanMgr();
+
+		// removing InvokerBean entries
+		InvokerBean invokerFindBean = new InvokerBean();
+		invokerFindBean.setSequenceID(sequenceId);
+		Collection collection = invokerBeanMgr.find(invokerFindBean);
+		Iterator iterator = collection.iterator();
+		while (iterator.hasNext()) {
+			InvokerBean invokerBean = (InvokerBean) iterator.next();
+			String messageStoreKey = invokerBean.getMessageContextRefKey();
+			invokerBeanMgr.delete(messageStoreKey);
+
+			// removing the respective message context from the message store.
+			storageManager.removeMessageContext(messageStoreKey);
+		}
+
+		String cleanStatus = (String) receivingSideCleanMap.get(sequenceId);
+		if (cleanStatus != null && CLEANED_ON_TERMINATE_MSG.equals(cleanStatus))
+			// Remove the sequence id from the map
+			receivingSideCleanMap.remove(sequenceId);
+			//completeTerminationOfReceivingSide(configContext, sequenceId, storageManager);
+		else 
+			receivingSideCleanMap.put(sequenceId, CLEANED_AFTER_INVOCATION);		
+		
+		if(log.isDebugEnabled()) log.debug("Exit: TerminateManager::cleanReceivingSideAfterInvocation");
+	}
+
+	/**
+	 * Sending-side termination: flags the sequence as terminated, stores the bean and
+	 * removes the sender data kept for the sequence. Called, for example, after the
+	 * TerminateSequence message has been sent.
+	 * 
+	 * @param rmsBean the sending-side sequence to terminate
+	 * @param storageManager storage holding the sequence data
+	 * @throws SandeshaException if a storage operation fails
+	 */
+	public static void terminateSendingSide(RMSBean rmsBean, 
+			StorageManager storageManager) throws SandeshaException {
+
+		final String internalSequenceId = rmsBean.getInternalSequenceID();
+
+		// Persist the terminated flag before the queued data disappears.
+		rmsBean.setTerminated(true);
+		storageManager.getRMSBeanMgr().update(rmsBean);
+
+		cleanSendingSideData(internalSequenceId, storageManager);
+	}
+
+	/**
+	 * Marks a sending-side sequence as timed out and removes the sender data kept for it.
+	 * <p>
+	 * The RMSBean lookup may find nothing. In that case there is no bean to flag, so
+	 * only the sender data is cleaned instead of failing with a NullPointerException.
+	 * 
+	 * @param internalSequenceId internal id of the sequence that timed out
+	 * @param storageManager storage holding the sequence data
+	 * @throws SandeshaException if a storage operation fails
+	 */
+	public static void timeOutSendingSideSequence(String internalSequenceId,
+			StorageManager storageManager) throws SandeshaException {
+
+		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+		if (rmsBean != null) {
+			rmsBean.setTimedOut(true);
+			rmsBean.setLastActivatedTime(System.currentTimeMillis());
+			storageManager.getRMSBeanMgr().update(rmsBean);
+		} else if (log.isDebugEnabled()) {
+			log.debug("TerminateManager::timeOutSendingSideSequence, no RMSBean found for " + internalSequenceId);
+		}
+
+		cleanSendingSideData(internalSequenceId, storageManager);
+	}
+
+	/**
+	 * Deletes every sender bean found for the given internal sequence id, along with
+	 * the message context each bean refers to.
+	 * 
+	 * @param internalSequenceId internal id of the sequence whose data is removed
+	 * @param storageManager storage holding the sender beans and message contexts
+	 * @throws SandeshaException if a storage operation fails
+	 */
+	private static void cleanSendingSideData(String internalSequenceId, StorageManager storageManager) throws SandeshaException {
+
+		SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+
+		for (Iterator queued = senderBeanMgr.find(internalSequenceId).iterator(); queued.hasNext();) {
+			SenderBean queuedBean = (SenderBean) queued.next();
+
+			// The bean goes first, then the message context stored under its key.
+			senderBeanMgr.delete(queuedBean.getMessageID());
+			storageManager.removeMessageContext(queuedBean.getMessageContextRefKey());
+		}
+	}
+
+	/**
+	 * Creates the TerminateSequence message for an outbound sequence, stores it and
+	 * inserts a sender bean so that it gets sent after
+	 * {@code Sandesha2Constants.TERMINATE_DELAY}.
+	 * <p>
+	 * The call is a no-op when the RMSBean already records that a terminate was added.
+	 * The To address comes from the offered endpoint if the bean has one, otherwise
+	 * from the bean's To EPR. When neither is set the message's To is left as created.
+	 * 
+	 * @param referenceMessage message the terminate message is created from
+	 * @param internalSequenceID internal id used to look up the RMSBean
+	 * @param outSequenceId outgoing sequence id recorded on the sender bean
+	 * @param storageManager storage for the RMSBean, the sender bean and the message
+	 * @throws AxisFault if no RMSBean exists for the internal sequence id (raised as a
+	 *         SandeshaException) or if creating or storing the message fails
+	 */
+	public static void addTerminateSequenceMessage(RMMsgContext referenceMessage, String internalSequenceID, String outSequenceId, StorageManager storageManager) throws AxisFault {
+	
+		if(log.isDebugEnabled())
+			log.debug("Enter: TerminateManager::addTerminateSequenceMessage " + outSequenceId + ", " + internalSequenceID);
+
+		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceID);
+
+		// The lookup may find nothing; report that as a fault instead of a NullPointerException.
+		if (rmsBean == null) {
+			if(log.isDebugEnabled())
+				log.debug("Exit: TerminateManager::addTerminateSequenceMessage - sequence not found.");
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));
+		}
+
+		if (rmsBean.isTerminateAdded()) {
+			if(log.isDebugEnabled())
+				log.debug("Exit: TerminateManager::addTerminateSequenceMessage - terminate was added previously.");
+			return;
+		}
+
+		RMMsgContext terminateRMMessage = RMMsgCreator.createTerminateSequenceMessage(referenceMessage, rmsBean, storageManager);
+		terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
+		terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		// Setting the To EPR: prefer the offered endpoint, fall back to the To EPR of the bean.
+		EndpointReference toEPR = null;
+		if (rmsBean.getOfferedEndPoint() != null)
+			toEPR = new EndpointReference (rmsBean.getOfferedEndPoint());
+		else if (rmsBean.getToEPR() != null)
+			toEPR = new EndpointReference(rmsBean.getToEPR());
+
+		if (toEPR!=null)
+			terminateRMMessage.setTo(toEPR);
+		
+		if (rmsBean.getReplyToEPR()!=null) {
+			terminateRMMessage.setReplyTo(new EndpointReference (rmsBean.getReplyToEPR()));
+		}
+		
+		String rmVersion = rmsBean.getRMVersion();
+		terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+		terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+
+		if (rmsBean.getTransportTo() != null) {
+			terminateRMMessage.setProperty(Constants.Configuration.TRANSPORT_URL, rmsBean.getTransportTo());
+		}
+
+		terminateRMMessage.addSOAPEnvelope();
+
+		// Key under which the message context is stored and which the sender bean refers to.
+		String key = SandeshaUtil.getUUID();
+
+		SenderBean terminateBean = new SenderBean();
+		terminateBean.setInternalSequenceID(internalSequenceID);
+		terminateBean.setSequenceID(outSequenceId);
+		terminateBean.setMessageContextRefKey(key);
+		terminateBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
+
+		// Set a retransmitter time-to-send so that terminate will be sent with
+		// some delay.
+		// Otherwise this gets sent before the return of the current request (ack).
+		// TODO: refine the terminate delay.
+		terminateBean.setTimeToSend(System.currentTimeMillis() + Sandesha2Constants.TERMINATE_DELAY);
+
+		terminateBean.setMessageID(terminateRMMessage.getMessageId());
+
+		// The bean is flagged for sending right away; only the time-to-send above delays it.
+		terminateBean.setSend(true);
+
+		terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+				Sandesha2Constants.VALUE_FALSE);
+
+		terminateBean.setReSend(false);
+		
+		EndpointReference to = terminateRMMessage.getTo();
+		if (to!=null)
+			terminateBean.setToAddress(to.getAddress());
+
+		// If this message is targetted at an anonymous address then we must not have a transport
+		// ready for it, as the terminate sequence is not a reply.
+		if(to == null || to.hasAnonymousAddress())
+			terminateBean.setTransportAvailable(false);
+
+		// Record that the terminate has been queued so that later calls return early.
+		rmsBean.setTerminateAdded(true);
+
+		storageManager.getRMSBeanMgr().update(rmsBean);
+
+		terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+		
+		SandeshaUtil.executeAndStore(terminateRMMessage, key);
+		
+		SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
+		retramsmitterMgr.insert(terminateBean);
+
+		if(log.isDebugEnabled())
+			log.debug("Exit: TerminateManager::addTerminateSequenceMessage");
+	}
+
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,220 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+public class WSRMMessageSender  {
+
+	/** Logger for this class. */
+	private static final Log log = LogFactory.getLog(WSRMMessageSender.class);
+	
+	// All of the fields below are filled in by setupOutMessage.
+
+	// Message context wrapped by the RMMsgContext passed to setupOutMessage.
+	private MessageContext msgContext;
+	// Storage manager resolved from the configuration context.
+	private StorageManager storageManager;
+	// Configuration context taken from the message context.
+	private ConfigurationContext configurationContext;
+	// Address of the message's To EPR.
+	private String toAddress;
+	// Value of the SandeshaClientConstants.SEQUENCE_KEY option, if any.
+	private String sequenceKey;
+	// Taken from the INTERNAL_SEQUENCE_ID message property, else derived from toAddress and sequenceKey.
+	private String internalSequenceID;
+	// True when the RMSBean already carries a sequence id.
+	private boolean sequenceExists;
+	// Sequence id of the RMSBean, or Sandesha2Constants.TEMP_SEQUENCE_ID when it has none yet.
+	private String outSequenceID;
+	// RM version recorded on the RMSBean.
+	private String rmVersion;
+	// RMSBean found for internalSequenceID.
+	private RMSBean rmsBean;
+	
+	/**
+	 * Reads everything needed to send an outgoing message from the given RM message
+	 * context and caches it in the fields of this object: message and configuration
+	 * contexts, storage manager, To address, sequence key, internal and outgoing
+	 * sequence ids, the RMSBean and its RM version.
+	 * 
+	 * @param rmMsgCtx the outgoing message being prepared
+	 * @throws AxisFault if no RMSBean exists for the internal sequence id or the bean has
+	 *         no RM version (both raised as SandeshaException), or if a helper fails
+	 */
+	protected void setupOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: WSRMParentProcessor::setupOutMessage");
+
+		msgContext = rmMsgCtx.getMessageContext();
+		configurationContext = msgContext.getConfigurationContext();
+		Options clientOptions = msgContext.getOptions();
+
+		storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
+				configurationContext.getAxisConfiguration());
+
+		internalSequenceID = 
+			(String)rmMsgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
+
+		toAddress = rmMsgCtx.getTo().getAddress();
+		sequenceKey = (String) clientOptions.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+
+		// No explicit internal id on the message: derive it from the address and the key.
+		if (internalSequenceID == null) {
+			internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
+		}
+
+		// Start from "sequence unknown" until the bean proves otherwise.
+		sequenceExists = false;
+		outSequenceID = null;
+
+		rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceID);
+		if (rmsBean == null) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: WSRMParentProcessor::setupOutMessage Sequence doesn't exist");
+
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));
+		}
+
+		// A bean without a sequence id gets the temporary id.
+		String establishedSequenceId = rmsBean.getSequenceID();
+		sequenceExists = establishedSequenceId != null;
+		outSequenceID = sequenceExists ? establishedSequenceId : Sandesha2Constants.TEMP_SEQUENCE_ID;
+
+		rmVersion = rmsBean.getRMVersion();
+		if (rmVersion == null) {
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: WSRMParentProcessor::setupOutMessage");
+	}
+	
+	
+	/**
+	 * Finishes the outgoing message, stores it and inserts a sender bean for it. Uses
+	 * the state cached by setupOutMessage, which therefore has to run first.
+	 * 
+	 * @param rmMsgCtx the outgoing message
+	 * @param msgType message type recorded on the sender bean
+	 * @param delay milliseconds added to the current time to get the bean's time-to-send
+	 * @throws AxisFault if building, securing or storing the message fails
+	 */
+	protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: WSRMParentProcessor::sendOutgoingMessage " + msgType + ", " + delay);
+
+		rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
+		msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		rmMsgCtx.setTo(new EndpointReference(toAddress));
+
+		// Only override the transport URL when the bean carries one.
+		String transportUrl = rmsBean.getTransportTo();
+		if (transportUrl != null) {
+			rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportUrl);
+		}
+
+		// Make both sequence ids available as message context properties.
+		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
+		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
+
+		rmMsgCtx.addSOAPEnvelope();
+
+		// Ensure the outbound message is secured using the correct token.
+		RMMsgCreator.secureOutboundMessage(rmsBean, msgContext);
+
+		String storageKey = SandeshaUtil.getUUID();
+
+		SenderBean outgoingBean = new SenderBean();
+		outgoingBean.setMessageType(msgType);
+		outgoingBean.setMessageContextRefKey(storageKey);
+		outgoingBean.setTimeToSend(System.currentTimeMillis() + delay);
+		outgoingBean.setMessageID(msgContext.getMessageID());
+
+		// The bean is only flagged for sending, and given the outgoing sequence id,
+		// once the sequence exists.
+		outgoingBean.setInternalSequenceID(internalSequenceID);
+		outgoingBean.setSend(sequenceExists);
+		if (sequenceExists) {
+			outgoingBean.setSequenceID(outSequenceID);
+		}
+
+		EndpointReference destination = msgContext.getTo();
+		if (destination != null) {
+			outgoingBean.setToAddress(destination.getAddress());
+		}
+
+		// If this message is targetted at an anonymous address then we must not have a transport
+		// ready for it, as the current message is not a reply.
+		if (destination == null || destination.hasAnonymousAddress()) {
+			outgoingBean.setTransportAvailable(false);
+		}
+
+		msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+		outgoingBean.setReSend(false);
+
+		SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+
+		// Store the message context first, then the bean that refers to it.
+		SandeshaUtil.executeAndStore(rmMsgCtx, storageKey);
+		senderBeanMgr.insert(outgoingBean);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: WSRMParentProcessor::sendOutgoingMessage");
+
+	}
+	
+
+	/** @return the storage manager resolved by setupOutMessage; null before that call */
+	public final StorageManager getStorageManager() {
+  	return storageManager;
+  }
+
+	/** @return the internal sequence id determined by setupOutMessage; null before that call */
+	public final String getInternalSequenceID() {
+  	return internalSequenceID;
+  }
+
+	/** @return the message context captured by setupOutMessage; null before that call */
+	public final MessageContext getMsgContext() {
+  	return msgContext;
+  }
+
+	/** @return the outgoing sequence id, or the temporary id when the RMSBean has none yet */
+	public final String getOutSequenceID() {
+  	return outSequenceID;
+  }
+
+	/** @return true if the RMSBean found by setupOutMessage already has a sequence id */
+	public final boolean isSequenceExists() {
+  	return sequenceExists;
+  }
+
+	/** @return the sequence key read from the message options; may be null */
+	public final String getSequenceKey() {
+  	return sequenceKey;
+  }
+
+	/** @return the address of the To EPR read by setupOutMessage */
+	public final String getToAddress() {
+  	return toAddress;
+  }
+
+	/** @return the configuration context captured by setupOutMessage */
+	public final ConfigurationContext getConfigurationContext() {
+  	return configurationContext;
+  }
+
+	/** @return the RM version read from the RMSBean */
+	public final String getRMVersion() {
+  	return rmVersion;
+  }
+	
+	/** @return the RMSBean found by setupOutMessage; null before that call */
+	public final RMSBean getRMSBean() {
+		return rmsBean;
+	}
+
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,363 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.util.Range;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * This is used when InOrder invocation is required. This is a separate thread
+ * that keeps running all the time. At each iteration it checks the InvokerTable
+ * to find whether there are any messages to be invoked.
+ */
+
+public class Invoker extends SandeshaThread {
+
+	/** Logger for this class. */
+	private static final Log log = LogFactory.getLog(Invoker.class);
+
+	// If this invoker is working for several sequences, we use round-robin to
+	// try and give them all a chance to invoke messages.
+	// nextIndex is the position in the sequence list that internalRun picks next;
+	// internalRun resets it to 0 once it reaches the end of the list.
+	int nextIndex = 0;
+	// Checked and reset by internalRun at the end of each pass over the sequence list:
+	// when it is still false at that point, internalRun asks for a sleep.
+	boolean processedMessage = false;
+
+	/**
+	 * Creates the invoker, passing Sandesha2Constants.INVOKER_SLEEP_TIME to the
+	 * SandeshaThread constructor.
+	 */
+	public Invoker() {
+		super(Sandesha2Constants.INVOKER_SLEEP_TIME);
+	}
+	
+	/**
+	 * Forces dispatch of queued messages to the application.
+	 * NOTE: may break ordering
+	 * <p>
+	 * The invoker is paused for the duration of the call and restarted afterwards.
+	 * Each stored invoker bean of the sequence is handed to an InvokerWorker in its own
+	 * transaction. A failure while dispatching one message rolls that transaction back,
+	 * is logged, and does not stop the remaining messages from being dispatched.
+	 * Nothing is dispatched if no RMDBean exists for the sequence.
+	 * 
+	 * @param ctx not used by this method
+	 * @param sequenceID id of the sequence whose queued messages are dispatched
+	 * @param allowLaterDeliveryOfMissingMessages if true, messages skipped over during this
+	 * action will be invoked if they arrive on the system at a later time. 
+	 * Otherwise messages skipped over will be ignored
+	 * @throws SandeshaException if looking up the sequence data fails
+	 */
+	public synchronized void forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx, 
+			String sequenceID,
+			boolean allowLaterDeliveryOfMissingMessages)throws SandeshaException{
+		//first we block while we wait for the invoking thread to pause
+		blockForPause();
+		try{
+			//get all invoker beans for the sequence
+			InvokerBeanMgr storageMapMgr = storageManager
+					.getInvokerBeanMgr();
+			RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+			RMDBean rMDBean = rmdBeanMgr.retrieve(sequenceID);
+			
+			if (rMDBean != null) {
+				
+				//The outOfOrder window is the set of known sequence messages (including those
+				//that are missing) at the time the button is pressed.
+				long firstMessageInOutOfOrderWindow = rMDBean.getNextMsgNoToProcess();
+			
+				InvokerBean selector = new InvokerBean();
+				selector.setSequenceID(sequenceID);
+				Iterator stMapIt = storageMapMgr.find(selector).iterator();
+				
+				long highestMsgNumberInvoked = 0;
+				Transaction transaction = null;
+				
+				//invoke each bean in turn. 
+				//NOTE: here we are breaking ordering
+				while(stMapIt.hasNext()){
+					transaction = storageManager.getTransaction();
+					InvokerBean invoker = (InvokerBean)stMapIt.next();
+					
+					//invoke the app
+					try{
+						// start a new worker thread and let it do the invocation.
+						String workId = sequenceID + "::" + invoker.getMsgNo(); //creating a workId to uniquely identify the
+					   //piece of work that will be assigned to the Worker.
+						
+						String messageContextKey = invoker.getMessageContextRefKey();
+						InvokerWorker worker = new InvokerWorker(context,
+								messageContextKey, 
+								true); //want to ignore the next msg number
+						
+						worker.setLock(getWorkerLock());
+						worker.setWorkId(workId);
+						
+						threadPool.execute(worker);
+					
+						//adding the workId to the lock after assigning it to a thread makes sure 
+						//that all the workIds in the Lock are handled by threads.
+						getWorkerLock().addWork(workId);
+
+						long msgNumber = invoker.getMsgNo();
+						//if necessary, update the "next message number" bean under this transaction
+						if(msgNumber>highestMsgNumberInvoked){
+							highestMsgNumberInvoked = msgNumber;
+							rMDBean.setNextMsgNoToProcess(highestMsgNumberInvoked+1);
+							
+							if(allowLaterDeliveryOfMissingMessages){
+								//we also need to update the sequence OUT_OF_ORDER_RANGES property
+								//so as to include our latest view of this outOfOrder range.
+								//We do that here (rather than once at the end) so that we remain
+								//transactionally consistent
+								Range r = new Range(firstMessageInOutOfOrderWindow,highestMsgNumberInvoked);
+										
+								RangeString rangeString = null;
+								if(rMDBean.getOutOfOrderRanges()==null){
+									//insert a new blank one
+									rangeString = new RangeString();
+								}
+								else{
+									rangeString = rMDBean.getOutOfOrderRanges();
+								}
+								//update the range String with the new value
+								rangeString.addRange(r);
+								rMDBean.setOutOfOrderRanges(rangeString);
+							}
+							
+							rmdBeanMgr.update(rMDBean);
+						}
+						
+					}
+					catch(Exception e){
+						// Best effort: keep going with the remaining messages, but leave a
+						// trace of the failure instead of dropping it silently.
+						log.error("Invoker::forceInvokeOfAllMessagesCurrentlyOnSequence, failed to dispatch message "
+								+ invoker.getMsgNo() + " of sequence " + sequenceID, e);
+						if(transaction != null) {
+							transaction.rollback();
+							transaction = null;
+						}
+					} finally {
+						// Still non-null only when the dispatch succeeded.
+						if(transaction != null) {
+							transaction.commit();
+							transaction = null;
+						}
+					}
+		
+				}//end while
+			}
+		}
+		finally{
+			//restart the invoker
+			finishPause();
+		}
+	}
+
+	/**
+	 * Appends to the given list every stored invoker bean of the sequence whose message
+	 * number lies inside one of the sequence's out-of-order ranges. The list is left
+	 * untouched when the sequence has no RMDBean or no out-of-order ranges.
+	 * 
+	 * @param sequenceID id of the sequence to inspect
+	 * @param storageManager storage holding the RMDBean and the invoker beans
+	 * @param list list that receives the matching invoker beans
+	 * @throws SandeshaException if a storage lookup fails
+	 */
+	private void addOutOfOrderInvokerBeansToList(String sequenceID, 
+			StorageManager storageManager, List list)throws SandeshaException{
+		if (log.isDebugEnabled())
+			log.debug("Enter: InOrderInvoker::addOutOfOrderInvokerBeansToList " + sequenceID + ", " + list);
+
+		RMDBean sequenceBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID);
+		RangeString deliverableRanges = (sequenceBean == null) ? null : sequenceBean.getOutOfOrderRanges();
+
+		if (deliverableRanges != null) {
+			// These ranges may be delivered out of order; pick up any stored message
+			// whose number falls inside one of them.
+			InvokerBean sequenceSelector = new InvokerBean();
+			sequenceSelector.setSequenceID(sequenceID);
+
+			for (Iterator candidates = storageManager.getInvokerBeanMgr().find(sequenceSelector).iterator();
+					candidates.hasNext();) {
+				InvokerBean candidate = (InvokerBean) candidates.next();
+				if (deliverableRanges.isMessageNumberInRanges(candidate.getMsgNo())) {
+					list.add(candidate);
+				}
+			}
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: InOrderInvoker::addOutOfOrderInvokerBeansToList");
+	}
+	
+	/**
+	 * Runs one pass of the invoker loop. A sequence is picked in round-robin
+	 * order, the invoker beans that may be dispatched for it are looked up (the
+	 * next in-order message plus any message inside an out-of-order range) and
+	 * the first of them is handed to an {@link InvokerWorker} on the thread pool.
+	 * 
+	 * Any exception raised during the pass is logged at debug level and the
+	 * current transaction is rolled back; it is not propagated to the caller.
+	 * 
+	 * @return true if the caller should sleep before running the next pass
+	 */
+	protected boolean internalRun() {
+		if (log.isDebugEnabled()) log.debug("Enter: Invoker::internalRun");
+		
+		boolean sleep = false;
+		Transaction transaction = null;
+
+		try {
+			RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
+
+			InvokerBeanMgr storageMapMgr = storageManager
+					.getInvokerBeanMgr();
+
+			transaction = storageManager.getTransaction();
+			
+			// Pick a sequence using a round-robin approach
+			ArrayList allSequencesList = getSequences();
+			int size = allSequencesList.size();
+			log.debug("Choosing one from " + size + " sequences");
+			if(nextIndex >= size) {
+				nextIndex = 0;
+
+				// We just looped over the set of sequences. If we didn't process any
+				// messages on this loop then we sleep before the next one
+				if(size == 0 || !processedMessage) {
+					sleep = true;
+				}
+				processedMessage = false;
+				
+				if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, looped over all sequences, sleep " + sleep);
+				return sleep;
+			}
+
+			SequenceEntry entry = (SequenceEntry) allSequencesList.get(nextIndex++);
+			String sequenceId = entry.getSequenceId();
+			log.debug("Chose sequence " + sequenceId);
+
+			RMDBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
+			if (nextMsgBean == null) {
+				log.debug("Next message not set correctly. Removing invalid entry.");
+
+				stopThreadForSequence(sequenceId, entry.isRmSource());
+				allSequencesList = getSequences();
+				if (allSequencesList.size() == 0)
+					sleep = true;
+
+				if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, sleep " + sleep);
+				return sleep;
+			}
+
+			long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
+			if (nextMsgno <= 0) {
+				// Make sure we sleep on the next loop, so that we don't spin in a tight loop
+				sleep = true;
+				if (log.isDebugEnabled())
+					log.debug("Invalid Next Message Number " + nextMsgno);
+				String message = SandeshaMessageHelper.getMessage(
+						SandeshaMessageKeys.invalidMsgNumber, Long
+								.toString(nextMsgno));
+				throw new SandeshaException(message);
+			}
+
+			InvokerBean selector = new InvokerBean();
+			selector.setSequenceID(sequenceId);
+			selector.setMsgNo(nextMsgno);
+			List invokerBeans = storageMapMgr.find(selector);
+			
+			//add any msgs that belong to out of order windows
+			addOutOfOrderInvokerBeansToList(sequenceId, 
+					storageManager, invokerBeans);
+			
+			// If there aren't any beans to process then move on to the next sequence
+			if (invokerBeans.size() == 0) {
+				if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, no beans to invoke on sequence " + sequenceId + ", sleep " + sleep);
+				return sleep;
+			}
+			
+			Iterator stMapIt = invokerBeans.iterator();
+
+			//TODO correct the locking mechanism to have one lock per sequence.
+			//TODO should this be a while, not an if?
+			if (stMapIt.hasNext()) { //some invokation work is present
+				
+				InvokerBean bean = (InvokerBean) stMapIt.next();
+				//see if this is an out of order msg
+				boolean beanIsOutOfOrderMsg = bean.getMsgNo()!=nextMsgno;
+				
+				//creating a workId to uniquely identify the
+				//piece of work that will be assigned to the Worker.
+				String workId = sequenceId + "::" + bean.getMsgNo();
+									
+				//check whether the bean is already assigned to a worker.
+				if (getWorkerLock().isWorkPresent(workId)) {
+					// As there is already a worker assigned we are probably dispatching
+					// messages too quickly, so we sleep before trying the next sequence.
+					sleep = true;
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
+					if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, " + message + ", sleep " + sleep);
+					return sleep;
+				}
+
+				String messageContextKey = bean.getMessageContextRefKey();
+				
+				// The worker runs its own transactions, so finish ours before dispatching.
+				if(transaction != null) {
+					transaction.commit();
+					transaction = null;
+				}
+
+				// start a new worker thread and let it do the invocation.
+				// only ignore nextMsgNumber if the bean is an out of order message
+				InvokerWorker worker = new InvokerWorker(context,
+						messageContextKey, 
+						beanIsOutOfOrderMsg);
+				
+				worker.setLock(getWorkerLock());
+				worker.setWorkId(workId);
+				
+				// Register the work BEFORE handing it to the pool. The worker removes
+				// its workId from the lock when it finishes, so registering afterwards
+				// could leave a stale entry behind if the worker completed first.
+				getWorkerLock().addWork(workId);
+				try {
+					threadPool.execute(worker);
+				} catch (RuntimeException dispatchFailure) {
+					// The worker never ran, so nothing else would release the workId.
+					getWorkerLock().removeWork(workId);
+					throw dispatchFailure;
+				}
+				
+				processedMessage = true;
+			}
+		} catch (Exception e) {
+			if (transaction != null) {
+				try {
+					transaction.rollback();
+					transaction = null;
+				} catch (Exception e1) {
+					String message = SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.rollbackError, e1
+									.toString());
+					log.debug(message, e1);
+				}
+			}
+			String message = SandeshaMessageHelper
+					.getMessage(SandeshaMessageKeys.invokeMsgError);
+			log.debug(message, e);
+		} finally {
+			// Reached with a live transaction on the early-return paths above.
+			if (transaction != null) {
+				try {
+					transaction.commit();
+					transaction = null;
+				} catch (Exception e) {
+					String message = SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.commitError, e.toString());
+					log.debug(message, e);
+				}
+			}
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: InOrderInvoker::internalRun");
+		return sleep;
+	}
+
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,215 @@
+package org.apache.sandesha2.workers;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.RequestResponseTransport;
+import org.apache.axis2.util.MessageContextBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+
+public class InvokerWorker extends SandeshaWorker implements Runnable {
+
+	ConfigurationContext configurationContext = null;
+	String messageContextKey;
+	boolean ignoreNextMsg = false;
+	
+	Log log = LogFactory.getLog(InvokerWorker.class);
+	
+	/**
+	 * Creates a worker that invokes a single stored message.
+	 * 
+	 * @param configurationContext context used to obtain the storage manager
+	 * @param messageContextKey key of the stored message context and invoker bean to invoke
+	 * @param ignoreNextMsg when true, run() neither checks nor advances the
+	 *            sequence's next-message-number after the invocation
+	 */
+	public InvokerWorker (ConfigurationContext configurationContext, String messageContextKey, boolean ignoreNextMsg) {
+		this.ignoreNextMsg = ignoreNextMsg;
+		this.messageContextKey = messageContextKey;
+		this.configurationContext = configurationContext;
+	}
+	
+	/**
+	 * Invokes the stored message identified by messageContextKey.
+	 * 
+	 * The invoker bean and message context are loaded in one transaction. A
+	 * second transaction then deletes them from storage and passes the message
+	 * to the Axis2 engine. After the invocation, either the receiving side is
+	 * cleaned up (when this was the last message of the sequence) or, unless
+	 * ignoreNextMsg is set, the sequence's next message number is advanced.
+	 * 
+	 * The work id is always removed from the worker lock when this method ends,
+	 * even if completing the transaction fails.
+	 */
+	public void run() {
+		if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run");
+		
+		Transaction transaction = null;
+		MessageContext msgToInvoke = null;
+		
+		try {
+			
+			StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+			InvokerBeanMgr invokerBeanMgr = storageManager.getInvokerBeanMgr();
+			
+			//starting a transaction
+			transaction = storageManager.getTransaction();
+			
+			InvokerBean invokerBean = invokerBeanMgr.retrieve(messageContextKey);
+			
+			msgToInvoke = storageManager.retrieveMessageContext(messageContextKey, configurationContext);
+			RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke);
+
+			// ending the transaction before invocation.
+			if(transaction != null) {
+				transaction.commit();
+				transaction = null;
+			}
+					
+			//starting a transaction for the invocation work.
+			transaction = storageManager.getTransaction();
+			// Depending on the transaction  support, the service will be invoked only once. 
+			// Therefore we delete the invoker bean and message now, ahead of time
+			invokerBeanMgr.delete(messageContextKey);
+			// removing the corresponding message context as well.
+			storageManager.removeMessageContext(messageContextKey);
+
+			try {
+
+				boolean postFailureInvocation = false;
+
+				// StorageManagers should set the following property to
+				// true, to indicate that the message received comes
+				// after a failure.
+				String postFaulureProperty = (String) msgToInvoke
+						.getProperty(Sandesha2Constants.POST_FAILURE_MESSAGE);
+				if (postFaulureProperty != null
+						&& Sandesha2Constants.VALUE_TRUE.equals(postFaulureProperty))
+					postFailureInvocation = true;
+
+				AxisEngine engine = new AxisEngine(msgToInvoke.getConfigurationContext());
+				if (postFailureInvocation) {
+					makeMessageReadyForReinjection(msgToInvoke);
+					if (log.isDebugEnabled())
+						log.debug("Receiving message, key=" + messageContextKey + ", msgCtx="
+								+ msgToInvoke.getEnvelope().getHeader());
+					engine.receive(msgToInvoke);
+				} else {
+					if (log.isDebugEnabled())
+						log.debug("Resuming message, key=" + messageContextKey + ", msgCtx="
+								+ msgToInvoke.getEnvelope().getHeader());
+					msgToInvoke.setPaused(false);
+					engine.resumeReceive(msgToInvoke);
+				}
+				
+				// Invocation succeeded: commit it and open a fresh transaction for
+				// the bookkeeping that follows.
+				if(transaction!=null){
+					transaction.commit();
+					transaction = storageManager.getTransaction();
+				}
+
+			} catch (Exception e) {
+				if (log.isDebugEnabled())
+					log.debug("Exception :", e);
+				if(transaction!=null){
+					transaction.rollback();
+					transaction = storageManager.getTransaction();
+				}
+				handleFault(rmMsg, e);
+			}
+
+			if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+				Sequence sequence = (Sequence) rmMsg
+						.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+				
+				boolean highestMessage = false;
+				if (sequence.getLastMessage() != null) {
+					//this will work for RM 1.0 only
+					highestMessage = true;
+				} else {
+					RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, invokerBean.getSequenceID());
+					
+					if (rmdBean!=null && rmdBean.isTerminated()) {
+						long highestInMsgNo = rmdBean.getHighestInMessageNumber();
+						if (invokerBean.getMsgNo()==highestInMsgNo)
+							highestMessage = true;
+					}
+				}
+				
+				if (highestMessage) {
+					//do cleaning stuff that has to be done after the invocation of the last message.
+					TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(), storageManager);
+					// exit from current iteration. (since an entry
+					// was removed)
+					if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run Last message return");					
+					return;
+				}
+			}
+			
+			if(!ignoreNextMsg){
+				// updating the next msg to invoke
+				RMDBean rMDBean = storageManager.getRMDBeanMgr().retrieve(invokerBean.getSequenceID());
+				long nextMsgNo = rMDBean.getNextMsgNoToProcess();
+				
+				if (!(invokerBean.getMsgNo()==nextMsgNo)) {
+					String message = "Operated message number is different from the Next Message Number to invoke";
+					throw new SandeshaException (message);
+				}
+				
+				nextMsgNo++;
+				rMDBean.setNextMsgNoToProcess(nextMsgNo);
+				storageManager.getRMDBeanMgr().update(rMDBean);
+			}
+		} catch (Exception e) {
+			if (log.isErrorEnabled())
+				log.error(e.toString(), e);
+			if(transaction != null) {
+				transaction.rollback();
+				transaction = null;
+			}
+		} finally {
+			try {
+				if (transaction!=null) transaction.commit();
+			} finally {
+				// Release the work id even when the commit above fails; otherwise the
+				// entry would stay in the lock and the work would look assigned forever.
+				if (workId !=null && lock!=null) {
+					lock.removeWork(workId);
+				}
+			}
+		}
+		
+		if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
+	}
+
+	/**
+	 * Prepares a message for being passed to the engine's receive path again:
+	 * clears the WS-Addressing version property and the message id, To and
+	 * action options, and flags application processing as done.
+	 * 
+	 * @param messageContext the message context that is modified in place
+	 */
+	private void makeMessageReadyForReinjection(MessageContext messageContext) {
+		// properties held on the message context itself
+		messageContext.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, null);
+		messageContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, Sandesha2Constants.VALUE_TRUE);
+
+		// addressing values held on the options
+		messageContext.getOptions().setMessageId(null);
+		messageContext.getOptions().setTo(null);
+		messageContext.getOptions().setAction(null);
+	}
+
+	/**
+	 * Builds a fault message for a failed invocation and delivers it.
+	 * 
+	 * For RM 1.0 messages whose fault endpoint (FaultTo, falling back to
+	 * ReplyTo) is missing or anonymous, the fault is signalled on the request's
+	 * RequestResponseTransport. In every other case it is sent through the Axis2
+	 * engine. An AxisFault raised while doing so is logged and not rethrown.
+	 * 
+	 * @param inRMMsgContext the message whose invocation failed
+	 * @param e the exception raised by the invocation
+	 */
+	private void handleFault(RMMsgContext inRMMsgContext, Exception e) {
+		MessageContext inMsgContext = inRMMsgContext.getMessageContext();
+		AxisEngine engine = new AxisEngine(inMsgContext.getConfigurationContext());
+		try {
+			MessageContext faultContext = MessageContextBuilder.createFaultMessageContext(inMsgContext, e);
+
+			// Carry the content type of the request over to the fault message.
+			Object contentType = inMsgContext.getProperty(Constants.Configuration.CONTENT_TYPE);
+			faultContext.setProperty(Constants.Configuration.CONTENT_TYPE, contentType);
+
+			EndpointReference faultEPR = inRMMsgContext.getFaultTo();
+			if (faultEPR == null) {
+				faultEPR = inRMMsgContext.getReplyTo();
+			}
+
+			// The WSRM anonymous InOut scenario is handled differently.
+			boolean signalOnTransport =
+					Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(inRMMsgContext.getRMSpecVersion())
+					&& (faultEPR == null || faultEPR.hasAnonymousAddress());
+
+			if (!signalOnTransport) {
+				engine.sendFault(faultContext);
+				return;
+			}
+
+			RequestResponseTransport transport = (RequestResponseTransport) inRMMsgContext
+					.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+			// This causes the fault to be thrown out of the thread waiting on the transport object.
+			AxisFault fault = new AxisFault ("Sandesha2 got a fault when doing the invocation", faultContext);
+			transport.signalFaultReady(fault);
+
+		} catch (AxisFault sendFailure) {
+			if (log.isErrorEnabled())
+				log.error("Unable to send fault message ", sendFailure);
+		}
+	}
+	
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,303 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.util.threadpool.ThreadFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * Aggregates pause and stop logic between sender and invoker threads.
+ */
+public abstract class SandeshaThread extends Thread{
+
+	private static final Log log = LogFactory.getLog(SandeshaThread.class);
+
+	private boolean runThread = false;
+	private boolean hasStoppedRunning = false;
+	private boolean hasPausedRunning = false;
+	private boolean pauseRequired = false;
+	
+	private int sleepTime;
+  private WorkerLock lock = null;
+
+	private ArrayList workingSequences = new ArrayList();
+	
+	protected transient ThreadFactory threadPool;
+	protected ConfigurationContext context = null;
+	protected StorageManager storageManager = null;
+	private boolean reRunThread;
+
+	/**
+	 * Creates the thread with its own worker lock.
+	 * 
+	 * @param sleepTime timeout, in milliseconds, passed to every wait() made by this class
+	 */
+	public SandeshaThread(int sleepTime) {
+		this.lock = new WorkerLock ();
+		this.sleepTime = sleepTime;
+	}
+	
+	/**
+	 * @return the worker lock that was created in the constructor of this thread
+	 */
+	public final WorkerLock getWorkerLock() {
+		return lock;
+	}
+	
+	public synchronized void stopThreadForSequence(String sequenceID, boolean rmSource){
+		if (log.isDebugEnabled())
+			log.debug("Enter: SandeshaThread::stopThreadForSequence, " + sequenceID);
+		
+		// We do not actually stop the thread here, as the workers are smart enough
+		// to sleep when there is no work to do. If we were to exit the thread then
+		// we wouldn't be able to start back up when the thread gets some more work
+		// to do.
+		workingSequences.remove(new SequenceEntry(sequenceID, rmSource));
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: SandeshaThread::stopThreadForSequence");		
+	}
+	
+	/**
+	 * Requests a pause of this thread and waits until the thread has paused.
+	 * 
+	 * If another caller already holds a pause, this call first waits for that
+	 * pause to finish. The pause stays in force until finishPause() is called.
+	 * 
+	 * @throws IllegalStateException if the thread has stopped running or has not
+	 *             been started; in that case no pause request is left pending
+	 */
+	public synchronized void blockForPause(){
+		while(pauseRequired){
+			//someone else is requesting a pause - wait for them to finish
+			try{
+				wait(sleepTime);
+			}catch(InterruptedException e){
+				//ignore
+			}
+		}
+		
+		// Validate BEFORE claiming the pause. If the flag were set first, a failed
+		// request would leave pauseRequired stuck at true, blocking later callers
+		// and pausing the thread with nobody left to release it.
+		if(hasStoppedRunning() || !isThreadStarted()){
+			throw new IllegalStateException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotPauseThread));
+		}
+		
+		//we can now request a pause - the next pause will be ours
+		pauseRequired = true;
+		
+		while(!hasPausedRunning){
+			//wait for our pause to come around
+			try{
+				wait(sleepTime);
+			}catch(InterruptedException e){
+				//ignore
+			}
+			
+		}
+		//the sandesha thread is now paused
+	}
+	
+	/**
+	 * Ends the current pause: clears the pause request and wakes every thread
+	 * waiting on this object so that each can re-check its condition.
+	 */
+	public synchronized void finishPause(){
+		//indicate that the current pause is no longer required.
+		pauseRequired = false;
+		notifyAll();
+	}
+	
+	public synchronized void stopRunning() {
+		if (log.isDebugEnabled())
+			log.debug("Enter: SandeshaThread::stopRunning, " + this);
+
+		//NOTE: we do not take acount of pausing when stopping.
+		//The call to stop will wait until the invoker has exited the loop
+		if (isThreadStarted()) {
+			// the invoker is started so stop it
+			runThread = false;
+			// wait for it to finish
+			while (!hasStoppedRunning()) {
+				try {
+					wait(sleepTime);
+				} catch (InterruptedException e1) {
+					//ignore
+				}
+			}
+		}
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: SandeshaThread::stopRunning, " + this);
+	}
+	
+	/**
+	 * @return true while the run flag is set, i.e. after the thread has been
+	 *         started and before stopRunning() has cleared the flag
+	 */
+	public synchronized boolean isThreadStarted() {
+		if (log.isDebugEnabled() && !runThread) {
+			log.debug("SandeshaThread not started");	
+		}
+		return runThread;
+	}
+	
+
+	/**
+	 * Ensure that the worker thread is aware of the given sequence. As source sequences
+	 * do not have a proper sequence id at the time they are bootstrapped, the caller
+	 * must pass in the internal sequence id when rmSource is true.
+	 */
+	public synchronized void runThreadForSequence(ConfigurationContext context, String sequenceID, boolean rmSource){
+		if(log.isDebugEnabled()) 
+			log.debug("Entry: SandeshaThread::runThreadForSequence, " + this + ", " + sequenceID + ", " + rmSource);
+
+		SequenceEntry entry = new SequenceEntry(sequenceID, rmSource);
+		if (!workingSequences.contains(entry)) workingSequences.add(entry);
+		
+		if (!isThreadStarted()) {
+			if(log.isDebugEnabled()) log.debug("Starting thread");
+
+			this.context = context;
+			
+			// Get the axis2 thread pool
+			threadPool = context.getThreadPool();
+			
+			runThread = true; // so that isStarted()=true.
+			
+			super.start();
+			
+			// Set the SandeshaThread to have the same context classloader as the application
+			this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
+		} else {
+			if(log.isDebugEnabled()) log.debug("Waking thread");
+			wakeThread();
+		}
+
+		if(log.isDebugEnabled()) log.debug("Exit: SandeshaThread::runThreadForSequence");
+	}
+	
+	/**
+	 * Returns a snapshot of the sequences this thread is working on. The result
+	 * is a copy, so callers can use it without holding this object's lock.
+	 * 
+	 * @return a new list of SequenceEntry instances
+	 */
+	public synchronized ArrayList getSequences() {
+		return new ArrayList(workingSequences);
+	}
+
+	/**
+	 * @return true once run() has left its loop; the flag is set in the finally
+	 *         block of run()
+	 */
+	protected synchronized boolean hasStoppedRunning() {
+		return hasStoppedRunning;
+	}
+	
+	/**
+	 * Pauses the calling thread for as long as a pause is requested. On entering
+	 * the pause, the threads waiting on this object are notified that the pause
+	 * has begun. The paused flag is cleared again before returning.
+	 */
+	protected synchronized void doPauseIfNeeded(){
+		while (pauseRequired) {
+			if (!hasPausedRunning) {
+				// tell the requester of this pause that we are now pausing
+				hasPausedRunning = true;
+				notifyAll();
+			}
+
+			// stay paused; the request is re-checked after every wake-up or timeout
+			try {
+				wait(sleepTime);
+			} catch (InterruptedException ignored) {
+				//ignore
+			}
+		}
+
+		// the pause request has finished, so we are no longer pausing
+		hasPausedRunning = false;
+	}
+
+	/**
+	 * Signals that there is work to be done. The re-run flag is always set, so
+	 * that a notify which arrives while nobody is waiting is not lost;
+	 * implementing threads should check that flag before waiting. A waiting
+	 * thread is only notified when this thread is not currently paused.
+	 */
+	public synchronized void wakeThread() {
+		reRunThread = true;
+
+		if (hasPausedRunning) {
+			return;
+		}
+		notify();
+	}
+	
+	/**
+	 * Indicates that the main loop has been run by clearing the re-run flag
+	 * that wakeThread() sets.
+	 */
+	public synchronized void setRanMainLoop() {
+		reRunThread = false;
+	}
+	
+	/**
+	 * Tests whether wakeThread() has been called since the main loop last ran,
+	 * i.e. whether a notify may have been issued while the thread was not waiting.
+	 * 
+	 * @return true if the re-run flag is set and the main loop should run
+	 *         without sleeping first
+	 */
+	protected synchronized boolean runMainLoop () {
+		return reRunThread;
+	}
+	
+	/**
+	 * The main work loop, to be implemented by any child class. If the child wants
+	 * to sleep before the next loop then they should return true.
+	 */
+	protected abstract boolean internalRun();
+	
+	/**
+	 * Main loop of the thread. While the thread is started, each iteration
+	 * optionally sleeps (when the previous internalRun() asked for it and no
+	 * wake-up is pending), honours any pause request, makes sure a storage
+	 * manager is available and then calls internalRun().
+	 * 
+	 * When the loop is left, normally or through an exception, the stopped flag
+	 * is set and all threads waiting on this object are notified.
+	 * 
+	 * @throws RuntimeException if no configuration context has been set or the
+	 *             storage manager cannot be obtained
+	 */
+	public void run() {
+		try {
+			boolean sleep = false;
+
+			while (isThreadStarted()) {
+				try {
+					synchronized (this) {		
+						if(sleep && !runMainLoop()) wait(sleepTime);
+						// Indicate that we are running the main loop
+						setRanMainLoop();
+					}
+				} catch (InterruptedException e1) {
+					log.debug("SandeshaThread was interupted...");
+					log.debug(e1.getMessage());
+					log.debug("End printing Interrupt...");
+				}
+
+				//pause if we have to
+				doPauseIfNeeded();
+
+				// Ensure we have context and a storage manager
+				if (context == null) {
+					String message = SandeshaMessageHelper
+							.getMessage(SandeshaMessageKeys.configContextNotSet);
+					message = SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.cannotCointinueSender, message);
+					log.debug(message);
+					throw new RuntimeException(message);
+				}
+
+				if(storageManager == null) {
+					try {
+						storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+					} catch (SandeshaException e2) {
+						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotCointinueSender, e2.toString());
+						log.debug(message);
+						// keep the original exception as the cause so its stack trace is not lost
+						throw new RuntimeException(message, e2);
+					}
+				}
+
+				// Call into the real function
+				sleep = internalRun();
+			}
+		} finally {
+			// flag that we have exited the run loop and notify any waiting
+			// threads
+			synchronized (this) {
+				if(log.isDebugEnabled()) log.debug("SandeshaThread really stopping " + this);
+				hasStoppedRunning = true;
+				// Several threads may be waiting on this monitor (stopRunning and
+				// blockForPause callers), so wake all of them rather than an
+				// arbitrary one.
+				notifyAll();
+			}
+		}
+	}
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaWorker.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaWorker.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaWorker.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,23 @@
+package org.apache.sandesha2.workers;
+
+/**
+ * Base class for workers. It holds the WorkerLock the work is tracked in and
+ * the id under which this worker's piece of work is identified.
+ */
+public class SandeshaWorker {
+
+	/** Lock in which the work id is tracked; null until set. */
+	WorkerLock lock;
+
+	/** Identifier of the piece of work handled by this worker; null until set. */
+	String workId;
+
+	/**
+	 * @return the worker lock, or null if none has been set
+	 */
+	public WorkerLock getLock() {
+		return this.lock;
+	}
+
+	/**
+	 * @param lock the worker lock to use
+	 */
+	public void setLock(WorkerLock lock) {
+		this.lock = lock;
+	}
+
+	/**
+	 * @return the work id, or null if none has been set
+	 */
+	public String getWorkId() {
+		return this.workId;
+	}
+
+	/**
+	 * @param workId the id identifying this worker's piece of work
+	 */
+	public void setWorkId(String workId) {
+		this.workId = workId;
+	}
+
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,335 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+
+/**
+ * This is responsible for sending and re-sending messages of Sandesha2. It
+ * represents a thread that keeps running all the time. It keeps looking at the
+ * Sender table to find any entries that should be sent.
+ */
+
+public class Sender extends SandeshaThread {
+
+	private static final Log log = LogFactory.getLog(Sender.class);
+
+	// If this sender is working for several sequences, we use round-robin to
+	// try and give them all a chance to invoke messages.
+	// nextIndex is the position, in the list returned by getSequences(), of the
+	// sequence that the next call to internalRun() will look at.
+	int nextIndex = 0;
+	// Set to true when internalRun() hands a message to a worker, and reset to
+	// false each time the round-robin wraps back to the start of the list. It is
+	// used at the wrap-around point to decide whether to ask for a sleep.
+	boolean processedMessage = false;
+	
+	/**
+	 * Creates the sender, passing Sandesha2Constants.SENDER_SLEEP_TIME to the
+	 * SandeshaThread constructor (presumably the sleep interval for this
+	 * thread - confirm against SandeshaThread).
+	 */
+	public Sender () {
+		super(Sandesha2Constants.SENDER_SLEEP_TIME);
+	}
+
+	/**
+	 * Runs one iteration of the sender loop.
+	 * 
+	 * Each call looks at a single sequence, chosen round-robin from the list
+	 * returned by getSequences(). If the sequence can no longer be found in
+	 * storage as a live sequence, its thread is stopped. Otherwise the next
+	 * message to send for that sequence is looked up and, unless it is already
+	 * assigned to a worker, handed to a new SenderWorker on the thread pool.
+	 * 
+	 * When the round-robin index has run past the end of the list, the call
+	 * resets the index, cleans up terminated and timed out sequences, and asks
+	 * for a sleep if nothing was dispatched during the pass just finished.
+	 * 
+	 * Exceptions are not propagated: they are logged at debug level and any open
+	 * transaction is rolled back. On every other path an open transaction is
+	 * committed before returning.
+	 * 
+	 * @return true if the caller should sleep before the next iteration, false
+	 *         if it should carry straight on
+	 */
+	protected boolean internalRun() {
+		if (log.isDebugEnabled()) log.debug("Enter: Sender::internalRun");
+
+		Transaction transaction = null;
+		boolean sleep = false;
+
+		try {
+			// Pick a sequence using a round-robin approach
+			ArrayList allSequencesList = getSequences();
+			int size = allSequencesList.size();
+			if (log.isDebugEnabled())
+				log.debug("Choosing one from " + size + " sequences");
+			if (nextIndex >= size) {
+				nextIndex = 0;
+
+				// We just looped over the set of sequences. If we didn't process any
+				// messages on this loop then we sleep before the next one
+				if (size == 0 || !processedMessage) {
+					sleep = true;
+				}
+				processedMessage = false;
+
+				// At this point - delete any sequences that have timed out, or been terminated.
+				deleteTerminatedSequences(storageManager);
+
+				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
+				return sleep;
+			}
+
+			SequenceEntry entry = (SequenceEntry) allSequencesList.get(nextIndex++);
+			String sequenceId = entry.getSequenceId();
+			if (log.isDebugEnabled())
+				log.debug("Chose sequence " + sequenceId);
+
+			transaction = storageManager.getTransaction();
+
+			// Check that the sequence is still valid
+			boolean found = false;
+			if (entry.isRmSource()) {
+				// For an RM source the entry carries the internal sequence id; from
+				// here on sequenceId is replaced by the id stored in the RMSBean.
+				RMSBean matcher = new RMSBean();
+				matcher.setInternalSequenceID(sequenceId);
+				matcher.setTerminated(false);
+				RMSBean rms = storageManager.getRMSBeanMgr().findUnique(matcher);
+				if (rms != null && !rms.isTerminated() && !rms.isTimedOut()) {
+					sequenceId = rms.getSequenceID();
+					if (SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))
+						SequenceManager.finalizeTimedOutSequence(rms.getInternalSequenceID(), null, storageManager);
+					else
+						found = true;
+				}
+			} else {
+				RMDBean matcher = new RMDBean();
+				matcher.setSequenceID(sequenceId);
+				matcher.setTerminated(false);
+				RMDBean rmd = storageManager.getRMDBeanMgr().findUnique(matcher);
+				if (rmd != null) found = true;
+			}
+			if (!found) {
+				stopThreadForSequence(sequenceId, entry.isRmSource());
+				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, sequence has ended");
+				return false;
+			}
+
+			SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
+			SenderBean senderBean = mgr.getNextMsgToSend(sequenceId);
+
+			if (senderBean == null) {
+				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, no message for this sequence");
+				return false; // Move on to the next sequence in the list
+			}
+
+			// The work id identifies the piece of work that will be assigned to
+			// the worker thread handling this sender bean.
+			//
+			// It contains a timeToSend part to cater for retransmissions, so a
+			// retransmission is treated as new work.
+			String workId = senderBean.getMessageID() + senderBean.getTimeToSend();
+
+			// Check whether the bean is already assigned to a worker.
+			if (getWorkerLock().isWorkPresent(workId)) {
+				// As there is already a worker running we are probably looping
+				// too fast, so sleep on the next loop.
+				if (log.isDebugEnabled()) {
+					String message = SandeshaMessageHelper.getMessage(
+									SandeshaMessageKeys.workAlreadyAssigned,
+									workId);
+					log.debug("Exit: Sender::internalRun, " + message + ", sleeping");
+				}
+				return true;
+			}
+
+			// Commit before handing the bean over, so the worker does not run
+			// while this thread still has the transaction open.
+			if (transaction != null) {
+				transaction.commit();
+				transaction = null;
+			}
+
+			// Start a worker which will work on this message.
+			SenderWorker worker = new SenderWorker(context, senderBean);
+			worker.setLock(getWorkerLock());
+			worker.setWorkId(workId);
+			threadPool.execute(worker);
+
+			// Adding the workId to the lock after assigning it to a thread makes
+			// sure that all the workIds in the lock are handled by threads.
+			// NOTE(review): if the worker can finish before this line runs, the
+			// workId may be added after the worker has already tried to release
+			// it - confirm against SenderWorker and WorkerLock.
+			getWorkerLock().addWork(workId);
+
+			// If we got to here then we found work to do on the sequence, so we should
+			// remember not to sleep at the end of the list of sequences.
+			processedMessage = true;
+
+		} catch (Exception e) {
+
+			// TODO : when this is the client side throw the exception to
+			// the client when necessary.
+
+			// TODO rollback only if a SandeshaStorageException.
+			// This allows the other Exceptions to be used within the Normal flow.
+
+			if (transaction != null) {
+				// Clear the reference before attempting the rollback. If the
+				// rollback itself fails, the finally block must not go on to
+				// commit the very transaction we were trying to undo.
+				Transaction failedTransaction = transaction;
+				transaction = null;
+				try {
+					failedTransaction.rollback();
+				} catch (Exception e1) {
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
+							.toString());
+					log.debug(message, e1);
+				}
+			}
+
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
+
+			log.debug(message, e);
+		} finally {
+			// Reached with a live transaction only on the early-return paths of
+			// the try block; those paths rely on this commit.
+			if (transaction != null) {
+				try {
+					transaction.commit();
+					transaction = null;
+				} catch (Exception e) {
+					String message = SandeshaMessageHelper
+							.getMessage(SandeshaMessageKeys.commitError, e.toString());
+					log.debug(message, e);
+				}
+			}
+		}
+		if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, not sleeping");
+		return false;
+	}
+
+	/**
+	 * Cleans up sequence state that is no longer needed and terminates idle
+	 * RMD sequences. Working inside a single transaction this method:
+	 * <ul>
+	 * <li>when the SequenceRemovalTimeoutInterval is positive, deletes RMSBeans
+	 * that are terminated or timed out, and RMDBeans that are terminated, once
+	 * their last activated time is more than that interval in the past;</li>
+	 * <li>when the InactivityTimeoutInterval is positive, marks as terminated
+	 * every non-terminated RMDBean whose last activated time is more than that
+	 * interval in the past, and sets its last activated time to now.</li>
+	 * </ul>
+	 * The transaction is committed only if all of the above completes; otherwise
+	 * it is rolled back so that a partial clean-up is not persisted. A
+	 * SandeshaException is logged and not rethrown.
+	 * 
+	 * @param storageManager
+	 *            the storage manager used to find, update and delete the beans
+	 */
+	private void deleteTerminatedSequences(StorageManager storageManager) {
+		if (log.isDebugEnabled()) 
+			log.debug("Enter: Sender::deleteTerminatedSequences");
+
+		Transaction transaction = storageManager.getTransaction();
+		boolean completed = false;
+
+		try {
+			SandeshaPolicyBean propertyBean = 
+				SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
+
+			// A zero or negative removal interval disables deletion altogether.
+			long deleteTime = propertyBean.getSequenceRemovalTimeoutInterval();
+
+			if (deleteTime > 0) {
+				// Find terminated sequences.
+				RMSBean finderBean = new RMSBean();
+				finderBean.setTerminated(true);
+				List rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+
+				deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
+
+				// Find timed out sequences
+				finderBean.setTerminated(false);
+				finderBean.setTimedOut(true);
+				rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+
+				deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
+
+				// Remove any terminated RMDBeans.
+				RMDBean finderRMDBean = new RMDBean();
+				finderRMDBean.setTerminated(true);
+
+				List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+				Iterator beans = rmdBeans.iterator();
+				while (beans.hasNext()) {
+					RMDBean rmdBean = (RMDBean) beans.next();
+
+					long timeNow = System.currentTimeMillis();
+					long lastActivated = rmdBean.getLastActivatedTime();
+
+					// delete sequences that have been timed out or terminated for
+					// more than the SequenceRemovalTimeoutInterval
+					if ((lastActivated + deleteTime) < timeNow) {
+						if (log.isDebugEnabled())
+							log.debug("Deleting RMDBean " + deleteTime + " : " + rmdBean);
+						storageManager.getRMDBeanMgr().delete(rmdBean.getSequenceID());
+					}
+				}
+			}
+
+			// Terminate RMD Sequences that have been inactive.
+			long inactivityTimeout = propertyBean.getInactivityTimeoutInterval();
+			if (inactivityTimeout > 0) {
+				RMDBean finderRMDBean = new RMDBean();
+				finderRMDBean.setTerminated(false);
+
+				List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+				Iterator beans = rmdBeans.iterator();
+				while (beans.hasNext()) {
+					RMDBean rmdBean = (RMDBean) beans.next();
+
+					long timeNow = System.currentTimeMillis();
+					long lastActivated = rmdBean.getLastActivatedTime();
+
+					if ((lastActivated + inactivityTimeout) < timeNow) {
+						// Terminate. The last activated time is reset so that the
+						// removal interval above is measured from this moment.
+						rmdBean.setTerminated(true);
+						rmdBean.setLastActivatedTime(timeNow);
+						if (log.isDebugEnabled())
+							log.debug(System.currentTimeMillis() + "Marking RMDBean as terminated " + rmdBean);
+						storageManager.getRMDBeanMgr().update(rmdBean);
+					}
+				}
+			}
+
+			completed = true;
+
+		} catch (SandeshaException e) {
+			// Pass the exception as the throwable too, so the stack trace is kept.
+			if (log.isErrorEnabled())
+				log.error(e.toString(), e);
+		} finally {
+			if (transaction != null) {
+				if (completed) {
+					transaction.commit();
+				} else {
+					// Something failed part way through: undo rather than persist
+					// a half-finished clean-up.
+					try {
+						transaction.rollback();
+					} catch (Exception e) {
+						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e
+								.toString());
+						log.debug(message, e);
+					}
+				}
+			}
+		}
+
+		if (log.isDebugEnabled()) 
+			log.debug("Exit: Sender::deleteTerminatedSequences");
+	}
+	
+	/**
+	 * Deletes each of the given RMSBeans whose last activated time lies more
+	 * than deleteTime milliseconds in the past. Beans are deleted through the
+	 * RMSBean manager, keyed on their create sequence message id.
+	 * 
+	 * @param rmsBeans
+	 *            the candidate RMSBeans to examine
+	 * @param propertyBean
+	 *            not used by this method
+	 * @param deleteTime
+	 *            how long, in milliseconds, a bean must have been inactive
+	 *            before it is deleted
+	 * @throws SandeshaStorageException
+	 *             if deleting a bean from storage fails
+	 */
+	private void deleteRMSBeans(List rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime)
+			throws SandeshaStorageException {
+		if (log.isDebugEnabled())
+			log.debug("Enter: Sender::deleteRMSBeans");
+
+		for (Iterator candidates = rmsBeans.iterator(); candidates.hasNext();) {
+			RMSBean candidate = (RMSBean) candidates.next();
+			long now = System.currentTimeMillis();
+			long removableFrom = candidate.getLastActivatedTime() + deleteTime;
+
+			// Still within the removal interval, so leave this bean alone.
+			if (removableFrom >= now)
+				continue;
+
+			if (log.isDebugEnabled())
+				log.debug("Removing RMSBean " + candidate);
+			storageManager.getRMSBeanMgr().delete(candidate.getCreateSeqMsgID());
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: Sender::deleteRMSBeans");
+	}
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org