You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ga...@apache.org on 2006/12/05 16:43:26 UTC

svn commit: r482691 - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2: msgprocessors/AckRequestedProcessor.java msgprocessors/CloseSequenceProcessor.java msgprocessors/TerminateSeqMsgProcessor.java util/WSRMMessageSender.java

Author: gatfora
Date: Tue Dec  5 07:43:24 2006
New Revision: 482691

URL: http://svn.apache.org/viewvc?view=rev&rev=482691
Log:
ack request, close and terminate processing refactor patch for SANDESHA2-58

Added:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=482691&r1=482690&r2=482691
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Tue Dec  5 07:43:24 2006
@@ -30,7 +30,6 @@
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
@@ -41,7 +40,6 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.policy.SandeshaPolicyBean;
@@ -50,7 +48,6 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.FaultManager;
@@ -59,13 +56,14 @@
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.WSRMMessageSender;
 import org.apache.sandesha2.wsrm.AckRequested;
 
 /**
  * Responsible for processing ack requested headers on incoming messages.
  */
 
-public class AckRequestedProcessor {
+public class AckRequestedProcessor extends WSRMMessageSender {
 
 	private static final Log log = LogFactory.getLog(AckRequestedProcessor.class);
 
@@ -274,11 +272,8 @@
 
 			if (it.hasNext()) {
 				SenderBean oldAckBean = (SenderBean) it.next();
-				timeToSend = oldAckBean.getTimeToSend(); // If there is an
-															// old ack. This ack
-															// will be sent in
-															// the old
-															// timeToSend.
+				// If there is an old ack. This ack will be sent in the old timeToSend.
+				timeToSend = oldAckBean.getTimeToSend(); 
 				retransmitterBeanMgr.delete(oldAckBean.getMessageID());
 			}
 
@@ -315,58 +310,18 @@
 		if (log.isDebugEnabled())
 			log.debug("Enter: AckRequestedProcessor::processOutgoingAckRequestMessage");
 
-		MessageContext msgContext = ackRequestRMMsg.getMessageContext();
-		ConfigurationContext configurationContext = msgContext.getConfigurationContext();
-		Options options = msgContext.getOptions();
-
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
-				configurationContext.getAxisConfiguration());
-
-		String toAddress = ackRequestRMMsg.getTo().getAddress();
-		String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-		String internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
-
-		// Does the sequence exist ?
-		boolean sequenceExists = false;
-		String outSequenceID = null;
-		
-		// Get the Create sequence bean with the matching internal sequenceid 
-		CreateSeqBean createSeqFindBean = new CreateSeqBean();
-		createSeqFindBean.setInternalSequenceID(internalSequenceID);
-
-		CreateSeqBean createSeqBean = storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
-		
-		if (createSeqBean == null)
-		{
-			if (log.isDebugEnabled())
-				log.debug("Exit: AckRequestedProcessor::processOutMessage Sequence doesn't exist");
-			
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.couldNotSendCloseSeqNotFound, internalSequenceID));			
-		}
-		
-		if (createSeqBean.getSequenceID() != null)
-		{
-			sequenceExists = true;		
-			outSequenceID = createSeqBean.getSequenceID();
-		}
-		else
-			outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;			
-
-		String rmVersion = SandeshaUtil.getRMVersion(internalSequenceID, storageManager);
-		if (rmVersion == null)
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+		setupOutMessage(ackRequestRMMsg);
 
 		AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
 				Sandesha2Constants.MessageTypes.ACK,
-				rmVersion,
-				msgContext.getAxisService());
-		msgContext.setAxisOperation(ackOperation);
+				getRMVersion(),
+				getMsgContext().getAxisService());
+		getMsgContext().setAxisOperation(ackOperation);
 
 		OperationContext opcontext = new OperationContext(ackOperation);
-		opcontext.setParent(msgContext.getServiceContext());
-		configurationContext.registerOperationContext(ackRequestRMMsg.getMessageId(), opcontext);
-		msgContext.setOperationContext(opcontext);
+		opcontext.setParent(getMsgContext().getServiceContext());
+		getConfigurationContext().registerOperationContext(ackRequestRMMsg.getMessageId(), opcontext);
+		getMsgContext().setOperationContext(opcontext);
 		
 		Iterator iterator = ackRequestRMMsg.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST);
 		
