You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2007/04/23 11:55:16 UTC
svn commit: r531400 [10/18] - in /webservices/sandesha/trunk/java/modules:
client/ core/ core/src/ core/src/main/ core/src/main/java/
core/src/main/java/org/ core/src/main/java/org/apache/
core/src/main/java/org/apache/sandesha2/ core/src/main/java/org...
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,380 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+/**
+ * Contains logic to remove all the stored data of a sequence. Methods of this
+ * class are called by the sending side and the receiving side when appropriate.
+ */
+
+public class TerminateManager {
+
+ private static Log log = LogFactory.getLog(TerminateManager.class);
+
+ private static String CLEANED_ON_TERMINATE_MSG = "CleanedOnTerminateMsg";
+
+ private static String CLEANED_AFTER_INVOCATION = "CleanedAfterInvocation";
+
+ public static HashMap receivingSideCleanMap = new HashMap();
+
+ public static void checkAndTerminate(ConfigurationContext configurationContext, StorageManager storageManager, RMSBean rmsBean)
+ throws SandeshaStorageException, AxisFault {
+ if(log.isDebugEnabled()) log.debug("Enter: TerminateManager::checkAndTerminate " +rmsBean);
+ // Adds a TerminateSequence message for rmsBean when verifySequenceCompletion reports completion and no check below vetoes it.
+ long lastOutMessage = rmsBean.getLastOutMessage ();
+
+ if (lastOutMessage > 0 && !rmsBean.isTerminateAdded()) {
+
+ boolean complete = AcknowledgementManager.verifySequenceCompletion(rmsBean.getClientCompletedMessages(), lastOutMessage);
+
+ //If this is RM 1.1 and RMAnonURI scenario, don't do the termination unless the response side createSequence has been
+ //received (RMDBean has been created) through polling; in that case termination will happen in the create sequence response processor.
+ String rmVersion = rmsBean.getRMVersion();
+ String replyToAddress = rmsBean.getReplyToEPR();
+
+ if (complete &&
+ Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmVersion) && SandeshaUtil.isWSRMAnonymous(replyToAddress)) {
+ RMDBean findBean = new RMDBean ();
+ findBean.setPollingMode(true);
+ findBean.setToAddress(replyToAddress);
+ // Look for a polling-mode RMDBean addressed to the replyTo address; if none exists, flag the RMSBean and hold off.
+ RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+ List beans = rmdBeanMgr.find(findBean);
+ if(beans.isEmpty()) {
+ rmsBean.setTerminationPauserForCS(true);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ complete = false;
+ }
+ }
+
+ // If we are doing sync 2-way over WSRM 1.0 then we may need to keep sending messages,
+ // so check to see if all the senders have been removed
+ EndpointReference replyTo = new EndpointReference (replyToAddress);
+ if (complete &&
+ Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && (replyToAddress==null || replyTo.hasAnonymousAddress())) {
+ SenderBean matcher = new SenderBean();
+ matcher.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+ matcher.setSequenceID(rmsBean.getSequenceID());
+ // Any application SenderBean still stored for this sequence id means we are not complete yet.
+ List matches = storageManager.getSenderBeanMgr().find(matcher);
+ if(!matches.isEmpty()) complete = false;
+ }
+
+ if (complete) {
+ // The stored reference message is required; it is passed on to addTerminateSequenceMessage below.
+ String referenceMsgKey = rmsBean.getReferenceMessageStoreKey();
+ if (referenceMsgKey==null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referenceMessageNotSetForSequence,rmsBean.getSequenceID());
+ throw new SandeshaException (message);
+ }
+
+ MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey, configurationContext);
+
+ if (referenceMessage==null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referencedMessageNotFound, rmsBean.getSequenceID());
+ throw new SandeshaException (message);
+ }
+
+ RMMsgContext referenceRMMsg = MsgInitializer.initializeMessage(referenceMessage);
+ addTerminateSequenceMessage(referenceRMMsg, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
+ }
+
+ }
+
+ if(log.isDebugEnabled()) log.debug("Exit: TerminateManager::checkAndTerminate");
+ }
+
+
+ /**
+ * Called by the receiving side to remove data related to a sequence, e.g. after sending the
+ * TerminateSequence message. Also cleans the invoker data when InOrder invocation is not required.
+ *
+ * @param configContext used to read the InOrder setting from the default property bean
+ * @param sequenceId id of the sequence whose un-sent ack messages are removed
+ * @param storageManager supplies the sender bean manager and the message store
+ * @throws SandeshaException
+ */
+ public static void cleanReceivingSideOnTerminateMessage(ConfigurationContext configContext, String sequenceId,
+ StorageManager storageManager) throws SandeshaException {
+
+ // clean senderMap
+
+ //removing any un-sent ack messages.
+ SenderBean findAckBean = new SenderBean ();
+ findAckBean.setSequenceID(sequenceId);
+ findAckBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+ SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+ Iterator ackBeans = senderBeanMgr.find(findAckBean).iterator();
+ while (ackBeans.hasNext()) {
+ SenderBean ackBean = (SenderBean) ackBeans.next();
+ senderBeanMgr.delete(ackBean.getMessageID());
+
+ storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
+ }
+
+ // Currently the in-order setting is read from the default property bean.
+ boolean inOrderInvocation = SandeshaUtil.getDefaultPropertyBean(configContext.getAxisConfiguration())
+ .isInOrder();
+
+ if (!inOrderInvocation) {
+ // there is no invoking by Sandesha2. So clean the invocation storage.
+
+ receivingSideCleanMap.put(sequenceId, CLEANED_ON_TERMINATE_MSG);
+ cleanReceivingSideAfterInvocation(sequenceId, storageManager);
+ } else {
+ // In-order case: drop the map entry if cleanReceivingSideAfterInvocation already ran, otherwise record that this method ran.
+ String cleanStatus = (String) receivingSideCleanMap.get(sequenceId);
+ if (cleanStatus != null
+ && CLEANED_AFTER_INVOCATION.equals(cleanStatus))
+ // Remove the sequence from the map
+ receivingSideCleanMap.remove(sequenceId);
+ //completeTerminationOfReceivingSide(configContext,
+ // sequenceId, storageManager);
+ else
+ receivingSideCleanMap.put(sequenceId, CLEANED_ON_TERMINATE_MSG);
+ }
+ }
+
+ /**
+ * When InOrder invocation is enabled this has to be called, after the invocation of the
+ * last message, to clean the data left by cleanReceivingSideOnTerminateMessage.
+ *
+ * @param sequenceId id of the sequence whose InvokerBean entries and stored messages are removed
+ * @param storageManager supplies the invoker bean manager and the message store
+ * @throws SandeshaException
+ */
+ public static void cleanReceivingSideAfterInvocation(String sequenceId,
+ StorageManager storageManager) throws SandeshaException {
+ if(log.isDebugEnabled()) log.debug("Enter: TerminateManager::cleanReceivingSideAfterInvocation " +sequenceId);
+
+ InvokerBeanMgr invokerBeanMgr = storageManager.getInvokerBeanMgr();
+
+ // removing InvokerBean entries
+ InvokerBean invokerFindBean = new InvokerBean();
+ invokerFindBean.setSequenceID(sequenceId);
+ Collection collection = invokerBeanMgr.find(invokerFindBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ InvokerBean invokerBean = (InvokerBean) iterator.next();
+ String messageStoreKey = invokerBean.getMessageContextRefKey();
+ invokerBeanMgr.delete(messageStoreKey);
+
+ // removing the respective message context from the message store.
+ storageManager.removeMessageContext(messageStoreKey);
+ }
+ // Drop the map entry if cleanReceivingSideOnTerminateMessage already ran, otherwise record that this method ran.
+ String cleanStatus = (String) receivingSideCleanMap.get(sequenceId);
+ if (cleanStatus != null && CLEANED_ON_TERMINATE_MSG.equals(cleanStatus))
+ // Remove the sequence id from the map
+ receivingSideCleanMap.remove(sequenceId);
+ //completeTerminationOfReceivingSide(configContext, sequenceId, storageManager);
+ else
+ receivingSideCleanMap.put(sequenceId, CLEANED_AFTER_INVOCATION);
+
+ if(log.isDebugEnabled()) log.debug("Exit: TerminateManager::cleanReceivingSideAfterInvocation");
+ }
+
+ /**
+ * This is called by the sending side to clean data related to a sequence,
+ * e.g. after sending the TerminateSequence message.
+ *
+ * @param rmsBean bean of the sequence; it is marked terminated and updated in storage
+ * @param storageManager supplies the RMS bean manager and the sender data to clean
+ * @throws SandeshaException
+ */
+ public static void terminateSendingSide(RMSBean rmsBean,
+ StorageManager storageManager) throws SandeshaException {
+
+ // Indicate that the sequence is terminated
+ rmsBean.setTerminated(true);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+
+ cleanSendingSideData (rmsBean.getInternalSequenceID(), storageManager);
+ }
+
+ public static void timeOutSendingSideSequence(String internalSequenceId,
+ StorageManager storageManager) throws SandeshaException {
+ // Flags the RMSBean of the internal sequence as timed out, refreshes its last-activated time, then removes its sender data.
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+ rmsBean.setTimedOut(true);
+ rmsBean.setLastActivatedTime(System.currentTimeMillis());
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ // NOTE(review): rmsBean above is dereferenced without a null check - confirm the lookup always finds a bean here.
+ cleanSendingSideData(internalSequenceId, storageManager);
+ }
+
+ private static void cleanSendingSideData(String internalSequenceId, StorageManager storageManager) throws SandeshaException {
+ // Deletes every SenderBean found for the internal sequence id together with its stored message context.
+ SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
+
+ // removing the sender bean entries and the corresponding message contexts.
+ Collection collection = retransmitterBeanMgr.find(internalSequenceId);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ SenderBean retransmitterBean = (SenderBean) iterator.next();
+ retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+
+ String messageStoreKey = retransmitterBean.getMessageContextRefKey();
+ storageManager.removeMessageContext(messageStoreKey);
+ }
+ }
+
+ /**
+ * Creates the TerminateSequence message for a sequence and stores it, with a matching
+ * SenderBean whose time-to-send is the current time plus Sandesha2Constants.TERMINATE_DELAY.
+ * Returns without doing anything if the RMSBean already has its terminate-added flag set.
+ *
+ * @param referenceMessage message handed to RMMsgCreator to create the terminate message
+ * @param internalSequenceID internal sequence id used to look up the RMSBean
+ * @param outSequenceId sequence id recorded on the terminate SenderBean
+ * @param storageManager source of the RMS and sender bean managers
+ * @throws AxisFault on failure; a SandeshaException is thrown if no RMSBean is found
+ */
+ public static void addTerminateSequenceMessage(RMMsgContext referenceMessage, String internalSequenceID, String outSequenceId, StorageManager storageManager) throws AxisFault {
+
+ if(log.isDebugEnabled())
+ log.debug("Enter: TerminateManager::addTerminateSequenceMessage " + outSequenceId + ", " + internalSequenceID);
+
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceID);
+ if (rmsBean == null) {
+ // Fail with a descriptive fault rather than a NullPointerException on the first use of the bean.
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));
+ }
+
+ if (rmsBean.isTerminateAdded()) {
+ if(log.isDebugEnabled())
+ log.debug("Exit: TerminateManager::addTerminateSequenceMessage - terminate was added previously.");
+ return;
+ }
+
+ RMMsgContext terminateRMMessage = RMMsgCreator.createTerminateSequenceMessage(referenceMessage, rmsBean, storageManager);
+ terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
+ terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+ //setting the To EPR.
+ //First try to get it from an Endpoint property.
+ //If not get it from the To property.
+ EndpointReference toEPR = null;
+
+ if (rmsBean.getOfferedEndPoint() != null)
+ toEPR = new EndpointReference (rmsBean.getOfferedEndPoint());
+
+ // A freshly constructed EndpointReference is never null, so no further null check is needed afterwards.
+ if (toEPR == null && rmsBean.getToEPR() != null)
+ toEPR = new EndpointReference(rmsBean.getToEPR());
+
+ if (toEPR!=null)
+ terminateRMMessage.setTo(toEPR);
+
+ if (rmsBean.getReplyToEPR()!=null) {
+ terminateRMMessage.setReplyTo(new EndpointReference (rmsBean.getReplyToEPR()));
+ }
+
+ String rmVersion = rmsBean.getRMVersion();
+ terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+ terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+
+ if (rmsBean.getTransportTo() != null) {
+ terminateRMMessage.setProperty(Constants.Configuration.TRANSPORT_URL, rmsBean.getTransportTo());
+ }
+
+ terminateRMMessage.addSOAPEnvelope();
+
+ String key = SandeshaUtil.getUUID();
+
+ SenderBean terminateBean = new SenderBean();
+ terminateBean.setInternalSequenceID(internalSequenceID);
+ terminateBean.setSequenceID(outSequenceId);
+ terminateBean.setMessageContextRefKey(key);
+ terminateBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
+
+ // Set the time-to-send so that the terminate will be sent with
+ // some delay.
+ // Otherwise it gets sent before the return of the current request (ack).
+ // TODO: refine the terminate delay.
+ terminateBean.setTimeToSend(System.currentTimeMillis() + Sandesha2Constants.TERMINATE_DELAY);
+
+ terminateBean.setMessageID(terminateRMMessage.getMessageId());
+
+ // The bean's send flag is set to true here; the message context is separately marked as not qualified for sending.
+ terminateBean.setSend(true);
+
+ terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+ Sandesha2Constants.VALUE_FALSE);
+
+ terminateBean.setReSend(false);
+
+ EndpointReference to = terminateRMMessage.getTo();
+ if (to!=null)
+ terminateBean.setToAddress(to.getAddress());
+
+ // If this message is targeted at an anonymous address then we must not have a transport
+ // ready for it, as the terminate sequence is not a reply.
+ if(to == null || to.hasAnonymousAddress())
+ terminateBean.setTransportAvailable(false);
+
+ rmsBean.setTerminateAdded(true);
+
+ storageManager.getRMSBeanMgr().update(rmsBean);
+
+ terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+
+ //the propertyKey of the ackMessage will be the propertyKey for the terminate message as well.
+// terminateRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY, sequencePropertyKey);
+
+ // / addTerminateSeqTransaction.commit();
+ SandeshaUtil.executeAndStore(terminateRMMessage, key);
+
+ SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
+
+ retramsmitterMgr.insert(terminateBean);
+
+ if(log.isDebugEnabled())
+ log.debug("Exit: TerminateManager::addTerminateSequenceMessage");
+ }
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,220 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+public class WSRMMessageSender {
+
+ private static final Log log = LogFactory.getLog(WSRMMessageSender.class);
+
+ private MessageContext msgContext;
+ private StorageManager storageManager;
+ private ConfigurationContext configurationContext;
+ private String toAddress;
+ private String sequenceKey;
+ private String internalSequenceID;
+ private boolean sequenceExists;
+ private String outSequenceID;
+ private String rmVersion;
+ private RMSBean rmsBean;
+
+ /**
+ * Extracts from rmMsgCtx the information needed to process an outgoing message and caches it in this object's fields.
+ *
+ * @param rmMsgCtx outgoing message; its To address, options and INTERNAL_SEQUENCE_ID property are read
+ * @throws AxisFault (as SandeshaException) if no RMSBean matches the internal sequence id or its RM version is null
+ */
+ protected void setupOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: WSRMParentProcessor::setupOutMessage");
+
+ msgContext = rmMsgCtx.getMessageContext();
+ configurationContext = msgContext.getConfigurationContext();
+ Options options = msgContext.getOptions();
+
+ storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
+ configurationContext.getAxisConfiguration());
+
+ internalSequenceID =
+ (String)rmMsgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
+ // NOTE(review): getTo() is dereferenced unconditionally - confirm the To EPR is always set on these messages.
+ toAddress = rmMsgCtx.getTo().getAddress();
+ sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ // No id on the message context: derive the internal sequence id from the To address and the sequence key.
+ if(internalSequenceID==null)
+ {
+ internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
+ }
+
+ // Does the sequence exist ?
+ sequenceExists = false;
+ outSequenceID = null;
+
+ // Get the RMSBean with the matching internal sequenceid
+ rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceID);
+
+ if (rmsBean == null)
+ {
+ if (log.isDebugEnabled())
+ log.debug("Exit: WSRMParentProcessor::setupOutMessage Sequence doesn't exist");
+
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));
+ }
+ // Without a sequence id on the bean, sequenceExists stays false and the temporary sequence id is used.
+ if (rmsBean.getSequenceID() != null)
+ {
+ sequenceExists = true;
+ outSequenceID = rmsBean.getSequenceID();
+ }
+ else
+ outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;
+
+ rmVersion = rmsBean.getRMVersion();
+ if (rmVersion == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: WSRMParentProcessor::setupOutMessage");
+ }
+
+
+ protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: WSRMParentProcessor::sendOutgoingMessage " + msgType + ", " + delay);
+ // Prepares rmMsgCtx as an outbound message, stores it, and inserts a SenderBean of type msgType with time-to-send now + delay.
+ rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
+ getMsgContext().setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+ rmMsgCtx.setTo(new EndpointReference(toAddress));
+
+ String transportTo = rmsBean.getTransportTo();
+ if (transportTo != null) {
+ rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
+ }
+
+ //setting msg context properties
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
+
+ rmMsgCtx.addSOAPEnvelope();
+
+ // Ensure the outbound message is secured using the correct token
+ RMMsgCreator.secureOutboundMessage(getRMSBean(), msgContext);
+
+ String key = SandeshaUtil.getUUID();
+
+ SenderBean senderBean = new SenderBean();
+ senderBean.setMessageType(msgType);
+ senderBean.setMessageContextRefKey(key);
+ senderBean.setTimeToSend(System.currentTimeMillis() + delay);
+ senderBean.setMessageID(msgContext.getMessageID());
+
+ // Set the internal sequence id on the sender bean; the outgoing sequence id is only set when the sequence exists
+ senderBean.setInternalSequenceID(internalSequenceID);
+ if (sequenceExists)
+ {
+ senderBean.setSend(true);
+ senderBean.setSequenceID(outSequenceID);
+ }
+ else
+ senderBean.setSend(false);
+
+ EndpointReference to = msgContext.getTo();
+ if (to!=null)
+ senderBean.setToAddress(to.getAddress());
+
+ // If this message is targeted at an anonymous address then we must not have a transport
+ // ready for it, as the current message is not a reply.
+ if(to == null || to.hasAnonymousAddress())
+ senderBean.setTransportAvailable(false);
+
+ msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+ senderBean.setReSend(false);
+
+ SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
+
+ SandeshaUtil.executeAndStore(rmMsgCtx, key);
+
+ retramsmitterMgr.insert(senderBean);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: WSRMParentProcessor::sendOutgoingMessage");
+
+ }
+
+ /** Returns the storage manager obtained in setupOutMessage. */
+ public final StorageManager getStorageManager() {
+ return storageManager;
+ }
+ /** Returns the internal sequence id read or derived in setupOutMessage. */
+ public final String getInternalSequenceID() {
+ return internalSequenceID;
+ }
+ /** Returns the message context taken from the RMMsgContext in setupOutMessage. */
+ public final MessageContext getMsgContext() {
+ return msgContext;
+ }
+ /** Returns the RMSBean's sequence id, or the temporary sequence id if the bean has none. */
+ public final String getOutSequenceID() {
+ return outSequenceID;
+ }
+ /** Returns true if the RMSBean found in setupOutMessage had a non-null sequence id. */
+ public final boolean isSequenceExists() {
+ return sequenceExists;
+ }
+ /** Returns the SEQUENCE_KEY option read from the message options in setupOutMessage. */
+ public final String getSequenceKey() {
+ return sequenceKey;
+ }
+ /** Returns the To address read from the message in setupOutMessage. */
+ public final String getToAddress() {
+ return toAddress;
+ }
+ /** Returns the configuration context of the message passed to setupOutMessage. */
+ public final ConfigurationContext getConfigurationContext() {
+ return configurationContext;
+ }
+ /** Returns the RM version taken from the RMSBean in setupOutMessage. */
+ public final String getRMVersion() {
+ return rmVersion;
+ }
+ /** Returns the RMSBean looked up by internal sequence id in setupOutMessage. */
+ public final RMSBean getRMSBean() {
+ return rmsBean;
+ }
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,363 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.util.Range;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * This is used when InOrder invocation is required. This is a separate thread
+ * that keeps running all the time. At each iteration it checks the InvokerTable
+ * to find whether there are any messages to be invoked.
+ */
+
+public class Invoker extends SandeshaThread {
+
+ private static final Log log = LogFactory.getLog(Invoker.class);
+
+ // If this invoker is working for several sequences, we use round-robin to
+ // try and give them all a chance to invoke messages.
+ int nextIndex = 0;
+ boolean processedMessage = false;
+
+ public Invoker() {
+ super(Sandesha2Constants.INVOKER_SLEEP_TIME);
+ }
+
+ /**
+ * Forces dispatch of queued messages to the application.
+ * NOTE: may break ordering
+ * @param ctx not referenced in the method body (the worker is given {@code context} instead - TODO confirm intended)
+ * @param sequenceID id of the sequence whose stored InvokerBeans are dispatched
+ * @param allowLaterDeliveryOfMissingMessages if true, messages skipped over during this
+ * action will be invoked if they arrive on the system at a later time.
+ * Otherwise messages skipped over will be ignored
+ * @throws SandeshaException
+ */
+ public synchronized void forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx,
+ String sequenceID,
+ boolean allowLaterDeliveryOfMissingMessages)throws SandeshaException{
+ //first we block while we wait for the invoking thread to pause
+ blockForPause();
+ try{
+ //get all invoker beans for the sequence
+ InvokerBeanMgr storageMapMgr = storageManager
+ .getInvokerBeanMgr();
+ RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+ RMDBean rMDBean = rmdBeanMgr.retrieve(sequenceID);
+
+ if (rMDBean != null) {
+
+ //The outOfOrder window is the set of known sequence messages (including those
+ //that are missing) at the time the button is pressed.
+ long firstMessageInOutOfOrderWindow = rMDBean.getNextMsgNoToProcess();
+
+ InvokerBean selector = new InvokerBean();
+ selector.setSequenceID(sequenceID);
+ Iterator stMapIt = storageMapMgr.find(selector).iterator();
+
+ long highestMsgNumberInvoked = 0;
+ Transaction transaction = null;
+
+ //invoke each bean in turn.
+ //NOTE: here we are breaking ordering
+ while(stMapIt.hasNext()){
+ transaction = storageManager.getTransaction();
+ InvokerBean invoker = (InvokerBean)stMapIt.next();
+
+ //invoke the app
+ try{
+ // start a new worker thread and let it do the invocation.
+ String workId = sequenceID + "::" + invoker.getMsgNo(); //creating a workId to uniquely identify the
+ //piece of work that will be assigned to the Worker.
+
+ String messageContextKey = invoker.getMessageContextRefKey();
+ InvokerWorker worker = new InvokerWorker(context,
+ messageContextKey,
+ true); //want to ignore the next msg number
+
+ worker.setLock(getWorkerLock());
+ worker.setWorkId(workId);
+
+ threadPool.execute(worker);
+
+ //adding the workId to the lock after assigning it to a thread makes sure
+ //that all the workIds in the Lock are handled by threads.
+ getWorkerLock().addWork(workId);
+
+ long msgNumber = invoker.getMsgNo();
+ //if necessary, update the "next message number" bean under this transaction
+ if(msgNumber>highestMsgNumberInvoked){
+ highestMsgNumberInvoked = invoker.getMsgNo();
+ rMDBean.setNextMsgNoToProcess(highestMsgNumberInvoked+1);
+
+ if(allowLaterDeliveryOfMissingMessages){
+ //we also need to update the sequence OUT_OF_ORDER_RANGES property
+ //so as to include our latest view of this outOfOrder range.
+ //We do that here (rather than once at the end) so that we remain
+ //transactionally consistent
+ Range r = new Range(firstMessageInOutOfOrderWindow,highestMsgNumberInvoked);
+
+ RangeString rangeString = null;
+ if(rMDBean.getOutOfOrderRanges()==null){
+ //insert a new blank one
+ rangeString = new RangeString();
+ }
+ else{
+ rangeString = rMDBean.getOutOfOrderRanges();
+ }
+ //update the range String with the new value
+ rangeString.addRange(r);
+ rMDBean.setOutOfOrderRanges(rangeString);
+ }
+
+ rmdBeanMgr.update(rMDBean);
+ }
+
+ }
+ catch(Exception e){
+ //keep going with the remaining beans, but do not lose the failure silently
+ log.warn("Rolling back forced invocation of message " + invoker.getMsgNo() + " on sequence " + sequenceID, e);
+ if(transaction != null) {
+ transaction.rollback();
+ transaction = null;
+ }
+ } finally {
+ if(transaction != null) {
+ transaction.commit();
+ transaction = null;
+ }
+ }
+
+ }//end while
+ }
+ }
+ finally{
+ //restart the invoker
+ finishPause();
+ }
+ }
+
+ private void addOutOfOrderInvokerBeansToList(String sequenceID,
+ StorageManager storageManager, List list)throws SandeshaException{
+ if (log.isDebugEnabled())
+ log.debug("Enter: InOrderInvoker::addOutOfOrderInvokerBeansToList " + sequenceID + ", " + list);
+ // Appends to 'list' every stored InvokerBean of the sequence whose message number lies in one of the RMDBean's out-of-order ranges.
+ RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID);
+
+ if(rmdBean != null && rmdBean.getOutOfOrderRanges() != null){
+ RangeString rangeString = rmdBean.getOutOfOrderRanges();
+ //we now have the set of ranges that can be delivered out of order.
+ //Look for any invokable message that lies in one of those ranges
+ InvokerBean selector = new InvokerBean();
+ selector.setSequenceID(sequenceID);
+ Iterator invokerBeansIterator =
+ storageManager.getInvokerBeanMgr().find(selector).iterator();
+
+ while(invokerBeansIterator.hasNext()){
+ InvokerBean invokerBean = (InvokerBean)invokerBeansIterator.next();
+
+ if(rangeString.isMessageNumberInRanges(invokerBean.getMsgNo())){
+ //an invoker bean that has not been deleted and lies in an
+ //out-of-order range - we can add this to the list
+ list.add(invokerBean);
+ }
+ }
+
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: InOrderInvoker::addOutOfOrderInvokerBeansToList");
+ }
+
+ /**
+  * Runs one pass of the invoker loop. A sequence is picked in round-robin order,
+  * the invoker bean for its next message number is looked up (together with any
+  * beans that fall inside the sequence's out-of-order ranges) and the first
+  * candidate is handed to an InvokerWorker on the thread pool.
+  *
+  * Failures are logged at debug level and do not propagate to the caller.
+  *
+  * @return true if the calling loop should sleep before the next pass
+  */
+ protected boolean internalRun() {
+     if (log.isDebugEnabled()) log.debug("Enter: Invoker::internalRun");
+
+     boolean sleep = false;
+     Transaction transaction = null;
+
+     try {
+         RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
+         InvokerBeanMgr storageMapMgr = storageManager.getInvokerBeanMgr();
+
+         transaction = storageManager.getTransaction();
+
+         // Pick a sequence using a round-robin approach
+         ArrayList allSequencesList = getSequences();
+         int size = allSequencesList.size();
+         if (log.isDebugEnabled()) log.debug("Choosing one from " + size + " sequences");
+         if (nextIndex >= size) {
+             nextIndex = 0;
+
+             // We just looped over the set of sequences. If we didn't process any
+             // messages on this loop then we sleep before the next one
+             if (size == 0 || !processedMessage) {
+                 sleep = true;
+             }
+             processedMessage = false;
+
+             if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, looped over all sequences, sleep " + sleep);
+             return sleep;
+         }
+
+         SequenceEntry entry = (SequenceEntry) allSequencesList.get(nextIndex++);
+         String sequenceId = entry.getSequenceId();
+         if (log.isDebugEnabled()) log.debug("Chose sequence " + sequenceId);
+
+         RMDBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
+         if (nextMsgBean == null) {
+             if (log.isDebugEnabled()) log.debug("Next message not set correctly. Removing invalid entry.");
+
+             // No RMD bean for this entry, so drop it from the working set.
+             stopThreadForSequence(sequenceId, entry.isRmSource());
+             allSequencesList = getSequences();
+             if (allSequencesList.size() == 0)
+                 sleep = true;
+
+             if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, sleep " + sleep);
+             return sleep;
+         }
+
+         long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
+         if (nextMsgno <= 0) {
+             // Make sure we sleep on the next loop, so that we don't spin in a tight loop
+             sleep = true;
+             if (log.isDebugEnabled())
+                 log.debug("Invalid Next Message Number " + nextMsgno);
+             String message = SandeshaMessageHelper.getMessage(
+                     SandeshaMessageKeys.invalidMsgNumber, Long.toString(nextMsgno));
+             throw new SandeshaException(message);
+         }
+
+         InvokerBean selector = new InvokerBean();
+         selector.setSequenceID(sequenceId);
+         selector.setMsgNo(nextMsgno);
+         List invokerBeans = storageMapMgr.find(selector);
+
+         // add any msgs that belong to out of order windows
+         addOutOfOrderInvokerBeansToList(sequenceId, storageManager, invokerBeans);
+
+         // If there aren't any beans to process then move on to the next sequence
+         if (invokerBeans.size() == 0) {
+             if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, no beans to invoke on sequence " + sequenceId + ", sleep " + sleep);
+             return sleep;
+         }
+
+         Iterator stMapIt = invokerBeans.iterator();
+
+         //TODO correct the locking mechanism to have one lock per sequence.
+         //TODO should this be a while, not an if?
+         if (stMapIt.hasNext()) { // some invocation work is present
+
+             InvokerBean bean = (InvokerBean) stMapIt.next();
+             // see if this is an out of order msg
+             boolean beanIsOutOfOrderMsg = bean.getMsgNo() != nextMsgno;
+
+             // The workId uniquely identifies the piece of work that will be
+             // assigned to the worker.
+             String workId = sequenceId + "::" + bean.getMsgNo();
+
+             // check whether the bean is already assigned to a worker.
+             if (getWorkerLock().isWorkPresent(workId)) {
+                 // As there is already a worker assigned we are probably dispatching
+                 // messages too quickly, so we sleep before trying the next sequence.
+                 sleep = true;
+                 String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
+                 if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, " + message + ", sleep " + sleep);
+                 return sleep;
+             }
+
+             String messageContextKey = bean.getMessageContextRefKey();
+
+             // Finish our own transaction before the worker starts its own.
+             if (transaction != null) {
+                 transaction.commit();
+                 transaction = null;
+             }
+
+             // Create a new worker and let it do the invocation. The next message
+             // number is only ignored if the bean is an out of order message.
+             InvokerWorker worker = new InvokerWorker(context,
+                     messageContextKey,
+                     beanIsOutOfOrderMsg);
+
+             worker.setLock(getWorkerLock());
+             worker.setWorkId(workId);
+
+             // Register the work before handing it to the pool. If the id were added
+             // afterwards, a fast worker could already have called removeWork() and
+             // the id would stay in the lock with no thread handling it.
+             getWorkerLock().addWork(workId);
+             try {
+                 threadPool.execute(worker);
+             } catch (Exception dispatchError) {
+                 // The worker never started, so nobody else will release the id.
+                 getWorkerLock().removeWork(workId);
+                 throw dispatchError;
+             }
+
+             processedMessage = true;
+         }
+     } catch (Exception e) {
+         if (transaction != null) {
+             try {
+                 transaction.rollback();
+                 transaction = null;
+             } catch (Exception e1) {
+                 String message = SandeshaMessageHelper.getMessage(
+                         SandeshaMessageKeys.rollbackError, e1.toString());
+                 log.debug(message, e1);
+             }
+         }
+         String message = SandeshaMessageHelper
+                 .getMessage(SandeshaMessageKeys.invokeMsgError);
+         log.debug(message, e);
+     } finally {
+         // Commit whatever is still open (early-return paths and failed rollbacks).
+         if (transaction != null) {
+             try {
+                 transaction.commit();
+                 transaction = null;
+             } catch (Exception e) {
+                 String message = SandeshaMessageHelper.getMessage(
+                         SandeshaMessageKeys.commitError, e.toString());
+                 log.debug(message, e);
+             }
+         }
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: InOrderInvoker::internalRun");
+     return sleep;
+ }
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,215 @@
+package org.apache.sandesha2.workers;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.RequestResponseTransport;
+import org.apache.axis2.util.MessageContextBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+
+ /**
+  * Worker that invokes a single stored message. The message context is looked up
+  * by its storage key, the stored invoker bean and message are deleted, and the
+  * message is then received or resumed on the Axis engine. When the run finishes
+  * the work id is removed from the worker lock.
+  */
+ public class InvokerWorker extends SandeshaWorker implements Runnable {
+
+     ConfigurationContext configurationContext = null;
+     // Storage key of both the invoker bean and the stored message context.
+     String messageContextKey;
+     // When true, run() does not check or advance the sequence's next message number.
+     boolean ignoreNextMsg = false;
+
+     Log log = LogFactory.getLog(InvokerWorker.class);
+
+     /**
+      * @param configurationContext context used to obtain the storage manager and
+      *        to rebuild the stored message context
+      * @param messageContextKey storage key of the message to invoke
+      * @param ignoreNextMsg true to leave the next message number of the sequence
+      *        untouched after the invocation
+      */
+     public InvokerWorker (ConfigurationContext configurationContext, String messageContextKey, boolean ignoreNextMsg) {
+         this.configurationContext = configurationContext;
+         this.messageContextKey = messageContextKey;
+         this.ignoreNextMsg = ignoreNextMsg;
+     }
+
+     /**
+      * Invokes the message identified by messageContextKey. Exceptions raised
+      * while processing are logged at error level and the open transaction is
+      * rolled back. The work id is always removed from the lock before this
+      * method ends, even if finishing the transaction fails.
+      */
+     public void run() {
+         if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run");
+
+         Transaction transaction = null;
+         MessageContext msgToInvoke = null;
+
+         try {
+
+             StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+             InvokerBeanMgr invokerBeanMgr = storageManager.getInvokerBeanMgr();
+
+             //starting a transaction
+             transaction = storageManager.getTransaction();
+
+             InvokerBean invokerBean = invokerBeanMgr.retrieve(messageContextKey);
+
+             msgToInvoke = storageManager.retrieveMessageContext(messageContextKey, configurationContext);
+             RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke);
+
+             // ending the transaction before invocation.
+             if(transaction != null) {
+                 transaction.commit();
+                 transaction = null;
+             }
+
+             //starting a transaction for the invocation work.
+             transaction = storageManager.getTransaction();
+             // Depending on the transaction support, the service will be invoked only once.
+             // Therefore we delete the invoker bean and message now, ahead of time
+             invokerBeanMgr.delete(messageContextKey);
+             // removing the corresponding message context as well.
+             storageManager.removeMessageContext(messageContextKey);
+
+             try {
+
+                 boolean postFailureInvocation = false;
+
+                 // StorageManagers should set the following property to
+                 // true, to indicate that the message received comes
+                 // after a failure.
+                 String postFailureProperty = (String) msgToInvoke
+                         .getProperty(Sandesha2Constants.POST_FAILURE_MESSAGE);
+                 if (postFailureProperty != null
+                         && Sandesha2Constants.VALUE_TRUE.equals(postFailureProperty))
+                     postFailureInvocation = true;
+
+                 AxisEngine engine = new AxisEngine(msgToInvoke.getConfigurationContext());
+                 if (postFailureInvocation) {
+                     makeMessageReadyForReinjection(msgToInvoke);
+                     if (log.isDebugEnabled())
+                         log.debug("Receiving message, key=" + messageContextKey + ", msgCtx="
+                                 + msgToInvoke.getEnvelope().getHeader());
+                     engine.receive(msgToInvoke);
+                 } else {
+                     if (log.isDebugEnabled())
+                         log.debug("Resuming message, key=" + messageContextKey + ", msgCtx="
+                                 + msgToInvoke.getEnvelope().getHeader());
+                     msgToInvoke.setPaused(false);
+                     engine.resumeReceive(msgToInvoke);
+                 }
+
+                 if(transaction!=null){
+                     transaction.commit();
+                     transaction = storageManager.getTransaction();
+                 }
+
+             } catch (Exception e) {
+                 if (log.isDebugEnabled())
+                     log.debug("Exception :", e);
+                 // Undo the deletes above, then report the failure as a fault.
+                 if(transaction!=null){
+                     transaction.rollback();
+                     transaction = storageManager.getTransaction();
+                 }
+                 handleFault(rmMsg, e);
+             }
+
+             if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+                 Sequence sequence = (Sequence) rmMsg
+                         .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+
+                 boolean highestMessage = false;
+                 if (sequence.getLastMessage() != null) {
+                     //this will work for RM 1.0 only
+                     highestMessage = true;
+                 } else {
+                     RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, invokerBean.getSequenceID());
+
+                     if (rmdBean!=null && rmdBean.isTerminated()) {
+                         long highestInMsgNo = rmdBean.getHighestInMessageNumber();
+                         if (invokerBean.getMsgNo()==highestInMsgNo)
+                             highestMessage = true;
+                     }
+                 }
+
+                 if (highestMessage) {
+                     //do cleaning stuff that has to be done after the invocation of the last message.
+                     TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(), storageManager);
+                     // exit from current iteration. (since an entry
+                     // was removed)
+                     if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run Last message return");
+                     return;
+                 }
+             }
+
+             if(!ignoreNextMsg){
+                 // updating the next msg to invoke
+                 RMDBean rMDBean = storageManager.getRMDBeanMgr().retrieve(invokerBean.getSequenceID());
+                 long nextMsgNo = rMDBean.getNextMsgNoToProcess();
+
+                 if (!(invokerBean.getMsgNo()==nextMsgNo)) {
+                     String message = "Operated message number is different from the Next Message Number to invoke";
+                     throw new SandeshaException (message);
+                 }
+
+                 nextMsgNo++;
+                 rMDBean.setNextMsgNoToProcess(nextMsgNo);
+                 storageManager.getRMDBeanMgr().update(rMDBean);
+             }
+         } catch (Exception e) {
+             if (log.isErrorEnabled())
+                 log.error(e.toString(), e);
+             if(transaction != null) {
+                 try {
+                     transaction.rollback();
+                 } finally {
+                     // Never commit in the finally block below after a rollback
+                     // attempt, whether or not the rollback itself succeeded.
+                     transaction = null;
+                 }
+             }
+         } finally {
+             try {
+                 if (transaction!=null) transaction.commit();
+             } finally {
+                 // Release the work id even when the commit above throws; otherwise
+                 // the id would stay registered in the lock.
+                 if (workId !=null && lock!=null) {
+                     lock.removeWork(workId);
+                 }
+             }
+         }
+
+         if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
+     }
+
+     /**
+      * Clears the addressing version property and the message id, To and action
+      * options of the message, and marks application processing as done, before
+      * the message is passed to AxisEngine.receive again.
+      */
+     private void makeMessageReadyForReinjection(MessageContext messageContext) {
+         messageContext.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, null);
+         messageContext.getOptions().setMessageId(null);
+         messageContext.getOptions().setTo(null);
+         messageContext.getOptions().setAction(null);
+         messageContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, Sandesha2Constants.VALUE_TRUE);
+     }
+
+     /**
+      * Builds a fault message for the failed invocation and delivers it. For RM
+      * spec version 1.0 with a missing or anonymous fault/reply endpoint, the
+      * fault is signalled through the RequestResponseTransport stored on the
+      * message; otherwise it is sent through the Axis engine. An AxisFault raised
+      * while doing so is logged at error level.
+      *
+      * @param inRMMsgContext the message whose invocation failed
+      * @param e the exception raised by the invocation
+      */
+     private void handleFault(RMMsgContext inRMMsgContext, Exception e) {
+         MessageContext inMsgContext = inRMMsgContext.getMessageContext();
+         AxisEngine engine = new AxisEngine(inMsgContext.getConfigurationContext());
+         try {
+             MessageContext faultContext = MessageContextBuilder.createFaultMessageContext(inMsgContext, e);
+             // Copy some of the parameters to the new message context.
+             faultContext.setProperty(Constants.Configuration.CONTENT_TYPE, inMsgContext
+                     .getProperty(Constants.Configuration.CONTENT_TYPE));
+
+             // Prefer the FaultTo endpoint, falling back to ReplyTo.
+             EndpointReference faultEPR = inRMMsgContext.getFaultTo();
+             if (faultEPR==null)
+                 faultEPR = inRMMsgContext.getReplyTo();
+
+             //we handle the WSRM Anon InOut scenario differently here
+             if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(inRMMsgContext.getRMSpecVersion())
+                     && (faultEPR==null || faultEPR.hasAnonymousAddress())) {
+                 RequestResponseTransport requestResponseTransport = (RequestResponseTransport) inRMMsgContext.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+                 //this will cause the fault to be thrown out of thread waiting on this transport object.
+                 AxisFault fault = new AxisFault ("Sandesha2 got a fault when doing the invocation", faultContext);
+                 requestResponseTransport.signalFaultReady(fault);
+             } else
+                 engine.sendFault(faultContext);
+
+         } catch (AxisFault e1) {
+             if (log.isErrorEnabled())
+                 log.error("Unable to send fault message ", e1);
+         }
+     }
+
+ }
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,303 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.util.threadpool.ThreadFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * Aggregates pause and stop logic between sender and invoker threads.
+ */
+ public abstract class SandeshaThread extends Thread{
+
+     private static final Log log = LogFactory.getLog(SandeshaThread.class);
+
+     // The state flags below are only read and written while holding this
+     // object's monitor.
+     private boolean runThread = false;
+     private boolean hasStoppedRunning = false;
+     private boolean hasPausedRunning = false;
+     private boolean pauseRequired = false;
+
+     // Timeout, passed to wait(long), used for every wait in this class.
+     private int sleepTime;
+     private WorkerLock lock = null;
+
+     // SequenceEntry instances this thread is currently working for.
+     private ArrayList workingSequences = new ArrayList();
+
+     protected transient ThreadFactory threadPool;
+     protected ConfigurationContext context = null;
+     protected StorageManager storageManager = null;
+     // Set by wakeThread() so that a wake-up arriving while the thread is not
+     // waiting is not lost.
+     private boolean reRunThread;
+
+     /**
+      * @param sleepTime timeout passed to wait(long) whenever this thread, or a
+      *        caller waiting on it, has to wait
+      */
+     public SandeshaThread(int sleepTime) {
+         this.sleepTime = sleepTime;
+         lock = new WorkerLock ();
+     }
+
+     /**
+      * @return the lock that tracks the work ids currently handed to workers
+      */
+     public final WorkerLock getWorkerLock() {
+         return lock;
+     }
+
+     /**
+      * Removes the given sequence from the set of sequences this thread works for.
+      *
+      * @param sequenceID id the sequence was registered under
+      * @param rmSource whether the entry was registered as a source sequence
+      */
+     public synchronized void stopThreadForSequence(String sequenceID, boolean rmSource){
+         if (log.isDebugEnabled())
+             log.debug("Enter: SandeshaThread::stopThreadForSequence, " + sequenceID);
+
+         // We do not actually stop the thread here, as the workers are smart enough
+         // to sleep when there is no work to do. If we were to exit the thread then
+         // we wouldn't be able to start back up when the thread gets some more work
+         // to do.
+         workingSequences.remove(new SequenceEntry(sequenceID, rmSource));
+
+         if (log.isDebugEnabled())
+             log.debug("Exit: SandeshaThread::stopThreadForSequence");
+     }
+
+     /**
+      * Requests a pause and waits until the worker thread has paused. The pause
+      * lasts until finishPause() is called.
+      *
+      * @throws IllegalStateException if the thread is not running, or stops
+      *         before the pause takes effect; the pause request is withdrawn in
+      *         that case so that later callers are not blocked by it
+      */
+     public synchronized void blockForPause(){
+         while(pauseRequired){
+             //someone else is requesting a pause - wait for them to finish
+             try{
+                 wait(sleepTime);
+             }catch(InterruptedException e){
+                 //ignore
+             }
+         }
+
+         //we can now request a pause - the next pause will be ours
+         pauseRequired = true;
+
+         if(hasStoppedRunning() || !isThreadStarted()){
+             abandonPauseRequest();
+             throw new IllegalStateException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotPauseThread));
+         }
+         while(!hasPausedRunning){
+             // If the thread has exited its run loop it can never acknowledge the
+             // pause, so give up rather than wait forever.
+             if(hasStoppedRunning()){
+                 abandonPauseRequest();
+                 throw new IllegalStateException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotPauseThread));
+             }
+             //wait for our pause to come around
+             try{
+                 wait(sleepTime);
+             }catch(InterruptedException e){
+                 //ignore
+             }
+
+         }
+         //the sandesha thread is now paused
+     }
+
+     /**
+      * Withdraws a pause request that cannot be honoured and wakes any other
+      * threads waiting for their turn to pause. Must be called with this
+      * object's monitor held.
+      */
+     private void abandonPauseRequest(){
+         pauseRequired = false;
+         notifyAll();
+     }
+
+     /**
+      * Ends the pause requested through blockForPause() and wakes all waiters.
+      */
+     public synchronized void finishPause(){
+         //indicate that the current pause is no longer required.
+         pauseRequired = false;
+         notifyAll();
+     }
+
+     /**
+      * Asks the run loop to exit and waits until it has done so. Does nothing if
+      * the thread has not been started.
+      */
+     public synchronized void stopRunning() {
+         if (log.isDebugEnabled())
+             log.debug("Enter: SandeshaThread::stopRunning, " + this);
+
+         //NOTE: we do not take account of pausing when stopping.
+         //The call to stop will wait until the invoker has exited the loop
+         if (isThreadStarted()) {
+             // the invoker is started so stop it
+             runThread = false;
+             // wait for it to finish
+             while (!hasStoppedRunning()) {
+                 try {
+                     wait(sleepTime);
+                 } catch (InterruptedException e1) {
+                     //ignore
+                 }
+             }
+         }
+
+         if (log.isDebugEnabled())
+             log.debug("Exit: SandeshaThread::stopRunning, " + this);
+     }
+
+     /**
+      * @return true while the run loop is supposed to keep running
+      */
+     public synchronized boolean isThreadStarted() {
+
+         if (!runThread && log.isDebugEnabled())
+             log.debug("SandeshaThread not started");
+
+         return runThread;
+     }
+
+
+     /**
+      * Ensure that the worker thread is aware of the given sequence. As source sequences
+      * do not have a proper sequence id at the time they are bootstrapped, the caller
+      * must pass in the internal sequence id when rmSource is true.
+      *
+      * The thread is started on the first call and woken on later calls.
+      */
+     public synchronized void runThreadForSequence(ConfigurationContext context, String sequenceID, boolean rmSource){
+         if(log.isDebugEnabled())
+             log.debug("Entry: SandeshaThread::runThreadForSequence, " + this + ", " + sequenceID + ", " + rmSource);
+
+         SequenceEntry entry = new SequenceEntry(sequenceID, rmSource);
+         if (!workingSequences.contains(entry)) workingSequences.add(entry);
+
+         if (!isThreadStarted()) {
+             if(log.isDebugEnabled()) log.debug("Starting thread");
+
+             this.context = context;
+
+             // Get the axis2 thread pool
+             threadPool = context.getThreadPool();
+
+             runThread = true; // so that isStarted()=true.
+
+             // Set the SandeshaThread to have the same context classloader as the
+             // application. This is done before start() so that the run loop never
+             // executes with a different loader.
+             this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
+
+             super.start();
+         } else {
+             if(log.isDebugEnabled()) log.debug("Waking thread");
+             wakeThread();
+         }
+
+         if(log.isDebugEnabled()) log.debug("Exit: SandeshaThread::runThreadForSequence");
+     }
+
+     /**
+      *
+      * @return a List of SequenceEntry instances
+      */
+     public synchronized ArrayList getSequences() {
+         // Need to copy the list for thread safety
+         ArrayList result = new ArrayList();
+         result.addAll(workingSequences);
+         return result;
+     }
+
+     /**
+      * @return true once run() has left its loop
+      */
+     protected synchronized boolean hasStoppedRunning() {
+         return hasStoppedRunning;
+     }
+
+     /**
+      * Called by the run loop on every pass. While a pause is requested this
+      * acknowledges it once and then waits until the request is withdrawn.
+      */
+     protected synchronized void doPauseIfNeeded(){
+         //see if we need to pause
+
+         while(pauseRequired){
+             if(!hasPausedRunning){
+                 //let the requester of this pause know we are now pausing
+                 hasPausedRunning = true;
+                 notifyAll();
+             }
+             //now we pause
+             try{
+                 wait(sleepTime);
+             }catch(InterruptedException e){
+                 //ignore
+             }
+         }//end while
+         //the request to pause has finished so we are no longer pausing
+         hasPausedRunning = false;
+     }
+
+     /**
+      * Wake the current thread as there is work to be done.
+      * Also flag that if we miss a notify, then there is
+      * work to be done. Implementing threads should check this value
+      * before waiting
+      *
+      */
+     public synchronized void wakeThread() {
+         reRunThread = true;
+
+         // Other threads (pause requesters, stopRunning callers) may be waiting on
+         // this monitor as well, so wake them all; each of them re-checks its own
+         // condition in a loop.
+         if (!hasPausedRunning)
+             notifyAll();
+     }
+
+     /**
+      * Indicate that the main loop has been run
+      */
+     public synchronized void setRanMainLoop() {
+         reRunThread = false;
+     }
+
+     /**
+      * Test to check if a notify has been called when not waiting
+      *
+      * @return true if wakeThread() was called since the last loop pass
+      */
+     protected synchronized boolean runMainLoop () {
+         return reRunThread;
+     }
+
+     /**
+      * The main work loop, to be implemented by any child class. If the child wants
+      * to sleep before the next loop then they should return true.
+      */
+     protected abstract boolean internalRun();
+
+     /**
+      * Calls internalRun() repeatedly until the thread is asked to stop, sleeping
+      * between passes when the previous pass asked for it and honouring pause
+      * requests. On exit, threads waiting on this object are notified.
+      */
+     public void run() {
+         try {
+             boolean sleep = false;
+
+             while (isThreadStarted()) {
+                 try {
+                     synchronized (this) {
+                         if(sleep && !runMainLoop()) wait(sleepTime);
+                         // Indicate that we are running the main loop
+                         setRanMainLoop();
+                     }
+                 } catch (InterruptedException e1) {
+                     log.debug("SandeshaThread was interupted...");
+                     log.debug(e1.getMessage());
+                     log.debug("End printing Interrupt...");
+                 }
+
+                 //pause if we have to
+                 doPauseIfNeeded();
+
+                 // Ensure we have context and a storage manager
+                 if (context == null) {
+                     String message = SandeshaMessageHelper
+                             .getMessage(SandeshaMessageKeys.configContextNotSet);
+                     message = SandeshaMessageHelper.getMessage(
+                             SandeshaMessageKeys.cannotCointinueSender, message);
+                     log.debug(message);
+                     throw new RuntimeException(message);
+                 }
+
+                 if(storageManager == null) {
+                     try {
+                         storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+                     } catch (SandeshaException e2) {
+                         String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotCointinueSender, e2.toString());
+                         log.debug(message);
+                         throw new RuntimeException(message);
+                     }
+                 }
+
+                 // Call into the real function
+                 sleep = internalRun();
+             }
+         } finally {
+             // flag that we have exited the run loop and notify any waiting
+             // threads
+             synchronized (this) {
+                 if(log.isDebugEnabled()) log.debug("SandeshaThread really stopping " + this);
+                 hasStoppedRunning = true;
+                 // notifyAll, not notify: both stopRunning() and blockForPause()
+                 // callers may be waiting for this.
+                 notifyAll();
+             }
+         }
+     }
+ }
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaWorker.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaWorker.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaWorker.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,23 @@
+package org.apache.sandesha2.workers;
+
+ /**
+  * Base class for workers. It holds the work id of the piece of work assigned
+  * to the worker and the lock in which that work id is tracked.
+  */
+ public class SandeshaWorker {
+
+     /** Lock in which this worker's work id is tracked; null until set. */
+     WorkerLock lock;
+
+     /** Identifier of the work assigned to this worker; null until set. */
+     String workId;
+
+     /**
+      * @return the identifier of the assigned work, or null if none was set
+      */
+     public String getWorkId() {
+         return this.workId;
+     }
+
+     /**
+      * @param workId identifier of the work assigned to this worker
+      */
+     public void setWorkId(String workId) {
+         this.workId = workId;
+     }
+
+     /**
+      * @return the lock tracking this worker's work id, or null if none was set
+      */
+     public WorkerLock getLock() {
+         return this.lock;
+     }
+
+     /**
+      * @param lock the lock tracking this worker's work id
+      */
+     public void setLock(WorkerLock lock) {
+         this.lock = lock;
+     }
+
+ }
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,335 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+
+/**
+ * This is responsible for sending and re-sending messages of Sandesha2. This
+ * represent a thread that keep running all the time. This keep looking at the
+ * Sender table to find out any entries that should be sent.
+ */
+
+public class Sender extends SandeshaThread {
+
+ private static final Log log = LogFactory.getLog(Sender.class);
+
+ // If this sender is working for several sequences, we use round-robin to
+ // try and give them all a chance to invoke messages.
+ int nextIndex = 0;
+ boolean processedMessage = false;
+
+ /**
+ * Creates a sender thread, passing Sandesha2Constants.SENDER_SLEEP_TIME to the
+ * SandeshaThread constructor as its sleep time.
+ */
+ public Sender () {
+ super(Sandesha2Constants.SENDER_SLEEP_TIME);
+ }
+
+ /**
+  * Runs one pass of the sender loop. A sequence is picked in round-robin order
+  * and checked for validity, and its next sender bean (if any) is handed to a
+  * SenderWorker on the thread pool. After a full round over all sequences,
+  * terminated and timed out sequences are cleaned up instead.
+  *
+  * Failures are logged at debug level and do not propagate to the caller.
+  *
+  * @return true if the calling loop should sleep before the next pass
+  */
+ protected boolean internalRun() {
+     if (log.isDebugEnabled()) log.debug("Enter: Sender::internalRun");
+
+     Transaction transaction = null;
+     boolean sleep = false;
+
+     try {
+         // Pick a sequence using a round-robin approach
+         ArrayList allSequencesList = getSequences();
+         int size = allSequencesList.size();
+         if (log.isDebugEnabled())
+             log.debug("Choosing one from " + size + " sequences");
+         if (nextIndex >= size) {
+             nextIndex = 0;
+
+             // We just looped over the set of sequences. If we didn't process any
+             // messages on this loop then we sleep before the next one
+             if (size == 0 || !processedMessage) {
+                 sleep = true;
+             }
+             processedMessage = false;
+
+             // At this point - delete any sequences that have timed out, or been terminated.
+             deleteTerminatedSequences(storageManager);
+
+             if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
+             return sleep;
+         }
+
+         SequenceEntry entry = (SequenceEntry) allSequencesList.get(nextIndex++);
+         String sequenceId = entry.getSequenceId();
+         if (log.isDebugEnabled())
+             log.debug("Chose sequence " + sequenceId);
+
+         transaction = storageManager.getTransaction();
+
+         // Check that the sequence is still valid
+         boolean found = false;
+         if (entry.isRmSource()) {
+             // Source entries are keyed by internal sequence id.
+             RMSBean matcher = new RMSBean();
+             matcher.setInternalSequenceID(sequenceId);
+             matcher.setTerminated(false);
+             RMSBean rms = storageManager.getRMSBeanMgr().findUnique(matcher);
+             if (rms != null && !rms.isTerminated() && !rms.isTimedOut()) {
+                 // From here on sequenceId holds the bean's sequence id.
+                 sequenceId = rms.getSequenceID();
+                 if (SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))
+                     SequenceManager.finalizeTimedOutSequence(rms.getInternalSequenceID(), null, storageManager);
+                 else
+                     found = true;
+             }
+         } else {
+             RMDBean matcher = new RMDBean();
+             matcher.setSequenceID(sequenceId);
+             matcher.setTerminated(false);
+             RMDBean rmd = storageManager.getRMDBeanMgr().findUnique(matcher);
+             if (rmd != null) found = true;
+         }
+         if (!found) {
+             // Remove the entry under the id it was registered with. The local
+             // sequenceId may have been replaced above, in which case it would
+             // no longer match the registered entry.
+             stopThreadForSequence(entry.getSequenceId(), entry.isRmSource());
+             if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, sequence has ended");
+             return false;
+         }
+
+         SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
+         SenderBean senderBean = mgr.getNextMsgToSend(sequenceId);
+
+         if (senderBean == null) {
+             if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, no message for this sequence");
+             return false; // Move on to the next sequence in the list
+         }
+
+         // The work id defines the piece of work that will be assigned to the
+         // worker thread to handle this sender bean. It contains a timeToSend
+         // part to cater for retransmissions, which causes retransmissions to
+         // be treated as new work.
+         String workId = senderBean.getMessageID() + senderBean.getTimeToSend();
+
+         // check whether the bean is already assigned to a worker.
+         if (getWorkerLock().isWorkPresent(workId)) {
+             // As there is already a worker running we are probably looping
+             // too fast, so sleep on the next loop.
+             if (log.isDebugEnabled()) {
+                 String message = SandeshaMessageHelper.getMessage(
+                         SandeshaMessageKeys.workAlreadyAssigned,
+                         workId);
+                 log.debug("Exit: Sender::internalRun, " + message + ", sleeping");
+             }
+             return true;
+         }
+
+         if (transaction != null) {
+             transaction.commit();
+             transaction = null;
+         }
+
+         // create a worker which will work on this message.
+         SenderWorker worker = new SenderWorker(context, senderBean);
+         worker.setLock(getWorkerLock());
+         worker.setWorkId(workId);
+
+         // Register the work before handing it to the pool. If the id were added
+         // afterwards, a worker that releases its id when it finishes could do so
+         // before the id was added, leaving the id in the lock with no thread
+         // handling it.
+         getWorkerLock().addWork(workId);
+         try {
+             threadPool.execute(worker);
+         } catch (Exception dispatchError) {
+             // The worker never started, so nobody else will release the id.
+             getWorkerLock().removeWork(workId);
+             throw dispatchError;
+         }
+
+         // If we got to here then we found work to do on the sequence, so we should
+         // remember not to sleep at the end of the list of sequences.
+         processedMessage = true;
+
+     } catch (Exception e) {
+
+         // TODO : when this is the client side throw the exception to
+         // the client when necessary.
+
+         //TODO rollback only if a SandeshaStorageException.
+         //This allows the other Exceptions to be used within the Normal flow.
+
+         if (transaction != null) {
+             try {
+                 transaction.rollback();
+                 transaction = null;
+             } catch (Exception e1) {
+                 String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
+                         .toString());
+                 log.debug(message, e1);
+             }
+         }
+
+         String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
+
+         log.debug(message, e);
+     } finally {
+         // Commit whatever is still open (early-return paths and failed rollbacks).
+         if (transaction != null) {
+             try {
+                 transaction.commit();
+                 transaction = null;
+             } catch (Exception e) {
+                 String message = SandeshaMessageHelper
+                         .getMessage(SandeshaMessageKeys.commitError, e.toString());
+                 log.debug(message, e);
+             }
+         }
+     }
+     if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, not sleeping");
+     return false;
+ }
+
+ /**
+  * Cleans up sequence state in storage.
+  *
+  * When the sequence removal timeout is greater than zero, RMSBeans that are
+  * terminated or timed out are passed to deleteRMSBeans, and terminated RMDBeans
+  * whose last activation is older than that timeout are deleted.
+  *
+  * When the inactivity timeout is greater than zero, RMDBeans that are not
+  * terminated and have not been active within that interval are marked as
+  * terminated.
+  *
+  * A SandeshaException is logged at error level; the transaction is committed
+  * in either case.
+  *
+  * @param storageManager storage manager providing the bean managers and the
+  *        transaction
+  */
+ private void deleteTerminatedSequences(StorageManager storageManager) {
+     if (log.isDebugEnabled())
+         log.debug("Enter: Sender::deleteTerminatedSequences");
+
+     RMSBean finderBean = new RMSBean();
+     finderBean.setTerminated(true);
+
+     Transaction transaction = storageManager.getTransaction();
+
+     try {
+
+         SandeshaPolicyBean propertyBean =
+             SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
+
+         long deleteTime = propertyBean.getSequenceRemovalTimeoutInterval();
+         if (deleteTime < 0)
+             deleteTime = 0;
+
+         if (deleteTime > 0) {
+             // Find terminated sequences.
+             List rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+
+             deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
+
+             finderBean.setTerminated(false);
+             finderBean.setTimedOut(true);
+
+             // Find timed out sequences
+             rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+
+             deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
+
+             // Remove any terminated RMDBeans.
+             RMDBean finderRMDBean = new RMDBean();
+             finderRMDBean.setTerminated(true);
+
+             List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+             Iterator beans = rmdBeans.iterator();
+             while (beans.hasNext()) {
+                 RMDBean rmdBean = (RMDBean)beans.next();
+
+                 long timeNow = System.currentTimeMillis();
+                 long lastActivated = rmdBean.getLastActivatedTime();
+
+                 // delete sequences that have been timed out or deleted for more than
+                 // the SequenceRemovalTimeoutInterval
+                 if ((lastActivated + deleteTime) < timeNow) {
+                     if (log.isDebugEnabled())
+                         log.debug("Deleting RMDBean " + deleteTime + " : " + rmdBean);
+                     storageManager.getRMDBeanMgr().delete(rmdBean.getSequenceID());
+                 }
+             }
+         }
+
+         // Terminate RMD Sequences that have been inactive.
+         long inactivityTimeout = propertyBean.getInactivityTimeoutInterval();
+         if (inactivityTimeout > 0) {
+             RMDBean finderRMDBean = new RMDBean();
+             finderRMDBean.setTerminated(false);
+
+             List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+             Iterator beans = rmdBeans.iterator();
+             while (beans.hasNext()) {
+                 RMDBean rmdBean = (RMDBean)beans.next();
+
+                 long timeNow = System.currentTimeMillis();
+                 long lastActivated = rmdBean.getLastActivatedTime();
+
+                 if ((lastActivated + inactivityTimeout) < timeNow) {
+                     // Terminate. The activation time is reset so that the removal
+                     // timeout above is measured from the moment of termination.
+                     rmdBean.setTerminated(true);
+                     rmdBean.setLastActivatedTime(timeNow);
+                     if (log.isDebugEnabled())
+                         log.debug(System.currentTimeMillis() + "Marking RMDBean as terminated " + rmdBean);
+                     storageManager.getRMDBeanMgr().update(rmdBean);
+                 }
+             }
+         }
+
+     } catch (SandeshaException e) {
+         // Pass the throwable as well so that the stack trace is logged.
+         if (log.isErrorEnabled())
+             log.error(e.toString(), e);
+     } finally {
+         // The other methods in this file treat the transaction as possibly null,
+         // so guard here too rather than risk a NullPointerException that would
+         // hide the original failure.
+         if (transaction != null)
+             transaction.commit();
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: Sender::deleteTerminatedSequences");
+ }
+
+ /**
+  * Deletes the stale entries among the supplied RMSBeans.
+  *
+  * A bean is removed from the RMSBean manager when its last-activated time plus
+  * <code>deleteTime</code> lies before the current
+  * <code>System.currentTimeMillis()</code> reading; all other beans are left alone.
+  *
+  * @param rmsBeans     the candidate RMSBeans to examine
+  * @param propertyBean the policy bean (not read by this method)
+  * @param deleteTime   how long a bean must have been idle before it is deleted
+  * @throws SandeshaStorageException declared for failures of the storage calls
+  */
+ private void deleteRMSBeans(List rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime)
+         throws SandeshaStorageException {
+     if (log.isDebugEnabled())
+         log.debug("Enter: Sender::deleteRMSBeans");
+
+     for (Iterator candidates = rmsBeans.iterator(); candidates.hasNext();) {
+         RMSBean candidate = (RMSBean) candidates.next();
+
+         // The clock is sampled per bean, immediately before the comparison.
+         long now = System.currentTimeMillis();
+         long expiresAt = candidate.getLastActivatedTime() + deleteTime;
+
+         // Not yet idle for longer than the removal interval: keep it.
+         if (expiresAt >= now) {
+             continue;
+         }
+
+         if (log.isDebugEnabled())
+             log.debug("Removing RMSBean " + candidate);
+         storageManager.getRMSBeanMgr().delete(candidate.getCreateSeqMsgID());
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: Sender::deleteRMSBeans");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org