You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by di...@apache.org on 2007/08/09 02:58:01 UTC
svn commit: r564063 [6/16] - in /webservices/sandesha/trunk/java: ./
modules/client/ modules/core/
modules/core/src/main/java/org/apache/sandesha2/
modules/core/src/main/java/org/apache/sandesha2/client/
modules/core/src/main/java/org/apache/sandesha2/...
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=564063&r1=564062&r2=564063
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java Wed Aug 8 17:57:51 2007
@@ -1,380 +1,380 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.sandesha2.util;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.storage.SandeshaStorageException;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beans.RMSBean;
-import org.apache.sandesha2.storage.beans.InvokerBean;
-import org.apache.sandesha2.storage.beans.RMDBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
-
-/**
- * Contains logic to remove all the storad data of a sequence. Methods of this
- * are called by sending side and the receiving side when appropriate
- */
-
-public class TerminateManager {
-
- private static Log log = LogFactory.getLog(TerminateManager.class);
-
- private static String CLEANED_ON_TERMINATE_MSG = "CleanedOnTerminateMsg";
-
- private static String CLEANED_AFTER_INVOCATION = "CleanedAfterInvocation";
-
- public static HashMap receivingSideCleanMap = new HashMap();
-
- public static void checkAndTerminate(ConfigurationContext configurationContext, StorageManager storageManager, RMSBean rmsBean)
- throws SandeshaStorageException, AxisFault {
- if(log.isDebugEnabled()) log.debug("Enter: TerminateManager::checkAndTerminate " +rmsBean);
-
- long lastOutMessage = rmsBean.getLastOutMessage ();
-
- if (lastOutMessage > 0 && !rmsBean.isTerminateAdded()) {
-
- boolean complete = AcknowledgementManager.verifySequenceCompletion(rmsBean.getClientCompletedMessages(), lastOutMessage);
-
- //If this is RM 1.1 and RMAnonURI scenario, dont do the termination unless the response side createSequence has been
- //received (RMDBean has been created) through polling, in this case termination will happen in the create sequence response processor.
- String rmVersion = rmsBean.getRMVersion();
- String replyToAddress = rmsBean.getReplyToEPR();
-
- if (complete &&
- Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmVersion) && SandeshaUtil.isWSRMAnonymous(replyToAddress)) {
- RMDBean findBean = new RMDBean ();
- findBean.setPollingMode(true);
- findBean.setToAddress(replyToAddress);
-
- RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
- List beans = rmdBeanMgr.find(findBean);
- if(beans.isEmpty()) {
- rmsBean.setTerminationPauserForCS(true);
- storageManager.getRMSBeanMgr().update(rmsBean);
- complete = false;
- }
- }
-
- // If we are doing sync 2-way over WSRM 1.0 then we may need to keep sending messages,
- // so check to see if all the senders have been removed
- EndpointReference replyTo = new EndpointReference (replyToAddress);
- if (complete &&
- Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && (replyToAddress==null || replyTo.hasAnonymousAddress())) {
- SenderBean matcher = new SenderBean();
- matcher.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
- matcher.setSequenceID(rmsBean.getSequenceID());
-
- List matches = storageManager.getSenderBeanMgr().find(matcher);
- if(!matches.isEmpty()) complete = false;
- }
-
- if (complete) {
-
- String referenceMsgKey = rmsBean.getReferenceMessageStoreKey();
- if (referenceMsgKey==null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referenceMessageNotSetForSequence,rmsBean.getSequenceID());
- throw new SandeshaException (message);
- }
-
- MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey, configurationContext);
-
- if (referenceMessage==null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referencedMessageNotFound, rmsBean.getSequenceID());
- throw new SandeshaException (message);
- }
-
- RMMsgContext referenceRMMsg = MsgInitializer.initializeMessage(referenceMessage);
- addTerminateSequenceMessage(referenceRMMsg, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
- }
-
- }
-
- if(log.isDebugEnabled()) log.debug("Exit: TerminateManager::checkAndTerminate");
- }
-
-
- /**
- * Called by the receiving side to remove data related to a sequence. e.g.
- * After sending the TerminateSequence message. Calling this methods will
- * complete all the data if InOrder invocation is not sequired.
- *
- * @param configContext
- * @param sequenceID
- * @throws SandeshaException
- */
- public static void cleanReceivingSideOnTerminateMessage(ConfigurationContext configContext, String sequenceId,
- StorageManager storageManager) throws SandeshaException {
-
- // clean senderMap
-
- //removing any un-sent ack messages.
- SenderBean findAckBean = new SenderBean ();
- findAckBean.setSequenceID(sequenceId);
- findAckBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-
- SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
- Iterator ackBeans = senderBeanMgr.find(findAckBean).iterator();
- while (ackBeans.hasNext()) {
- SenderBean ackBean = (SenderBean) ackBeans.next();
- senderBeanMgr.delete(ackBean.getMessageID());
-
- storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
- }
-
- // Currently in-order invocation is done for default values.
- boolean inOrderInvocation = SandeshaUtil.getDefaultPropertyBean(configContext.getAxisConfiguration())
- .isInOrder();
-
- if (!inOrderInvocation) {
- // there is no invoking by Sandesha2. So clean invocations storages.
-
- receivingSideCleanMap.put(sequenceId, CLEANED_ON_TERMINATE_MSG);
- cleanReceivingSideAfterInvocation(sequenceId, storageManager);
- } else {
-
- String cleanStatus = (String) receivingSideCleanMap.get(sequenceId);
- if (cleanStatus != null
- && CLEANED_AFTER_INVOCATION.equals(cleanStatus))
- // Remove the sequence from the map
- receivingSideCleanMap.remove(sequenceId);
- //completeTerminationOfReceivingSide(configContext,
- // sequenceId, storageManager);
- else
- receivingSideCleanMap.put(sequenceId, CLEANED_ON_TERMINATE_MSG);
- }
- }
-
- /**
- * When InOrder invocation is anabled this had to be called to clean the
- * data left by the above method. This had to be called after the Invocation
- * of the Last Message.
- *
- * @param sequenceID
- * @throws SandeshaException
- */
- public static void cleanReceivingSideAfterInvocation(String sequenceId,
- StorageManager storageManager) throws SandeshaException {
- if(log.isDebugEnabled()) log.debug("Enter: TerminateManager::cleanReceivingSideAfterInvocation " +sequenceId);
-
- InvokerBeanMgr invokerBeanMgr = storageManager.getInvokerBeanMgr();
-
- // removing InvokerBean entries
- InvokerBean invokerFindBean = new InvokerBean();
- invokerFindBean.setSequenceID(sequenceId);
- Collection collection = invokerBeanMgr.find(invokerFindBean);
- Iterator iterator = collection.iterator();
- while (iterator.hasNext()) {
- InvokerBean invokerBean = (InvokerBean) iterator.next();
- String messageStoreKey = invokerBean.getMessageContextRefKey();
- invokerBeanMgr.delete(messageStoreKey);
-
- // removing the respective message context from the message store.
- storageManager.removeMessageContext(messageStoreKey);
- }
-
- String cleanStatus = (String) receivingSideCleanMap.get(sequenceId);
- if (cleanStatus != null && CLEANED_ON_TERMINATE_MSG.equals(cleanStatus))
- // Remove the sequence id from the map
- receivingSideCleanMap.remove(sequenceId);
- //completeTerminationOfReceivingSide(configContext, sequenceId, storageManager);
- else
- receivingSideCleanMap.put(sequenceId, CLEANED_AFTER_INVOCATION);
-
- if(log.isDebugEnabled()) log.debug("Exit: TerminateManager::cleanReceivingSideAfterInvocation");
- }
-
- /**
- * This is called by the sending side to clean data related to a sequence.
- * e.g. after sending the TerminateSequence message.
- *
- * @param configContext
- * @param sequenceID
- * @throws SandeshaException
- */
- public static void terminateSendingSide(RMSBean rmsBean,
- StorageManager storageManager) throws SandeshaException {
-
- // Indicate that the sequence is terminated
- rmsBean.setTerminated(true);
- storageManager.getRMSBeanMgr().update(rmsBean);
-
- cleanSendingSideData (rmsBean.getInternalSequenceID(), storageManager);
- }
-
- public static void timeOutSendingSideSequence(String internalSequenceId,
- StorageManager storageManager) throws SandeshaException {
-
- RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
- rmsBean.setTimedOut(true);
- rmsBean.setLastActivatedTime(System.currentTimeMillis());
- storageManager.getRMSBeanMgr().update(rmsBean);
-
- cleanSendingSideData(internalSequenceId, storageManager);
- }
-
- private static void cleanSendingSideData(String internalSequenceId, StorageManager storageManager) throws SandeshaException {
-
- SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
-
- // removing retransmitterMgr entries and corresponding message contexts.
- Collection collection = retransmitterBeanMgr.find(internalSequenceId);
- Iterator iterator = collection.iterator();
- while (iterator.hasNext()) {
- SenderBean retransmitterBean = (SenderBean) iterator.next();
- retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
-
- String messageStoreKey = retransmitterBean.getMessageContextRefKey();
- storageManager.removeMessageContext(messageStoreKey);
- }
- }
-
- public static void addTerminateSequenceMessage(RMMsgContext referenceMessage, String internalSequenceID, String outSequenceId, StorageManager storageManager) throws AxisFault {
-
- if(log.isDebugEnabled())
- log.debug("Enter: TerminateManager::addTerminateSequenceMessage " + outSequenceId + ", " + internalSequenceID);
-
- RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceID);
-
- if (rmsBean.isTerminateAdded()) {
- if(log.isDebugEnabled())
- log.debug("Exit: TerminateManager::addTerminateSequenceMessage - terminate was added previously.");
- return;
- }
-
- RMMsgContext terminateRMMessage = RMMsgCreator.createTerminateSequenceMessage(referenceMessage, rmsBean, storageManager);
- terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
- terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-
- //setting the To EPR.
- //First try to get it from an Endpoint property.
- //If not get it from the To property.
-
- EndpointReference toEPR = null;
-
- if (rmsBean.getOfferedEndPoint() != null)
- toEPR = new EndpointReference (rmsBean.getOfferedEndPoint());
-
- if (toEPR==null) {
-
- if (rmsBean.getToEPR()!=null) {
- toEPR = new EndpointReference(rmsBean.getToEPR());
- if (toEPR == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
- throw new SandeshaException(message);
- }
- }
- }
-
- if (toEPR!=null)
- terminateRMMessage.setTo(toEPR);
-
- if (rmsBean.getReplyToEPR()!=null) {
- terminateRMMessage.setReplyTo(new EndpointReference (rmsBean.getReplyToEPR()));
- }
-
- String rmVersion = rmsBean.getRMVersion();
- terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
- terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
-
- if (rmsBean.getTransportTo() != null) {
- terminateRMMessage.setProperty(Constants.Configuration.TRANSPORT_URL, rmsBean.getTransportTo());
- }
-
- terminateRMMessage.addSOAPEnvelope();
-
- String key = SandeshaUtil.getUUID();
-
- SenderBean terminateBean = new SenderBean();
- terminateBean.setInternalSequenceID(internalSequenceID);
- terminateBean.setSequenceID(outSequenceId);
- terminateBean.setMessageContextRefKey(key);
- terminateBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
-
- // Set a retransmitter lastSentTime so that terminate will be send with
- // some delay.
- // Otherwise this get send before return of the current request (ack).
- // TODO: refine the terminate delay.
- terminateBean.setTimeToSend(System.currentTimeMillis() + Sandesha2Constants.TERMINATE_DELAY);
-
- terminateBean.setMessageID(terminateRMMessage.getMessageId());
-
- // this will be set to true at the sender.
- terminateBean.setSend(true);
-
- terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
- Sandesha2Constants.VALUE_FALSE);
-
- terminateBean.setReSend(false);
-
- terminateBean.setSequenceID(outSequenceId);
-
- terminateBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
- terminateBean.setInternalSequenceID(internalSequenceID);
-
-
- EndpointReference to = terminateRMMessage.getTo();
- if (to!=null)
- terminateBean.setToAddress(to.getAddress());
-
- // If this message is targetted at an anonymous address then we must not have a transport
- // ready for it, as the terminate sequence is not a reply.
- if(to == null || to.hasAnonymousAddress())
- terminateBean.setTransportAvailable(false);
-
- rmsBean.setTerminateAdded(true);
-
- storageManager.getRMSBeanMgr().update(rmsBean);
-
- terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
-
- //the propertyKey of the ackMessage will be the propertyKey for the terminate message as well.
-// terminateRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY, sequencePropertyKey);
-
- // / addTerminateSeqTransaction.commit();
- SandeshaUtil.executeAndStore(terminateRMMessage, key);
-
- SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
-
-
- retramsmitterMgr.insert(terminateBean);
-
- if(log.isDebugEnabled())
- log.debug("Exit: TerminateManager::addTerminateSequenceMessage");
- }
-
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+/**
+ * Contains logic to remove all the stored data of a sequence. Methods of this
+ * are called by sending side and the receiving side when appropriate
+ */
+
+public class TerminateManager {
+ // Logger for this class.
+ private static Log log = LogFactory.getLog(TerminateManager.class);
+ // Clean status recorded by cleanReceivingSideOnTerminateMessage.
+ private static String CLEANED_ON_TERMINATE_MSG = "CleanedOnTerminateMsg";
+ // Clean status recorded by cleanReceivingSideAfterInvocation.
+ private static String CLEANED_AFTER_INVOCATION = "CleanedAfterInvocation";
+ /* Maps a sequence id to the receiving-side clean-up step that has run so far (one of
+ * the two status strings declared just before this field). The clean-up methods of
+ * this class put an entry when they run first and remove it when they find the
+ * other step's status already recorded.
+ * NOTE(review): this is a public, static, unsynchronized HashMap - confirm that
+ * callers never touch it from several threads at once. */
+ public static HashMap receivingSideCleanMap = new HashMap();
+ /**
+ * Adds a TerminateSequence message for the given sending-side sequence if the
+ * sequence is complete. Nothing is done unless the bean reports a last out
+ * message number greater than zero and no terminate has been added yet.
+ *
+ * Completion is decided by AcknowledgementManager.verifySequenceCompletion and
+ * is then withdrawn in two cases:
+ * - RM 1.1 with a WS-RM anonymous replyTo, when no polling-mode RMDBean exists
+ * for that address. The bean is then flagged with setTerminationPauserForCS(true)
+ * and updated.
+ * - RM 1.0 with a null or anonymous replyTo, when application SenderBeans still
+ * exist for the sequence.
+ *
+ * @param configurationContext used to retrieve the stored reference message context
+ * @param storageManager source of the bean managers and the stored message contexts
+ * @param rmsBean the sending-side sequence bean to check
+ * @throws SandeshaStorageException declared by the signature; presumably raised by the storage calls - TODO confirm
+ * @throws AxisFault declared by the signature; a SandeshaException is thrown here when the reference
+ * message key is not set or the reference message cannot be retrieved
+ */
+ public static void checkAndTerminate(ConfigurationContext configurationContext, StorageManager storageManager, RMSBean rmsBean)
+ throws SandeshaStorageException, AxisFault {
+ if(log.isDebugEnabled()) log.debug("Enter: TerminateManager::checkAndTerminate " +rmsBean);
+
+ long lastOutMessage = rmsBean.getLastOutMessage ();
+
+ if (lastOutMessage > 0 && !rmsBean.isTerminateAdded()) {
+
+ boolean complete = AcknowledgementManager.verifySequenceCompletion(rmsBean.getClientCompletedMessages(), lastOutMessage);
+
+ //If this is RM 1.1 and RMAnonURI scenario, don't do the termination unless the response side createSequence has been
+ //received (RMDBean has been created) through polling, in this case termination will happen in the create sequence response processor.
+ String rmVersion = rmsBean.getRMVersion();
+ String replyToAddress = rmsBean.getReplyToEPR();
+
+ if (complete &&
+ Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmVersion) && SandeshaUtil.isWSRMAnonymous(replyToAddress)) {
+ RMDBean findBean = new RMDBean ();
+ findBean.setPollingMode(true);
+ findBean.setToAddress(replyToAddress);
+
+ RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+ List beans = rmdBeanMgr.find(findBean);
+ if(beans.isEmpty()) {
+ rmsBean.setTerminationPauserForCS(true);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ complete = false;
+ }
+ }
+
+ // If we are doing sync 2-way over WSRM 1.0 then we may need to keep sending messages,
+ // so check to see if all the senders have been removed
+ EndpointReference replyTo = new EndpointReference (replyToAddress); // built even when replyToAddress is null; the condition below checks for null first
+ if (complete &&
+ Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && (replyToAddress==null || replyTo.hasAnonymousAddress())) {
+ SenderBean matcher = new SenderBean();
+ matcher.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+ matcher.setSequenceID(rmsBean.getSequenceID());
+
+ List matches = storageManager.getSenderBeanMgr().find(matcher);
+ if(!matches.isEmpty()) complete = false;
+ }
+
+ if (complete) {
+ // The terminate message is created from the stored reference message of the sequence.
+ String referenceMsgKey = rmsBean.getReferenceMessageStoreKey();
+ if (referenceMsgKey==null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referenceMessageNotSetForSequence,rmsBean.getSequenceID());
+ throw new SandeshaException (message);
+ }
+
+ MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey, configurationContext);
+
+ if (referenceMessage==null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referencedMessageNotFound, rmsBean.getSequenceID());
+ throw new SandeshaException (message);
+ }
+
+ RMMsgContext referenceRMMsg = MsgInitializer.initializeMessage(referenceMessage);
+ addTerminateSequenceMessage(referenceRMMsg, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
+ }
+
+ }
+
+ if(log.isDebugEnabled()) log.debug("Exit: TerminateManager::checkAndTerminate");
+ }
+
+
+ /**
+ * Called by the receiving side to remove data related to a sequence (per the
+ * method name, when the TerminateSequence message is handled). Any un-sent ack
+ * SenderBeans of the sequence are deleted together with their stored message
+ * contexts. If InOrder invocation is not required, the invoker data is cleaned
+ * here as well through cleanReceivingSideAfterInvocation; otherwise only the
+ * clean status of the sequence is recorded in, or removed from,
+ * receivingSideCleanMap.
+ *
+ * @param configContext used to read the InOrder setting from the default property bean
+ * @param sequenceId id of the sequence being cleaned
+ * @param storageManager source of the sender bean manager and the message store
+ * @throws SandeshaException declared by the signature
+ */
+ public static void cleanReceivingSideOnTerminateMessage(ConfigurationContext configContext, String sequenceId,
+ StorageManager storageManager) throws SandeshaException {
+
+ // clean senderMap
+
+ //removing any un-sent ack messages.
+ SenderBean findAckBean = new SenderBean ();
+ findAckBean.setSequenceID(sequenceId);
+ findAckBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+ SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+ Iterator ackBeans = senderBeanMgr.find(findAckBean).iterator();
+ while (ackBeans.hasNext()) {
+ SenderBean ackBean = (SenderBean) ackBeans.next();
+ senderBeanMgr.delete(ackBean.getMessageID());
+
+ storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
+ }
+
+ // The InOrder setting is read from the default property bean of the Axis configuration.
+ boolean inOrderInvocation = SandeshaUtil.getDefaultPropertyBean(configContext.getAxisConfiguration())
+ .isInOrder();
+
+ if (!inOrderInvocation) {
+ // there is no invoking by Sandesha2. So clean the invocation storage right away.
+ // Recording the status first lets cleanReceivingSideAfterInvocation remove the map entry again.
+ receivingSideCleanMap.put(sequenceId, CLEANED_ON_TERMINATE_MSG);
+ cleanReceivingSideAfterInvocation(sequenceId, storageManager);
+ } else {
+ // Whichever of the two clean-up steps runs second removes the map entry.
+ String cleanStatus = (String) receivingSideCleanMap.get(sequenceId);
+ if (cleanStatus != null
+ && CLEANED_AFTER_INVOCATION.equals(cleanStatus))
+ // Remove the sequence from the map
+ receivingSideCleanMap.remove(sequenceId);
+ //completeTerminationOfReceivingSide(configContext,
+ // sequenceId, storageManager);
+ else
+ receivingSideCleanMap.put(sequenceId, CLEANED_ON_TERMINATE_MSG);
+ }
+ }
+
+ /**
+ * When InOrder invocation is enabled this has to be called to clean the
+ * data left by cleanReceivingSideOnTerminateMessage, after the invocation
+ * of the last message. It deletes every InvokerBean of the sequence together
+ * with its stored message context and then updates receivingSideCleanMap.
+ *
+ * @param sequenceId id of the sequence whose invoker data is removed
+ * @param storageManager source of the invoker bean manager and the message store
+ * @throws SandeshaException declared by the signature
+ */
+ public static void cleanReceivingSideAfterInvocation(String sequenceId,
+ StorageManager storageManager) throws SandeshaException {
+ if(log.isDebugEnabled()) log.debug("Enter: TerminateManager::cleanReceivingSideAfterInvocation " +sequenceId);
+
+ InvokerBeanMgr invokerBeanMgr = storageManager.getInvokerBeanMgr();
+
+ // removing InvokerBean entries
+ InvokerBean invokerFindBean = new InvokerBean();
+ invokerFindBean.setSequenceID(sequenceId);
+ Collection collection = invokerBeanMgr.find(invokerFindBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ InvokerBean invokerBean = (InvokerBean) iterator.next();
+ String messageStoreKey = invokerBean.getMessageContextRefKey();
+ invokerBeanMgr.delete(messageStoreKey); // the message context ref key is also the key passed to delete
+
+ // removing the respective message context from the message store.
+ storageManager.removeMessageContext(messageStoreKey);
+ }
+ // If the terminate-message step already ran, both steps are done and the entry is dropped.
+ String cleanStatus = (String) receivingSideCleanMap.get(sequenceId);
+ if (cleanStatus != null && CLEANED_ON_TERMINATE_MSG.equals(cleanStatus))
+ // Remove the sequence id from the map
+ receivingSideCleanMap.remove(sequenceId);
+ //completeTerminationOfReceivingSide(configContext, sequenceId, storageManager);
+ else
+ receivingSideCleanMap.put(sequenceId, CLEANED_AFTER_INVOCATION);
+
+ if(log.isDebugEnabled()) log.debug("Exit: TerminateManager::cleanReceivingSideAfterInvocation");
+ }
+
+ /**
+ * This is called by the sending side to clean data related to a sequence,
+ * e.g. after sending the TerminateSequence message. The bean is marked as
+ * terminated and updated before the sender data of the sequence is removed.
+ *
+ * @param rmsBean sending-side sequence bean; its internal sequence id selects the data to clean
+ * @param storageManager source of the bean managers and the message store
+ * @throws SandeshaException declared by the signature
+ */
+ public static void terminateSendingSide(RMSBean rmsBean,
+ StorageManager storageManager) throws SandeshaException {
+
+ // Indicate that the sequence is terminated
+ rmsBean.setTerminated(true);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+
+ cleanSendingSideData (rmsBean.getInternalSequenceID(), storageManager);
+ }
+ /**
+ * Marks a sending-side sequence as timed out and removes its sender data.
+ * The RMSBean is looked up by internal sequence id, flagged as timed out,
+ * given the current time as its last activated time, and updated.
+ * NOTE(review): the looked-up bean is used without a null check - confirm
+ * that SandeshaUtil.getRMSBeanFromInternalSequenceId cannot return null here.
+ *
+ * @param internalSequenceId internal id of the sequence that timed out
+ * @param storageManager source of the bean managers and the message store
+ * @throws SandeshaException declared by the signature
+ */
+ public static void timeOutSendingSideSequence(String internalSequenceId,
+ StorageManager storageManager) throws SandeshaException {
+
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+ rmsBean.setTimedOut(true);
+ rmsBean.setLastActivatedTime(System.currentTimeMillis());
+ storageManager.getRMSBeanMgr().update(rmsBean);
+
+ cleanSendingSideData(internalSequenceId, storageManager);
+ }
+ /**
+ * Deletes every SenderBean found for the internal sequence id, along with the
+ * message context stored under each bean's message context ref key.
+ *
+ * @param internalSequenceId internal id of the sequence whose sender data is removed
+ * @param storageManager source of the sender bean manager and the message store
+ * @throws SandeshaException declared by the signature
+ */
+ private static void cleanSendingSideData(String internalSequenceId, StorageManager storageManager) throws SandeshaException {
+
+ SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
+
+ // removing retransmitterMgr entries and corresponding message contexts.
+ Collection collection = retransmitterBeanMgr.find(internalSequenceId);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ SenderBean retransmitterBean = (SenderBean) iterator.next();
+ retransmitterBeanMgr.delete(retransmitterBean.getMessageID()); // sender beans are deleted by message id
+
+ String messageStoreKey = retransmitterBean.getMessageContextRefKey();
+ storageManager.removeMessageContext(messageStoreKey);
+ }
+ }
+ /**
+ * Creates a TerminateSequence message for a sending-side sequence and queues it
+ * for sending. Returns without doing anything if the RMSBean found for the
+ * internal sequence id already reports that a terminate was added.
+ *
+ * Otherwise the message is created from the reference message, given its To,
+ * ReplyTo, WS-Addressing action and SOAP action, and stored under a new UUID key
+ * through SandeshaUtil.executeAndStore. The RMSBean is marked as terminate-added
+ * and updated, and a SenderBean of type TERMINATE_SEQ is inserted with a send
+ * time of now plus Sandesha2Constants.TERMINATE_DELAY.
+ *
+ * @param referenceMessage message the terminate message is created from
+ * @param internalSequenceID internal id used to look up the RMSBean; also set on the SenderBean
+ * @param outSequenceId sequence id set on the SenderBean
+ * @param storageManager source of the bean managers
+ * @throws AxisFault declared by the signature
+ */
+ public static void addTerminateSequenceMessage(RMMsgContext referenceMessage, String internalSequenceID, String outSequenceId, StorageManager storageManager) throws AxisFault {
+
+ if(log.isDebugEnabled())
+ log.debug("Enter: TerminateManager::addTerminateSequenceMessage " + outSequenceId + ", " + internalSequenceID);
+
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceID);
+
+ if (rmsBean.isTerminateAdded()) {
+ if(log.isDebugEnabled())
+ log.debug("Exit: TerminateManager::addTerminateSequenceMessage - terminate was added previously.");
+ return;
+ }
+
+ RMMsgContext terminateRMMessage = RMMsgCreator.createTerminateSequenceMessage(referenceMessage, rmsBean, storageManager);
+ terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
+ terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+
+ //setting the To EPR.
+ //First try the offered endpoint of the sequence bean.
+ //If that is not set, fall back to the To EPR of the sequence bean.
+
+ EndpointReference toEPR = null;
+
+ if (rmsBean.getOfferedEndPoint() != null)
+ toEPR = new EndpointReference (rmsBean.getOfferedEndPoint());
+
+ if (toEPR==null) {
+
+ if (rmsBean.getToEPR()!=null) {
+ toEPR = new EndpointReference(rmsBean.getToEPR());
+ if (toEPR == null) { // NOTE(review): cannot be true right after 'new'; this branch looks unreachable
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
+ throw new SandeshaException(message);
+ }
+ }
+ }
+
+ if (toEPR!=null)
+ terminateRMMessage.setTo(toEPR);
+
+ if (rmsBean.getReplyToEPR()!=null) {
+ terminateRMMessage.setReplyTo(new EndpointReference (rmsBean.getReplyToEPR()));
+ }
+
+ String rmVersion = rmsBean.getRMVersion();
+ terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+ terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+
+ if (rmsBean.getTransportTo() != null) {
+ terminateRMMessage.setProperty(Constants.Configuration.TRANSPORT_URL, rmsBean.getTransportTo());
+ }
+
+ terminateRMMessage.addSOAPEnvelope();
+
+ String key = SandeshaUtil.getUUID();
+
+ SenderBean terminateBean = new SenderBean();
+ terminateBean.setInternalSequenceID(internalSequenceID);
+ terminateBean.setSequenceID(outSequenceId);
+ terminateBean.setMessageContextRefKey(key);
+ terminateBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
+
+ // Set the time-to-send of the sender bean into the future so that the
+ // terminate will be sent with some delay.
+ // Otherwise this gets sent before the return of the current request (ack).
+ // TODO: refine the terminate delay.
+ terminateBean.setTimeToSend(System.currentTimeMillis() + Sandesha2Constants.TERMINATE_DELAY);
+
+ terminateBean.setMessageID(terminateRMMessage.getMessageId());
+
+ // NOTE(review): the earlier comment said "this will be set to true at the sender", yet it is set to true right here - confirm intent.
+ terminateBean.setSend(true);
+
+ terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+ Sandesha2Constants.VALUE_FALSE);
+
+ terminateBean.setReSend(false);
+ // NOTE(review): the three setters below repeat values already set when the bean was created.
+ terminateBean.setSequenceID(outSequenceId);
+
+ terminateBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
+ terminateBean.setInternalSequenceID(internalSequenceID);
+
+
+ EndpointReference to = terminateRMMessage.getTo();
+ if (to!=null)
+ terminateBean.setToAddress(to.getAddress());
+
+ // If this message is targeted at an anonymous address then we must not have a transport
+ // ready for it, as the terminate sequence is not a reply.
+ if(to == null || to.hasAnonymousAddress())
+ terminateBean.setTransportAvailable(false);
+ // Record on the sequence bean that a terminate has been queued, so later calls return early.
+ rmsBean.setTerminateAdded(true);
+
+ storageManager.getRMSBeanMgr().update(rmsBean);
+
+ terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+
+ //the propertyKey of the ackMessage will be the propertyKey for the terminate message as well.
+// terminateRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY, sequencePropertyKey);
+
+ // / addTerminateSeqTransaction.commit();
+ SandeshaUtil.executeAndStore(terminateRMMessage, key);
+
+ SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
+
+ // The sender bean is inserted only after the message context has been stored under its key.
+ retramsmitterMgr.insert(terminateBean);
+
+ if(log.isDebugEnabled())
+ log.debug("Exit: TerminateManager::addTerminateSequenceMessage");
+ }
+
+}
Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java?view=diff&rev=564063&r1=564062&r2=564063
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java Wed Aug 8 17:57:51 2007
@@ -1,220 +1,220 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-package org.apache.sandesha2.util;
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beans.RMSBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
-
-public class WSRMMessageSender {
-
- private static final Log log = LogFactory.getLog(WSRMMessageSender.class);
-
- private MessageContext msgContext;
- private StorageManager storageManager;
- private ConfigurationContext configurationContext;
- private String toAddress;
- private String sequenceKey;
- private String internalSequenceID;
- private boolean sequenceExists;
- private String outSequenceID;
- private String rmVersion;
- private RMSBean rmsBean;
-
- /**
- * Extracts information from the rmMsgCtx specific for processing out messages
- *
- * @param rmMsgCtx
- * @throws AxisFault
- */
- protected void setupOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
- if (log.isDebugEnabled())
- log.debug("Enter: WSRMParentProcessor::setupOutMessage");
-
- msgContext = rmMsgCtx.getMessageContext();
- configurationContext = msgContext.getConfigurationContext();
- Options options = msgContext.getOptions();
-
- storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
- configurationContext.getAxisConfiguration());
-
- internalSequenceID =
- (String)rmMsgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
-
- toAddress = rmMsgCtx.getTo().getAddress();
- sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-
- if(internalSequenceID==null)
- {
- internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
- }
-
- // Does the sequence exist ?
- sequenceExists = false;
- outSequenceID = null;
-
- // Get the RMSBean with the matching internal sequenceid
- rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceID);
-
- if (rmsBean == null)
- {
- if (log.isDebugEnabled())
- log.debug("Exit: WSRMParentProcessor::setupOutMessage Sequence doesn't exist");
-
- throw new SandeshaException(SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));
- }
-
- if (rmsBean.getSequenceID() != null)
- {
- sequenceExists = true;
- outSequenceID = rmsBean.getSequenceID();
- }
- else
- outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;
-
- rmVersion = rmsBean.getRMVersion();
- if (rmVersion == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
-
- if (log.isDebugEnabled())
- log.debug("Exit: WSRMParentProcessor::setupOutMessage");
- }
-
-
- protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay) throws AxisFault {
- if (log.isDebugEnabled())
- log.debug("Enter: WSRMParentProcessor::sendOutgoingMessage " + msgType + ", " + delay);
-
- rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
- getMsgContext().setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
- rmMsgCtx.setTo(new EndpointReference(toAddress));
-
- String transportTo = rmsBean.getTransportTo();
- if (transportTo != null) {
- rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
- }
-
- //setting msg context properties
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
-
- rmMsgCtx.addSOAPEnvelope();
-
- // Ensure the outbound message us secured using the correct token
- RMMsgCreator.secureOutboundMessage(getRMSBean(), msgContext);
-
- String key = SandeshaUtil.getUUID();
-
- SenderBean senderBean = new SenderBean();
- senderBean.setMessageType(msgType);
- senderBean.setMessageContextRefKey(key);
- senderBean.setTimeToSend(System.currentTimeMillis() + delay);
- senderBean.setMessageID(msgContext.getMessageID());
-
- // Set the internal sequence id and outgoing sequence id for the terminate message
- senderBean.setInternalSequenceID(internalSequenceID);
- if (sequenceExists)
- {
- senderBean.setSend(true);
- senderBean.setSequenceID(outSequenceID);
- }
- else
- senderBean.setSend(false);
-
- EndpointReference to = msgContext.getTo();
- if (to!=null)
- senderBean.setToAddress(to.getAddress());
-
- // If this message is targetted at an anonymous address then we must not have a transport
- // ready for it, as the current message is not a reply.
- if(to == null || to.hasAnonymousAddress())
- senderBean.setTransportAvailable(false);
-
- msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
- senderBean.setReSend(false);
-
- SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
-
- SandeshaUtil.executeAndStore(rmMsgCtx, key);
-
- retramsmitterMgr.insert(senderBean);
-
- if (log.isDebugEnabled())
- log.debug("Exit: WSRMParentProcessor::sendOutgoingMessage");
-
- }
-
-
- public final StorageManager getStorageManager() {
- return storageManager;
- }
-
- public final String getInternalSequenceID() {
- return internalSequenceID;
- }
-
- public final MessageContext getMsgContext() {
- return msgContext;
- }
-
- public final String getOutSequenceID() {
- return outSequenceID;
- }
-
- public final boolean isSequenceExists() {
- return sequenceExists;
- }
-
- public final String getSequenceKey() {
- return sequenceKey;
- }
-
- public final String getToAddress() {
- return toAddress;
- }
-
- public final ConfigurationContext getConfigurationContext() {
- return configurationContext;
- }
-
- public final String getRMVersion() {
- return rmVersion;
- }
-
- public final RMSBean getRMSBean() {
- return rmsBean;
- }
-
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+public class WSRMMessageSender {
+
+ private static final Log log = LogFactory.getLog(WSRMMessageSender.class);
+
+ private MessageContext msgContext;
+ private StorageManager storageManager;
+ private ConfigurationContext configurationContext;
+ private String toAddress;
+ private String sequenceKey;
+ private String internalSequenceID;
+ private boolean sequenceExists;
+ private String outSequenceID;
+ private String rmVersion;
+ private RMSBean rmsBean;
+
+ /**
+ * Extracts information from the rmMsgCtx specific for processing out messages
+ *
+ * @param rmMsgCtx
+ * @throws AxisFault
+ */
+ protected void setupOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: WSRMParentProcessor::setupOutMessage");
+
+ msgContext = rmMsgCtx.getMessageContext();
+ configurationContext = msgContext.getConfigurationContext();
+ Options options = msgContext.getOptions();
+
+ storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
+ configurationContext.getAxisConfiguration());
+
+ internalSequenceID =
+ (String)rmMsgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
+
+ toAddress = rmMsgCtx.getTo().getAddress();
+ sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+
+ if(internalSequenceID==null)
+ {
+ internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
+ }
+
+ // Does the sequence exist ?
+ sequenceExists = false;
+ outSequenceID = null;
+
+ // Get the RMSBean with the matching internal sequenceid
+ rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceID);
+
+ if (rmsBean == null)
+ {
+ if (log.isDebugEnabled())
+ log.debug("Exit: WSRMParentProcessor::setupOutMessage Sequence doesn't exist");
+
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));
+ }
+
+ if (rmsBean.getSequenceID() != null)
+ {
+ sequenceExists = true;
+ outSequenceID = rmsBean.getSequenceID();
+ }
+ else
+ outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;
+
+ rmVersion = rmsBean.getRMVersion();
+ if (rmVersion == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: WSRMParentProcessor::setupOutMessage");
+ }
+
+
+ protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: WSRMParentProcessor::sendOutgoingMessage " + msgType + ", " + delay);
+
+ rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
+ getMsgContext().setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+ rmMsgCtx.setTo(new EndpointReference(toAddress));
+
+ String transportTo = rmsBean.getTransportTo();
+ if (transportTo != null) {
+ rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
+ }
+
+ //setting msg context properties
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
+
+ rmMsgCtx.addSOAPEnvelope();
+
+ // Ensure the outbound message us secured using the correct token
+ RMMsgCreator.secureOutboundMessage(getRMSBean(), msgContext);
+
+ String key = SandeshaUtil.getUUID();
+
+ SenderBean senderBean = new SenderBean();
+ senderBean.setMessageType(msgType);
+ senderBean.setMessageContextRefKey(key);
+ senderBean.setTimeToSend(System.currentTimeMillis() + delay);
+ senderBean.setMessageID(msgContext.getMessageID());
+
+ // Set the internal sequence id and outgoing sequence id for the terminate message
+ senderBean.setInternalSequenceID(internalSequenceID);
+ if (sequenceExists)
+ {
+ senderBean.setSend(true);
+ senderBean.setSequenceID(outSequenceID);
+ }
+ else
+ senderBean.setSend(false);
+
+ EndpointReference to = msgContext.getTo();
+ if (to!=null)
+ senderBean.setToAddress(to.getAddress());
+
+ // If this message is targetted at an anonymous address then we must not have a transport
+ // ready for it, as the current message is not a reply.
+ if(to == null || to.hasAnonymousAddress())
+ senderBean.setTransportAvailable(false);
+
+ msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+ senderBean.setReSend(false);
+
+ SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
+
+ SandeshaUtil.executeAndStore(rmMsgCtx, key);
+
+ retramsmitterMgr.insert(senderBean);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: WSRMParentProcessor::sendOutgoingMessage");
+
+ }
+
+
+ public final StorageManager getStorageManager() {
+ return storageManager;
+ }
+
+ public final String getInternalSequenceID() {
+ return internalSequenceID;
+ }
+
+ public final MessageContext getMsgContext() {
+ return msgContext;
+ }
+
+ public final String getOutSequenceID() {
+ return outSequenceID;
+ }
+
+ public final boolean isSequenceExists() {
+ return sequenceExists;
+ }
+
+ public final String getSequenceKey() {
+ return sequenceKey;
+ }
+
+ public final String getToAddress() {
+ return toAddress;
+ }
+
+ public final ConfigurationContext getConfigurationContext() {
+ return configurationContext;
+ }
+
+ public final String getRMVersion() {
+ return rmVersion;
+ }
+
+ public final RMSBean getRMSBean() {
+ return rmsBean;
+ }
+
+}
Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java?view=diff&rev=564063&r1=564062&r2=564063
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java Wed Aug 8 17:57:51 2007
@@ -1,316 +1,316 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-package org.apache.sandesha2.workers;
-
-import java.security.AccessController;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.util.threadpool.ThreadFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.util.SandeshaUtil;
-
-/**
- * Aggregates pause and stop logic between sender and invoker threads.
- */
-public abstract class SandeshaThread extends Thread{
-
- private static final Log log = LogFactory.getLog(SandeshaThread.class);
-
- private boolean runThread = false;
- private boolean hasStoppedRunning = false;
- private boolean hasPausedRunning = false;
- private boolean pauseRequired = false;
-
- private int sleepTime;
- private WorkerLock lock = null;
-
- private ArrayList workingSequences = new ArrayList();
-
- protected transient ThreadFactory threadPool;
- protected ConfigurationContext context = null;
- protected StorageManager storageManager = null;
- private boolean reRunThread;
-
- public SandeshaThread(int sleepTime) {
- this.sleepTime = sleepTime;
- lock = new WorkerLock ();
- }
-
- public final WorkerLock getWorkerLock() {
- return lock;
- }
-
- public synchronized void stopThreadForSequence(String sequenceID, boolean rmSource){
- if (log.isDebugEnabled())
- log.debug("Enter: SandeshaThread::stopThreadForSequence, " + sequenceID);
-
- // We do not actually stop the thread here, as the workers are smart enough
- // to sleep when there is no work to do. If we were to exit the thread then
- // we wouldn't be able to start back up when the thread gets some more work
- // to do.
- workingSequences.remove(new SequenceEntry(sequenceID, rmSource));
-
- if (log.isDebugEnabled())
- log.debug("Exit: SandeshaThread::stopThreadForSequence");
- }
-
- /**
- * Waits for the invoking thread to pause
- */
- public synchronized void blockForPause(){
- while(pauseRequired){
- //someone else is requesting a pause - wait for them to finish
- try{
- wait(sleepTime);
- }catch(InterruptedException e){
- //ignore
- }
- }
-
- //we can now request a pause - the next pause will be ours
- pauseRequired = true;
-
- if(hasStoppedRunning() || !isThreadStarted()){
- throw new IllegalStateException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotPauseThread));
- }
- while(!hasPausedRunning){
- //wait for our pause to come around
- try{
- wait(sleepTime);
- }catch(InterruptedException e){
- //ignore
- }
-
- }
- //the sandesha thread is now paused
- }
-
- public synchronized void finishPause(){
- //indicate that the current pause is no longer required.
- pauseRequired = false;
- notifyAll();
- }
-
- public synchronized void stopRunning() {
- if (log.isDebugEnabled())
- log.debug("Enter: SandeshaThread::stopRunning, " + this);
-
- //NOTE: we do not take acount of pausing when stopping.
- //The call to stop will wait until the invoker has exited the loop
- if (isThreadStarted()) {
- // the invoker is started so stop it
- runThread = false;
- // wait for it to finish
- while (!hasStoppedRunning()) {
- try {
- wait(sleepTime);
- } catch (InterruptedException e1) {
- //ignore
- }
- }
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: SandeshaThread::stopRunning, " + this);
- }
-
- public synchronized boolean isThreadStarted() {
-
- if (!runThread && log.isDebugEnabled())
- log.debug("SandeshaThread not started");
-
- return runThread;
- }
-
-
- /**
- * Ensure that the worker thread is aware of the given sequence. As source sequences
- * do not have a proper sequence id at the time they are bootstrapped, the caller
- * must pass in the internal sequence id when rmSource is true.
- */
- public synchronized void runThreadForSequence(ConfigurationContext context, String sequenceID, boolean rmSource){
- if(log.isDebugEnabled())
- log.debug("Entry: SandeshaThread::runThreadForSequence, " + this + ", " + sequenceID + ", " + rmSource);
-
- SequenceEntry entry = new SequenceEntry(sequenceID, rmSource);
- if (!workingSequences.contains(entry)) workingSequences.add(entry);
-
- if (!isThreadStarted()) {
- if(log.isDebugEnabled()) log.debug("Starting thread");
-
- this.context = context;
-
- // Get the axis2 thread pool
- threadPool = context.getThreadPool();
-
- runThread = true; // so that isStarted()=true.
-
- super.start();
-
- // Set the SandeshaThread to have the same context classloader as the application
- try{
- AccessController.doPrivileged(new PrivilegedExceptionAction() {
- public Object run() throws Exception {
- SandeshaThread.this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
- return null;
- }
- });
- }
- catch(Exception e){
- log.error(e);
- throw new RuntimeException(e);
- }
- } else {
- if(log.isDebugEnabled()) log.debug("Waking thread");
- wakeThread();
- }
-
- if(log.isDebugEnabled()) log.debug("Exit: SandeshaThread::runThreadForSequence");
- }
-
- /**
- *
- * @return a List of SequenceEntry instances
- */
- public synchronized ArrayList getSequences() {
- // Need to copy the list for thread safety
- ArrayList result = new ArrayList();
- result.addAll(workingSequences);
- return result;
- }
-
- protected synchronized boolean hasStoppedRunning() {
- return hasStoppedRunning;
- }
-
- protected synchronized void doPauseIfNeeded(){
- //see if we need to pause
-
- while(pauseRequired){
- if(!hasPausedRunning){
- //let the requester of this pause know we are now pausing
- hasPausedRunning = true;
- notifyAll();
- }
- //now we pause
- try{
- wait(sleepTime);
- }catch(InterruptedException e){
- //ignore
- }
- }//end while
- //the request to pause has finished so we are no longer pausing
- hasPausedRunning = false;
- }
-
- /**
- * Wake the current thread as there is work to be done.
- * Also flag that if we miss a notify, then there is
- * work to be done. Implementing threads should check this value
- * before waiting
- *
- */
- public synchronized void wakeThread() {
- reRunThread = true;
-
- if (!hasPausedRunning)
- notify();
- }
-
- /**
- * Indicate that the main loop has been run
- */
- public synchronized void setRanMainLoop() {
- reRunThread = false;
- }
-
- /**
- * Test to check if a notify has been called when not waiting
- *
- * @return
- */
- protected synchronized boolean runMainLoop () {
- return reRunThread;
- }
-
- /**
- * The main work loop, to be implemented by any child class. If the child wants
- * to sleep before the next loop then they should return true.
- */
- protected abstract boolean internalRun();
-
- public void run() {
- try {
- boolean sleep = false;
-
- while (isThreadStarted()) {
- try {
- synchronized (this) {
- if(sleep && !runMainLoop()) wait(sleepTime);
- // Indicate that we are running the main loop
- setRanMainLoop();
- }
- } catch (InterruptedException e1) {
- log.debug("SandeshaThread was interupted...");
- log.debug(e1.getMessage());
- log.debug("End printing Interrupt...");
- }
-
- //pause if we have to
- doPauseIfNeeded();
-
- // Ensure we have context and a storage manager
- if (context == null) {
- String message = SandeshaMessageHelper
- .getMessage(SandeshaMessageKeys.configContextNotSet);
- message = SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.cannotCointinueSender, message);
- log.debug(message);
- throw new RuntimeException(message);
- }
-
- if(storageManager == null) {
- try {
- storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
- } catch (SandeshaException e2) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotCointinueSender, e2.toString());
- log.debug(message);
- throw new RuntimeException(message);
- }
- }
-
- // Call into the real function
- sleep = internalRun();
- }
- } finally {
- // flag that we have exited the run loop and notify any waiting
- // threads
- synchronized (this) {
- if(log.isDebugEnabled()) log.debug("SandeshaThread really stopping " + this);
- hasStoppedRunning = true;
- notify();
- }
- }
- }
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.sandesha2.workers;
+
+import java.security.AccessController;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.util.threadpool.ThreadFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * Aggregates pause and stop logic between sender and invoker threads.
+ */
+public abstract class SandeshaThread extends Thread{
+
+ private static final Log log = LogFactory.getLog(SandeshaThread.class);
+
+ private boolean runThread = false;
+ private boolean hasStoppedRunning = false;
+ private boolean hasPausedRunning = false;
+ private boolean pauseRequired = false;
+
+ private int sleepTime;
+ private WorkerLock lock = null;
+
+ private ArrayList workingSequences = new ArrayList();
+
+ protected transient ThreadFactory threadPool;
+ protected ConfigurationContext context = null;
+ protected StorageManager storageManager = null;
+ private boolean reRunThread;
+
+ public SandeshaThread(int sleepTime) {
+ this.sleepTime = sleepTime;
+ lock = new WorkerLock ();
+ }
+
+ public final WorkerLock getWorkerLock() {
+ return lock;
+ }
+
+ public synchronized void stopThreadForSequence(String sequenceID, boolean rmSource){
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaThread::stopThreadForSequence, " + sequenceID);
+
+ // We do not actually stop the thread here, as the workers are smart enough
+ // to sleep when there is no work to do. If we were to exit the thread then
+ // we wouldn't be able to start back up when the thread gets some more work
+ // to do.
+ workingSequences.remove(new SequenceEntry(sequenceID, rmSource));
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaThread::stopThreadForSequence");
+ }
+
+ /**
+ * Waits for the invoking thread to pause
+ */
+ public synchronized void blockForPause(){
+ while(pauseRequired){
+ //someone else is requesting a pause - wait for them to finish
+ try{
+ wait(sleepTime);
+ }catch(InterruptedException e){
+ //ignore
+ }
+ }
+
+ //we can now request a pause - the next pause will be ours
+ pauseRequired = true;
+
+ if(hasStoppedRunning() || !isThreadStarted()){
+ throw new IllegalStateException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotPauseThread));
+ }
+ while(!hasPausedRunning){
+ //wait for our pause to come around
+ try{
+ wait(sleepTime);
+ }catch(InterruptedException e){
+ //ignore
+ }
+
+ }
+ //the sandesha thread is now paused
+ }
+
+ public synchronized void finishPause(){
+ //indicate that the current pause is no longer required.
+ pauseRequired = false;
+ notifyAll();
+ }
+
+ public synchronized void stopRunning() {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaThread::stopRunning, " + this);
+
+ //NOTE: we do not take acount of pausing when stopping.
+ //The call to stop will wait until the invoker has exited the loop
+ if (isThreadStarted()) {
+ // the invoker is started so stop it
+ runThread = false;
+ // wait for it to finish
+ while (!hasStoppedRunning()) {
+ try {
+ wait(sleepTime);
+ } catch (InterruptedException e1) {
+ //ignore
+ }
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaThread::stopRunning, " + this);
+ }
+
+ public synchronized boolean isThreadStarted() {
+
+ if (!runThread && log.isDebugEnabled())
+ log.debug("SandeshaThread not started");
+
+ return runThread;
+ }
+
+
+ /**
+ * Ensure that the worker thread is aware of the given sequence. As source sequences
+ * do not have a proper sequence id at the time they are bootstrapped, the caller
+ * must pass in the internal sequence id when rmSource is true.
+ */
+ public synchronized void runThreadForSequence(ConfigurationContext context, String sequenceID, boolean rmSource){
+ if(log.isDebugEnabled())
+ log.debug("Entry: SandeshaThread::runThreadForSequence, " + this + ", " + sequenceID + ", " + rmSource);
+
+ SequenceEntry entry = new SequenceEntry(sequenceID, rmSource);
+ if (!workingSequences.contains(entry)) workingSequences.add(entry);
+
+ if (!isThreadStarted()) {
+ if(log.isDebugEnabled()) log.debug("Starting thread");
+
+ this.context = context;
+
+ // Get the axis2 thread pool
+ threadPool = context.getThreadPool();
+
+ runThread = true; // so that isStarted()=true.
+
+ super.start();
+
+ // Set the SandeshaThread to have the same context classloader as the application
+ try{
+ AccessController.doPrivileged(new PrivilegedExceptionAction() {
+ public Object run() throws Exception {
+ SandeshaThread.this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
+ return null;
+ }
+ });
+ }
+ catch(Exception e){
+ log.error(e);
+ throw new RuntimeException(e);
+ }
+ } else {
+ if(log.isDebugEnabled()) log.debug("Waking thread");
+ wakeThread();
+ }
+
+ if(log.isDebugEnabled()) log.debug("Exit: SandeshaThread::runThreadForSequence");
+ }
+
+ /**
+ *
+ * @return a List of SequenceEntry instances
+ */
+ public synchronized ArrayList getSequences() {
+ // Need to copy the list for thread safety
+ ArrayList result = new ArrayList();
+ result.addAll(workingSequences);
+ return result;
+ }
+
+ protected synchronized boolean hasStoppedRunning() {
+ return hasStoppedRunning;
+ }
+
+ protected synchronized void doPauseIfNeeded(){
+ //see if we need to pause
+
+ while(pauseRequired){
+ if(!hasPausedRunning){
+ //let the requester of this pause know we are now pausing
+ hasPausedRunning = true;
+ notifyAll();
+ }
+ //now we pause
+ try{
+ wait(sleepTime);
+ }catch(InterruptedException e){
+ //ignore
+ }
+ }//end while
+ //the request to pause has finished so we are no longer pausing
+ hasPausedRunning = false;
+ }
+
+ /**
+ * Wake the current thread as there is work to be done.
+ * Also flag that if we miss a notify, then there is
+ * work to be done. Implementing threads should check this value
+ * before waiting
+ *
+ */
+ public synchronized void wakeThread() {
+ reRunThread = true;
+
+ if (!hasPausedRunning)
+ notify();
+ }
+
+ /**
+ * Indicate that the main loop has been run
+ */
+ public synchronized void setRanMainLoop() {
+ reRunThread = false;
+ }
+
+ /**
+ * Test to check if a notify has been called when not waiting
+ *
+ * @return
+ */
+ protected synchronized boolean runMainLoop () {
+ return reRunThread;
+ }
+
+ /**
+ * The main work loop, to be implemented by any child class. If the child wants
+ * to sleep before the next loop then they should return true.
+ */
+ protected abstract boolean internalRun();
+
+ public void run() {
+ try {
+ boolean sleep = false;
+
+ while (isThreadStarted()) {
+ try {
+ synchronized (this) {
+ if(sleep && !runMainLoop()) wait(sleepTime);
+ // Indicate that we are running the main loop
+ setRanMainLoop();
+ }
+ } catch (InterruptedException e1) {
+ log.debug("SandeshaThread was interupted...");
+ log.debug(e1.getMessage());
+ log.debug("End printing Interrupt...");
+ }
+
+ //pause if we have to
+ doPauseIfNeeded();
+
+ // Ensure we have context and a storage manager
+ if (context == null) {
+ String message = SandeshaMessageHelper
+ .getMessage(SandeshaMessageKeys.configContextNotSet);
+ message = SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.cannotCointinueSender, message);
+ log.debug(message);
+ throw new RuntimeException(message);
+ }
+
+ if(storageManager == null) {
+ try {
+ storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+ } catch (SandeshaException e2) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotCointinueSender, e2.toString());
+ log.debug(message);
+ throw new RuntimeException(message);
+ }
+ }
+
+ // Call into the real function
+ sleep = internalRun();
+ }
+ } finally {
+ // flag that we have exited the run loop and notify any waiting
+ // threads
+ synchronized (this) {
+ if(log.isDebugEnabled()) log.debug("SandeshaThread really stopping " + this);
+ hasStoppedRunning = true;
+ notify();
+ }
+ }
+ }
+}
Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaWorker.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org