@@ -383,70 +338,13 @@
 			throw new SandeshaException (SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noAckRequestPartFound));
 		}
 		
-		ackRequested.getIdentifier().setIndentifer(outSequenceID);
-		
-		msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-		ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction (rmVersion));
-		ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction (rmVersion));
-
-		String transportTo = SandeshaUtil.getSequenceProperty(internalSequenceID,
-				Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
-		if (transportTo != null) {
-			ackRequestRMMsg.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
-		}
-		
-		//setting msg context properties
-		ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
-		ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
-		ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY , sequenceKey);
+		ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction (getRMVersion()));
+		ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction (getRMVersion()));
 
-		ackRequestRMMsg.addSOAPEnvelope();
+		sendOutgoingMessage(ackRequestRMMsg, Sandesha2Constants.MessageTypes.ACK_REQUEST, 0);
 		
-		// Ensure the outbound message us secured using the correct token
-		String tokenData = SandeshaUtil.getSequenceProperty(internalSequenceID,
-				Sandesha2Constants.SequenceProperties.SECURITY_TOKEN,
-				storageManager);
-		if(tokenData != null) {
-			SecurityManager secMgr = SandeshaUtil.getSecurityManager(configurationContext);
-			SecurityToken token = secMgr.recoverSecurityToken(tokenData);
-			secMgr.applySecurityToken(token, msgContext);
-		}
-
-		String key = SandeshaUtil.getUUID();
-
-		SenderBean ackRequestBean = new SenderBean();
-		ackRequestBean.setMessageType(Sandesha2Constants.MessageTypes.ACK_REQUEST);
-		ackRequestBean.setMessageContextRefKey(key);
-
-		ackRequestBean.setTimeToSend(System.currentTimeMillis());
-
-		ackRequestBean.setMessageID(msgContext.getMessageID());
-		
-		EndpointReference to = msgContext.getTo();
-		if (to!=null)
-			ackRequestBean.setToAddress(to.getAddress());
-		
-		// this will be set to true at the sender.
-		if (sequenceExists)
-		  ackRequestBean.setSend(true);
-		else
-			ackRequestBean.setSend(false);
-
-		msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
-		ackRequestBean.setReSend(false);
-
-		SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
-
-		// Set the sequence id and internal sequence id in the SenderBean
-		ackRequestBean.setInternalSequenceID(internalSequenceID);
-		if (sequenceExists)
-			ackRequestBean.setSequenceID(outSequenceID);
-		
-		SandeshaUtil.executeAndStore(ackRequestRMMsg, key);
-
-		retramsmitterMgr.insert(ackRequestBean);
+		// Pause the message context
+		ackRequestRMMsg.pause();
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: AckRequestedProcessor::processOutgoingAckRequestMessage " + Boolean.TRUE);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=482691&r1=482690&r2=482691
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Tue Dec  5 07:43:24 2006
@@ -23,9 +23,6 @@
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPFactory;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
@@ -37,16 +34,12 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
@@ -54,6 +47,7 @@
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.WSRMMessageSender;
 import org.apache.sandesha2.wsrm.CloseSequence;
 import org.apache.sandesha2.wsrm.Identifier;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
@@ -63,7 +57,7 @@
  * by the WSRM 1.1 specification)
  */
 
-public class CloseSequenceProcessor implements MsgProcessor {
+public class CloseSequenceProcessor extends WSRMMessageSender implements MsgProcessor {
 
 	private static final Log log = LogFactory.getLog(CloseSequenceProcessor.class);
 
@@ -174,61 +168,26 @@
 		if (log.isDebugEnabled()) 
 			log.debug("Enter: CloseSequenceProcessor::processOutMessage");
 		
-		MessageContext msgContext = rmMsgCtx.getMessageContext();
-		ConfigurationContext configurationContext = msgContext.getConfigurationContext();
-		Options options = msgContext.getOptions();
-
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
-				configurationContext.getAxisConfiguration());
-
-		String toAddress = rmMsgCtx.getTo().getAddress();
-		String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-		String internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
-
-		// Does the sequence exist ?
-		boolean sequenceExists = false;
-		String outSequenceID = null;
-		
-		// Get the Create sequence bean with the matching internal sequenceid 
-		CreateSeqBean createSeqFindBean = new CreateSeqBean();
-		createSeqFindBean.setInternalSequenceID(internalSequenceID);
-
-		CreateSeqBean createSeqBean = storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
-		
-		if (createSeqBean == null)
-		{
-			if (log.isDebugEnabled())
-				log.debug("Exit: CloseSequenceProcessor::processOutMessage Sequence doesn't exist");
-			
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.couldNotSendCloseSeqNotFound, internalSequenceID));			
-		}
-		
-		if (createSeqBean.getSequenceID() != null)
-		{
-			sequenceExists = true;		
-			outSequenceID = createSeqBean.getSequenceID();
-		}
-		else
-			outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;			
+		// Get the data from the message context
+		setupOutMessage(rmMsgCtx);
 
 		//write into the sequence proeprties that the client is now closed
 		SequencePropertyBean sequenceClosedBean = new SequencePropertyBean();
-		sequenceClosedBean.setSequencePropertyKey(internalSequenceID);
+		sequenceClosedBean.setSequencePropertyKey(getInternalSequenceID());
 		sequenceClosedBean.setName(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED_CLIENT);
 		sequenceClosedBean.setValue(Sandesha2Constants.VALUE_TRUE);
-		storageManager.getSequencePropertyBeanMgr().insert(sequenceClosedBean);
+		getStorageManager().getSequencePropertyBeanMgr().insert(sequenceClosedBean);
 
 		AxisOperation closeOperation = SpecSpecificConstants.getWSRMOperation(
 				Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE,
 				rmMsgCtx.getRMSpecVersion(),
 				rmMsgCtx.getMessageContext().getAxisService());
-		msgContext.setAxisOperation(closeOperation);
+		getMsgContext().setAxisOperation(closeOperation);
 
 		OperationContext opcontext = new OperationContext(closeOperation);
-		opcontext.setParent(msgContext.getServiceContext());
-		configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),opcontext);
-		msgContext.setOperationContext(opcontext);
+		opcontext.setParent(getMsgContext().getServiceContext());
+		getConfigurationContext().registerOperationContext(rmMsgCtx.getMessageId(),opcontext);
+		getMsgContext().setOperationContext(opcontext);
 		
 		CloseSequence closeSequencePart = (CloseSequence) rmMsgCtx
 				.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
@@ -238,76 +197,14 @@
 			closeSequencePart.setIdentifier(identifier);
 		}
 		
-		identifier.setIndentifer(outSequenceID);
-
-		msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-		String rmVersion = SandeshaUtil.getRMVersion(internalSequenceID, storageManager);
-		if (rmVersion == null)
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
-
-		rmMsgCtx.setWSAAction(SpecSpecificConstants.getCloseSequenceAction(rmVersion));
-		rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction (rmVersion));
-
-		String transportTo = SandeshaUtil.getSequenceProperty(internalSequenceID,
-				Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
-		if (transportTo != null) {
-			rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
-		}
-		
-		//setting msg context properties
-		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
-		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
-		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY , sequenceKey);
-
-		rmMsgCtx.addSOAPEnvelope();
-
-		// Ensure the outbound message us secured using the correct token
-		String tokenData = SandeshaUtil.getSequenceProperty(internalSequenceID,
-				Sandesha2Constants.SequenceProperties.SECURITY_TOKEN,
-				storageManager);
-		if(tokenData != null) {
-			SecurityManager secMgr = SandeshaUtil.getSecurityManager(configurationContext);
-			SecurityToken token = secMgr.recoverSecurityToken(tokenData);
-			secMgr.applySecurityToken(token, msgContext);
-		}
-
-		String key = SandeshaUtil.getUUID();
-
-		SenderBean closeBean = new SenderBean();
-		// Indicate that this is a close sequence message
-		closeBean.setMessageType(Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE);
-		closeBean.setMessageContextRefKey(key);
-
-		closeBean.setTimeToSend(System.currentTimeMillis());
-
-		closeBean.setMessageID(msgContext.getMessageID());
-		
-		EndpointReference to = msgContext.getTo();
-		if (to!=null)
-			closeBean.setToAddress(to.getAddress());
-		
-		// this will be set to true at the sender.
-		if (sequenceExists)
-		  closeBean.setSend(true);
-		else
-			closeBean.setSend(false);
-
-		msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
-		closeBean.setReSend(false);
-
-		SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
-		
-		// Add the sequence id and internal sequenceid to the closeBean
-		if (sequenceExists)
-		  closeBean.setSequenceID(outSequenceID);
-		
-		closeBean.setInternalSequenceID(internalSequenceID);
+		rmMsgCtx.setWSAAction(SpecSpecificConstants.getCloseSequenceAction(getRMVersion()));
+		rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction (getRMVersion()));
 
-		SandeshaUtil.executeAndStore(rmMsgCtx, key);
+		// Send this outgoing message
+		sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE, 0);
 
-		retramsmitterMgr.insert(closeBean);
+		// Pause the message context
+		rmMsgCtx.pause();
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: CloseSeqMsgProcessor::processOutMessage " + Boolean.TRUE);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=482691&r1=482690&r2=482691
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Tue Dec  5 07:43:24 2006
@@ -21,9 +21,7 @@
 
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
@@ -37,16 +35,12 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
@@ -55,6 +49,7 @@
 import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.util.WSRMMessageSender;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
 import org.apache.sandesha2.wsrm.TerminateSequence;
 
@@ -62,7 +57,7 @@
  * Responsible for processing an incoming Terminate Sequence message.
  */
 
-public class TerminateSeqMsgProcessor implements MsgProcessor {
+public class TerminateSeqMsgProcessor extends WSRMMessageSender implements MsgProcessor {
 
 	private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
 
@@ -312,49 +307,12 @@
 		if (log.isDebugEnabled())
 			log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
 
-		MessageContext msgContext = rmMsgCtx.getMessageContext();
-		ConfigurationContext configurationContext = msgContext.getConfigurationContext();
-		Options options = msgContext.getOptions();
-
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
-				configurationContext.getAxisConfiguration());
-
-		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-
-		String toAddress = rmMsgCtx.getTo().getAddress();
-		String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-		String internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
-
-		// Does the sequence exist ?
-		boolean sequenceExists = false;
-		String outSequenceID = null;
-		
-		// Get the Create sequence bean with the matching internal sequenceid 
-		CreateSeqBean createSeqFindBean = new CreateSeqBean();
-		createSeqFindBean.setInternalSequenceID(internalSequenceID);
-
-		CreateSeqBean createSeqBean = storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
-		
-		if (createSeqBean == null)
-		{
-			if (log.isDebugEnabled())
-				log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage Sequence doesn't exist");
-			
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));			
-		}
-		
-		if (createSeqBean.getSequenceID() != null)
-		{
-			sequenceExists = true;		
-			outSequenceID = createSeqBean.getSequenceID();
-		}
-		else
-			outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;			
+		// Get the parent processor to setup the out message
+		setupOutMessage(rmMsgCtx);
 		
 		// Check if the sequence is already terminated (stored on the internal sequenceid)
-		String terminated = SandeshaUtil.getSequenceProperty(internalSequenceID,
-				Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, storageManager);
+		String terminated = SandeshaUtil.getSequenceProperty(getInternalSequenceID(),
+				Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, getStorageManager());
 
 		if (terminated != null && "true".equals(terminated)) {
 			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
@@ -367,94 +325,36 @@
 		AxisOperation terminateOp = SpecSpecificConstants.getWSRMOperation(
 				Sandesha2Constants.MessageTypes.TERMINATE_SEQ,
 				rmMsgCtx.getRMSpecVersion(),
-				msgContext.getAxisService());
+				getMsgContext().getAxisService());
 		OperationContext opcontext = OperationContextFactory
 				.createOperationContext(
 						WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, terminateOp);
-		opcontext.setParent(msgContext.getServiceContext());
-		configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),	opcontext);
+		opcontext.setParent(getMsgContext().getServiceContext());
+		getConfigurationContext().registerOperationContext(rmMsgCtx.getMessageId(),	opcontext);
 
-		msgContext.setOperationContext(opcontext);
-		msgContext.setAxisOperation(terminateOp);
+		getMsgContext().setOperationContext(opcontext);
+		getMsgContext().setAxisOperation(terminateOp);
 
 		TerminateSequence terminateSequencePart = (TerminateSequence) rmMsgCtx
 				.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
-		terminateSequencePart.getIdentifier().setIndentifer(outSequenceID);
-
-		rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
-		msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-		rmMsgCtx.setTo(new EndpointReference(toAddress));
-
-		String rmVersion = SandeshaUtil.getRMVersion(internalSequenceID, storageManager);
-		if (rmVersion == null)
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
-
-		rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
-		rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
-
-		String transportTo = SandeshaUtil.getSequenceProperty(internalSequenceID,
-				Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
-		if (transportTo != null) {
-			rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
-		}		
-		
-		//setting msg context properties
-		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
-		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
-		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY , sequenceKey);
-
-		try {
-			rmMsgCtx.addSOAPEnvelope();
-		} catch (AxisFault e) {
-			throw new SandeshaException(e.getMessage(),e);
-		}
+		terminateSequencePart.getIdentifier().setIndentifer(getOutSequenceID());
 
-		String key = SandeshaUtil.getUUID();
+		rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(getRMVersion()));
+		rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(getRMVersion()));
+		
+		SequencePropertyBean terminateAdded = new SequencePropertyBean();
+		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+		terminateAdded.setSequencePropertyKey(getInternalSequenceID());
+		terminateAdded.setValue("true");
 
-		SenderBean terminateBean = new SenderBean();
-		terminateBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
-		terminateBean.setMessageContextRefKey(key);
+		getStorageManager().getSequencePropertyBeanMgr().insert(terminateAdded);
 
+		// Send the outgoing message
 		// Set a retransmitter lastSentTime so that terminate will be send with
 		// some delay.
 		// Otherwise this get send before return of the current request (ack).
 		// TODO: refine the terminate delay.
-		terminateBean.setTimeToSend(System.currentTimeMillis() + Sandesha2Constants.TERMINATE_DELAY);
-
-		terminateBean.setMessageID(msgContext.getMessageID());
-		
-		// Set the internal sequence id and outgoing sequence id for the terminate message
-		terminateBean.setInternalSequenceID(internalSequenceID);
-		if (sequenceExists)
-		  terminateBean.setSequenceID(outSequenceID);
-		
-		EndpointReference to = msgContext.getTo();
-		if (to!=null)
-			terminateBean.setToAddress(to.getAddress());
-		
-		// this will be set to true at the sender.
-		if (sequenceExists)
-			terminateBean.setSend(true);
-		else
-			terminateBean.setSend(false);
-
-		msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
-		terminateBean.setReSend(false);
-
-		SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
-
-		SequencePropertyBean terminateAdded = new SequencePropertyBean();
-		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
-		terminateAdded.setSequencePropertyKey(internalSequenceID);
-		terminateAdded.setValue("true");
-
-		seqPropMgr.insert(terminateAdded);
-		
-		SandeshaUtil.executeAndStore(rmMsgCtx, key);
-	
-		retramsmitterMgr.insert(terminateBean);
+		sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.TERMINATE_SEQ, Sandesha2Constants.TERMINATE_DELAY);		
 
 		// Pause the message context
 		rmMsgCtx.pause();
@@ -463,5 +363,4 @@
 			log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage " + Boolean.TRUE);
 		return true;
 	}
-
 }

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java?view=auto&rev=482691
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java Tue Dec  5 07:43:24 2006
@@ -0,0 +1,217 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+public class WSRMMessageSender  {
+
+	private static final Log log = LogFactory.getLog(WSRMMessageSender.class);
+	
+	private MessageContext msgContext;
+	private StorageManager storageManager;
+	private ConfigurationContext configurationContext;
+	private String toAddress;
+	private String sequenceKey;
+	private String internalSequenceID;
+	private boolean sequenceExists;
+	private String outSequenceID;
+	private String rmVersion;
+	
+	/**
+	 * Extracts information from the rmMsgCtx specific for processing out messages
+	 * 
+	 * @param rmMsgCtx
+	 * @throws AxisFault
+	 */
+	protected void setupOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: WSRMParentProcessor::setupOutMessage");
+
+		msgContext = rmMsgCtx.getMessageContext();
+		configurationContext = msgContext.getConfigurationContext();
+		Options options = msgContext.getOptions();
+
+		storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
+				configurationContext.getAxisConfiguration());
+
+		toAddress = rmMsgCtx.getTo().getAddress();
+		sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+		internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
+
+		// Does the sequence exist ?
+		sequenceExists = false;
+		outSequenceID = null;
+		
+		// Get the Create sequence bean with the matching internal sequenceid 
+		CreateSeqBean createSeqFindBean = new CreateSeqBean();
+		createSeqFindBean.setInternalSequenceID(internalSequenceID);
+
+		CreateSeqBean createSeqBean = storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
+		
+		if (createSeqBean == null)
+		{
+			if (log.isDebugEnabled())
+				log.debug("Exit: WSRMParentProcessor::setupOutMessage Sequence doesn't exist");
+			
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));			
+		}
+		
+		if (createSeqBean.getSequenceID() != null)
+		{
+			sequenceExists = true;		
+			outSequenceID = createSeqBean.getSequenceID();
+		}
+		else
+			outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;			
+
+		String rmVersion = SandeshaUtil.getRMVersion(getInternalSequenceID(), getStorageManager());
+		if (rmVersion == null)
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: WSRMParentProcessor::setupOutMessage");
+  }
+	
+	
+	protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: WSRMParentProcessor::sendOutgoingMessage " + msgType + ", " + delay);
+		
+		rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
+		getMsgContext().setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		rmMsgCtx.setTo(new EndpointReference(toAddress));
+		
+		String transportTo = SandeshaUtil.getSequenceProperty(internalSequenceID,
+				Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
+		if (transportTo != null) {
+			rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
+		}		
+		
+		//setting msg context properties
+		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
+		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
+		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY , sequenceKey);
+
+		rmMsgCtx.addSOAPEnvelope();
+
+		// Ensure the outbound message us secured using the correct token
+		String tokenData = SandeshaUtil.getSequenceProperty(internalSequenceID,
+				Sandesha2Constants.SequenceProperties.SECURITY_TOKEN,
+				storageManager);
+		if(tokenData != null) {
+			SecurityManager secMgr = SandeshaUtil.getSecurityManager(configurationContext);
+			SecurityToken token = secMgr.recoverSecurityToken(tokenData);
+			secMgr.applySecurityToken(token, msgContext);
+		}
+		
+		String key = SandeshaUtil.getUUID();
+
+		SenderBean senderBean = new SenderBean();
+		senderBean.setMessageType(msgType);
+		senderBean.setMessageContextRefKey(key);
+		senderBean.setTimeToSend(System.currentTimeMillis() + delay);
+		senderBean.setMessageID(msgContext.getMessageID());
+		
+		// Set the internal sequence id and outgoing sequence id for the terminate message
+		senderBean.setInternalSequenceID(internalSequenceID);
+		if (sequenceExists)
+		{
+			senderBean.setSend(true);
+			senderBean.setSequenceID(outSequenceID);
+		}
+		else
+			senderBean.setSend(false);			
+		
+		EndpointReference to = msgContext.getTo();
+		if (to!=null)
+			senderBean.setToAddress(to.getAddress());
+		
+		msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+		senderBean.setReSend(false);
+
+		SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
+		
+		SandeshaUtil.executeAndStore(rmMsgCtx, key);
+	
+		retramsmitterMgr.insert(senderBean);
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: WSRMParentProcessor::sendOutgoingMessage");
+
+	}
+	
+
+	public final StorageManager getStorageManager() {
+  	return storageManager;
+  }
+
+	public final String getInternalSequenceID() {
+  	return internalSequenceID;
+  }
+
+	public final MessageContext getMsgContext() {
+  	return msgContext;
+  }
+
+	public final String getOutSequenceID() {
+  	return outSequenceID;
+  }
+
+	public final boolean isSequenceExists() {
+  	return sequenceExists;
+  }
+
+	public final String getSequenceKey() {
+  	return sequenceKey;
+  }
+
+	public final String getToAddress() {
+  	return toAddress;
+  }
+
+	public final ConfigurationContext getConfigurationContext() {
+  	return configurationContext;
+  }
+
+	public final String getRMVersion() {
+  	return rmVersion;
+  }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org