You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ga...@apache.org on 2006/12/05 16:43:26 UTC
svn commit: r482691 - in
/webservices/sandesha/trunk/java/src/org/apache/sandesha2:
msgprocessors/AckRequestedProcessor.java
msgprocessors/CloseSequenceProcessor.java
msgprocessors/TerminateSeqMsgProcessor.java util/WSRMMessageSender.java
Author: gatfora
Date: Tue Dec 5 07:43:24 2006
New Revision: 482691
URL: http://svn.apache.org/viewvc?view=rev&rev=482691
Log:
ack request, close and terminate processing refactor patch for SANDESHA2-58
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=482691&r1=482690&r2=482691
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Tue Dec 5 07:43:24 2006
@@ -30,7 +30,6 @@
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
@@ -41,7 +40,6 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.policy.SandeshaPolicyBean;
@@ -50,7 +48,6 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.FaultManager;
@@ -59,13 +56,14 @@
import org.apache.sandesha2.util.SOAPAbstractFactory;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.WSRMMessageSender;
import org.apache.sandesha2.wsrm.AckRequested;
/**
* Responsible for processing ack requested headers on incoming messages.
*/
-public class AckRequestedProcessor {
+public class AckRequestedProcessor extends WSRMMessageSender {
private static final Log log = LogFactory.getLog(AckRequestedProcessor.class);
@@ -274,11 +272,8 @@
if (it.hasNext()) {
SenderBean oldAckBean = (SenderBean) it.next();
- timeToSend = oldAckBean.getTimeToSend(); // If there is an
- // old ack. This ack
- // will be sent in
- // the old
- // timeToSend.
+ // If there is an old ack, this ack will be sent at the old ack's timeToSend.
+ timeToSend = oldAckBean.getTimeToSend();
retransmitterBeanMgr.delete(oldAckBean.getMessageID());
}
@@ -315,58 +310,18 @@
if (log.isDebugEnabled())
log.debug("Enter: AckRequestedProcessor::processOutgoingAckRequestMessage");
- MessageContext msgContext = ackRequestRMMsg.getMessageContext();
- ConfigurationContext configurationContext = msgContext.getConfigurationContext();
- Options options = msgContext.getOptions();
-
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
- configurationContext.getAxisConfiguration());
-
- String toAddress = ackRequestRMMsg.getTo().getAddress();
- String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- String internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
-
- // Does the sequence exist ?
- boolean sequenceExists = false;
- String outSequenceID = null;
-
- // Get the Create sequence bean with the matching internal sequenceid
- CreateSeqBean createSeqFindBean = new CreateSeqBean();
- createSeqFindBean.setInternalSequenceID(internalSequenceID);
-
- CreateSeqBean createSeqBean = storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
-
- if (createSeqBean == null)
- {
- if (log.isDebugEnabled())
- log.debug("Exit: AckRequestedProcessor::processOutMessage Sequence doesn't exist");
-
- throw new SandeshaException(SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.couldNotSendCloseSeqNotFound, internalSequenceID));
- }
-
- if (createSeqBean.getSequenceID() != null)
- {
- sequenceExists = true;
- outSequenceID = createSeqBean.getSequenceID();
- }
- else
- outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;
-
- String rmVersion = SandeshaUtil.getRMVersion(internalSequenceID, storageManager);
- if (rmVersion == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+ setupOutMessage(ackRequestRMMsg);
AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
Sandesha2Constants.MessageTypes.ACK,
- rmVersion,
- msgContext.getAxisService());
- msgContext.setAxisOperation(ackOperation);
+ getRMVersion(),
+ getMsgContext().getAxisService());
+ getMsgContext().setAxisOperation(ackOperation);
OperationContext opcontext = new OperationContext(ackOperation);
- opcontext.setParent(msgContext.getServiceContext());
- configurationContext.registerOperationContext(ackRequestRMMsg.getMessageId(), opcontext);
- msgContext.setOperationContext(opcontext);
+ opcontext.setParent(getMsgContext().getServiceContext());
+ getConfigurationContext().registerOperationContext(ackRequestRMMsg.getMessageId(), opcontext);
+ getMsgContext().setOperationContext(opcontext);
Iterator iterator = ackRequestRMMsg.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST);
@@ -383,70 +338,13 @@
throw new SandeshaException (SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noAckRequestPartFound));
}
- ackRequested.getIdentifier().setIndentifer(outSequenceID);
-
- msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
- ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction (rmVersion));
- ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction (rmVersion));
-
- String transportTo = SandeshaUtil.getSequenceProperty(internalSequenceID,
- Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
- if (transportTo != null) {
- ackRequestRMMsg.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
- }
-
- //setting msg context properties
- ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
- ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
- ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY , sequenceKey);
+ ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction (getRMVersion()));
+ ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction (getRMVersion()));
- ackRequestRMMsg.addSOAPEnvelope();
+ sendOutgoingMessage(ackRequestRMMsg, Sandesha2Constants.MessageTypes.ACK_REQUEST, 0);
- // Ensure the outbound message us secured using the correct token
- String tokenData = SandeshaUtil.getSequenceProperty(internalSequenceID,
- Sandesha2Constants.SequenceProperties.SECURITY_TOKEN,
- storageManager);
- if(tokenData != null) {
- SecurityManager secMgr = SandeshaUtil.getSecurityManager(configurationContext);
- SecurityToken token = secMgr.recoverSecurityToken(tokenData);
- secMgr.applySecurityToken(token, msgContext);
- }
-
- String key = SandeshaUtil.getUUID();
-
- SenderBean ackRequestBean = new SenderBean();
- ackRequestBean.setMessageType(Sandesha2Constants.MessageTypes.ACK_REQUEST);
- ackRequestBean.setMessageContextRefKey(key);
-
- ackRequestBean.setTimeToSend(System.currentTimeMillis());
-
- ackRequestBean.setMessageID(msgContext.getMessageID());
-
- EndpointReference to = msgContext.getTo();
- if (to!=null)
- ackRequestBean.setToAddress(to.getAddress());
-
- // this will be set to true at the sender.
- if (sequenceExists)
- ackRequestBean.setSend(true);
- else
- ackRequestBean.setSend(false);
-
- msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
- ackRequestBean.setReSend(false);
-
- SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
-
- // Set the sequence id and internal sequence id in the SenderBean
- ackRequestBean.setInternalSequenceID(internalSequenceID);
- if (sequenceExists)
- ackRequestBean.setSequenceID(outSequenceID);
-
- SandeshaUtil.executeAndStore(ackRequestRMMsg, key);
-
- retramsmitterMgr.insert(ackRequestBean);
+ // Pause the message context
+ ackRequestRMMsg.pause();
if (log.isDebugEnabled())
log.debug("Exit: AckRequestedProcessor::processOutgoingAckRequestMessage " + Boolean.TRUE);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=482691&r1=482690&r2=482691
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Tue Dec 5 07:43:24 2006
@@ -23,9 +23,6 @@
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPFactory;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
@@ -37,16 +34,12 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.FaultManager;
@@ -54,6 +47,7 @@
import org.apache.sandesha2.util.SOAPAbstractFactory;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.WSRMMessageSender;
import org.apache.sandesha2.wsrm.CloseSequence;
import org.apache.sandesha2.wsrm.Identifier;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
@@ -63,7 +57,7 @@
* by the WSRM 1.1 specification)
*/
-public class CloseSequenceProcessor implements MsgProcessor {
+public class CloseSequenceProcessor extends WSRMMessageSender implements MsgProcessor {
private static final Log log = LogFactory.getLog(CloseSequenceProcessor.class);
@@ -174,61 +168,26 @@
if (log.isDebugEnabled())
log.debug("Enter: CloseSequenceProcessor::processOutMessage");
- MessageContext msgContext = rmMsgCtx.getMessageContext();
- ConfigurationContext configurationContext = msgContext.getConfigurationContext();
- Options options = msgContext.getOptions();
-
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
- configurationContext.getAxisConfiguration());
-
- String toAddress = rmMsgCtx.getTo().getAddress();
- String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- String internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
-
- // Does the sequence exist ?
- boolean sequenceExists = false;
- String outSequenceID = null;
-
- // Get the Create sequence bean with the matching internal sequenceid
- CreateSeqBean createSeqFindBean = new CreateSeqBean();
- createSeqFindBean.setInternalSequenceID(internalSequenceID);
-
- CreateSeqBean createSeqBean = storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
-
- if (createSeqBean == null)
- {
- if (log.isDebugEnabled())
- log.debug("Exit: CloseSequenceProcessor::processOutMessage Sequence doesn't exist");
-
- throw new SandeshaException(SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.couldNotSendCloseSeqNotFound, internalSequenceID));
- }
-
- if (createSeqBean.getSequenceID() != null)
- {
- sequenceExists = true;
- outSequenceID = createSeqBean.getSequenceID();
- }
- else
- outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;
+ // Get the data from the message context
+ setupOutMessage(rmMsgCtx);
//write into the sequence proeprties that the client is now closed
SequencePropertyBean sequenceClosedBean = new SequencePropertyBean();
- sequenceClosedBean.setSequencePropertyKey(internalSequenceID);
+ sequenceClosedBean.setSequencePropertyKey(getInternalSequenceID());
sequenceClosedBean.setName(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED_CLIENT);
sequenceClosedBean.setValue(Sandesha2Constants.VALUE_TRUE);
- storageManager.getSequencePropertyBeanMgr().insert(sequenceClosedBean);
+ getStorageManager().getSequencePropertyBeanMgr().insert(sequenceClosedBean);
AxisOperation closeOperation = SpecSpecificConstants.getWSRMOperation(
Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE,
rmMsgCtx.getRMSpecVersion(),
rmMsgCtx.getMessageContext().getAxisService());
- msgContext.setAxisOperation(closeOperation);
+ getMsgContext().setAxisOperation(closeOperation);
OperationContext opcontext = new OperationContext(closeOperation);
- opcontext.setParent(msgContext.getServiceContext());
- configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),opcontext);
- msgContext.setOperationContext(opcontext);
+ opcontext.setParent(getMsgContext().getServiceContext());
+ getConfigurationContext().registerOperationContext(rmMsgCtx.getMessageId(),opcontext);
+ getMsgContext().setOperationContext(opcontext);
CloseSequence closeSequencePart = (CloseSequence) rmMsgCtx
.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
@@ -238,76 +197,14 @@
closeSequencePart.setIdentifier(identifier);
}
- identifier.setIndentifer(outSequenceID);
-
- msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
- String rmVersion = SandeshaUtil.getRMVersion(internalSequenceID, storageManager);
- if (rmVersion == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
-
- rmMsgCtx.setWSAAction(SpecSpecificConstants.getCloseSequenceAction(rmVersion));
- rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction (rmVersion));
-
- String transportTo = SandeshaUtil.getSequenceProperty(internalSequenceID,
- Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
- if (transportTo != null) {
- rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
- }
-
- //setting msg context properties
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY , sequenceKey);
-
- rmMsgCtx.addSOAPEnvelope();
-
- // Ensure the outbound message us secured using the correct token
- String tokenData = SandeshaUtil.getSequenceProperty(internalSequenceID,
- Sandesha2Constants.SequenceProperties.SECURITY_TOKEN,
- storageManager);
- if(tokenData != null) {
- SecurityManager secMgr = SandeshaUtil.getSecurityManager(configurationContext);
- SecurityToken token = secMgr.recoverSecurityToken(tokenData);
- secMgr.applySecurityToken(token, msgContext);
- }
-
- String key = SandeshaUtil.getUUID();
-
- SenderBean closeBean = new SenderBean();
- // Indicate that this is a close sequence message
- closeBean.setMessageType(Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE);
- closeBean.setMessageContextRefKey(key);
-
- closeBean.setTimeToSend(System.currentTimeMillis());
-
- closeBean.setMessageID(msgContext.getMessageID());
-
- EndpointReference to = msgContext.getTo();
- if (to!=null)
- closeBean.setToAddress(to.getAddress());
-
- // this will be set to true at the sender.
- if (sequenceExists)
- closeBean.setSend(true);
- else
- closeBean.setSend(false);
-
- msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
- closeBean.setReSend(false);
-
- SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
-
- // Add the sequence id and internal sequenceid to the closeBean
- if (sequenceExists)
- closeBean.setSequenceID(outSequenceID);
-
- closeBean.setInternalSequenceID(internalSequenceID);
+ rmMsgCtx.setWSAAction(SpecSpecificConstants.getCloseSequenceAction(getRMVersion()));
+ rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction (getRMVersion()));
- SandeshaUtil.executeAndStore(rmMsgCtx, key);
+ // Send this outgoing message
+ sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE, 0);
- retramsmitterMgr.insert(closeBean);
+ // Pause the message context
+ rmMsgCtx.pause();
if (log.isDebugEnabled())
log.debug("Exit: CloseSeqMsgProcessor::processOutMessage " + Boolean.TRUE);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=482691&r1=482690&r2=482691
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Tue Dec 5 07:43:24 2006
@@ -21,9 +21,7 @@
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
@@ -37,16 +35,12 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.FaultManager;
@@ -55,6 +49,7 @@
import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.util.WSRMMessageSender;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.sandesha2.wsrm.TerminateSequence;
@@ -62,7 +57,7 @@
* Responsible for processing an incoming Terminate Sequence message.
*/
-public class TerminateSeqMsgProcessor implements MsgProcessor {
+public class TerminateSeqMsgProcessor extends WSRMMessageSender implements MsgProcessor {
private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
@@ -312,49 +307,12 @@
if (log.isDebugEnabled())
log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
- MessageContext msgContext = rmMsgCtx.getMessageContext();
- ConfigurationContext configurationContext = msgContext.getConfigurationContext();
- Options options = msgContext.getOptions();
-
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
- configurationContext.getAxisConfiguration());
-
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-
- String toAddress = rmMsgCtx.getTo().getAddress();
- String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- String internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
-
- // Does the sequence exist ?
- boolean sequenceExists = false;
- String outSequenceID = null;
-
- // Get the Create sequence bean with the matching internal sequenceid
- CreateSeqBean createSeqFindBean = new CreateSeqBean();
- createSeqFindBean.setInternalSequenceID(internalSequenceID);
-
- CreateSeqBean createSeqBean = storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
-
- if (createSeqBean == null)
- {
- if (log.isDebugEnabled())
- log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage Sequence doesn't exist");
-
- throw new SandeshaException(SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));
- }
-
- if (createSeqBean.getSequenceID() != null)
- {
- sequenceExists = true;
- outSequenceID = createSeqBean.getSequenceID();
- }
- else
- outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;
+ // Get the parent class to set up the out message
+ setupOutMessage(rmMsgCtx);
// Check if the sequence is already terminated (stored on the internal sequenceid)
- String terminated = SandeshaUtil.getSequenceProperty(internalSequenceID,
- Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, storageManager);
+ String terminated = SandeshaUtil.getSequenceProperty(getInternalSequenceID(),
+ Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, getStorageManager());
if (terminated != null && "true".equals(terminated)) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
@@ -367,94 +325,36 @@
AxisOperation terminateOp = SpecSpecificConstants.getWSRMOperation(
Sandesha2Constants.MessageTypes.TERMINATE_SEQ,
rmMsgCtx.getRMSpecVersion(),
- msgContext.getAxisService());
+ getMsgContext().getAxisService());
OperationContext opcontext = OperationContextFactory
.createOperationContext(
WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, terminateOp);
- opcontext.setParent(msgContext.getServiceContext());
- configurationContext.registerOperationContext(rmMsgCtx.getMessageId(), opcontext);
+ opcontext.setParent(getMsgContext().getServiceContext());
+ getConfigurationContext().registerOperationContext(rmMsgCtx.getMessageId(), opcontext);
- msgContext.setOperationContext(opcontext);
- msgContext.setAxisOperation(terminateOp);
+ getMsgContext().setOperationContext(opcontext);
+ getMsgContext().setAxisOperation(terminateOp);
TerminateSequence terminateSequencePart = (TerminateSequence) rmMsgCtx
.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
- terminateSequencePart.getIdentifier().setIndentifer(outSequenceID);
-
- rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
- msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
- rmMsgCtx.setTo(new EndpointReference(toAddress));
-
- String rmVersion = SandeshaUtil.getRMVersion(internalSequenceID, storageManager);
- if (rmVersion == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
-
- rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
- rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
-
- String transportTo = SandeshaUtil.getSequenceProperty(internalSequenceID,
- Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
- if (transportTo != null) {
- rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
- }
-
- //setting msg context properties
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
- rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY , sequenceKey);
-
- try {
- rmMsgCtx.addSOAPEnvelope();
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage(),e);
- }
+ terminateSequencePart.getIdentifier().setIndentifer(getOutSequenceID());
- String key = SandeshaUtil.getUUID();
+ rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(getRMVersion()));
+ rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(getRMVersion()));
+
+ SequencePropertyBean terminateAdded = new SequencePropertyBean();
+ terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+ terminateAdded.setSequencePropertyKey(getInternalSequenceID());
+ terminateAdded.setValue("true");
- SenderBean terminateBean = new SenderBean();
- terminateBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
- terminateBean.setMessageContextRefKey(key);
+ getStorageManager().getSequencePropertyBeanMgr().insert(terminateAdded);
+ // Send the outgoing message
// Set a retransmitter lastSentTime so that terminate will be send with
// some delay.
// Otherwise this get send before return of the current request (ack).
// TODO: refine the terminate delay.
- terminateBean.setTimeToSend(System.currentTimeMillis() + Sandesha2Constants.TERMINATE_DELAY);
-
- terminateBean.setMessageID(msgContext.getMessageID());
-
- // Set the internal sequence id and outgoing sequence id for the terminate message
- terminateBean.setInternalSequenceID(internalSequenceID);
- if (sequenceExists)
- terminateBean.setSequenceID(outSequenceID);
-
- EndpointReference to = msgContext.getTo();
- if (to!=null)
- terminateBean.setToAddress(to.getAddress());
-
- // this will be set to true at the sender.
- if (sequenceExists)
- terminateBean.setSend(true);
- else
- terminateBean.setSend(false);
-
- msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
- terminateBean.setReSend(false);
-
- SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
-
- SequencePropertyBean terminateAdded = new SequencePropertyBean();
- terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
- terminateAdded.setSequencePropertyKey(internalSequenceID);
- terminateAdded.setValue("true");
-
- seqPropMgr.insert(terminateAdded);
-
- SandeshaUtil.executeAndStore(rmMsgCtx, key);
-
- retramsmitterMgr.insert(terminateBean);
+ sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.TERMINATE_SEQ, Sandesha2Constants.TERMINATE_DELAY);
// Pause the message context
rmMsgCtx.pause();
@@ -463,5 +363,4 @@
log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage " + Boolean.TRUE);
return true;
}
-
}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java?view=auto&rev=482691
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java Tue Dec 5 07:43:24 2006
@@ -0,0 +1,217 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+/**
+ * Holds the per-message state needed to send an outgoing WS-RM protocol
+ * message and queues that message for sending.
+ *
+ * Usage is two-step: {@link #setupOutMessage(RMMsgContext)} derives the
+ * sequence state from the message context and stores it in the instance
+ * fields; {@link #sendOutgoingMessage(RMMsgContext, int, long)} then stores
+ * the message and inserts a matching {@link SenderBean}. The second method
+ * reads the fields populated by the first, so it must not be called before it.
+ *
+ * State is kept in plain instance fields without synchronisation, so an
+ * instance should be used for one message at a time.
+ *
+ * NOTE(review): the debug log messages use the prefix "WSRMParentProcessor",
+ * which does not match this class's name; the text is left unchanged here.
+ */
+public class WSRMMessageSender {
+
+    private static final Log log = LogFactory.getLog(WSRMMessageSender.class);
+
+    // All of the following are populated by setupOutMessage().
+    private MessageContext msgContext;
+    private StorageManager storageManager;
+    private ConfigurationContext configurationContext;
+    private String toAddress;
+    private String sequenceKey;
+    private String internalSequenceID;
+    // True only when the create sequence bean already carries a sequence id.
+    private boolean sequenceExists;
+    // The bean's sequence id, or Sandesha2Constants.TEMP_SEQUENCE_ID when it has none yet.
+    private String outSequenceID;
+    private String rmVersion;
+
+    /**
+     * Extracts the information from the rmMsgCtx that is needed for
+     * processing out messages and caches it in this instance.
+     *
+     * @param rmMsgCtx the outgoing message; its "To" endpoint reference is
+     *                 dereferenced without a null check
+     * @throws AxisFault (as a SandeshaException) if no create sequence bean
+     *                   matches the derived internal sequence id, or if the
+     *                   RM version of the sequence cannot be determined
+     */
+    protected void setupOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: WSRMParentProcessor::setupOutMessage");
+        }
+
+        msgContext = rmMsgCtx.getMessageContext();
+        configurationContext = msgContext.getConfigurationContext();
+        Options options = msgContext.getOptions();
+
+        storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
+                configurationContext.getAxisConfiguration());
+
+        toAddress = rmMsgCtx.getTo().getAddress();
+        sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+        internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
+
+        // Reset the per-sequence state so nothing stale survives a failed setup.
+        sequenceExists = false;
+        outSequenceID = null;
+        rmVersion = null;
+
+        // Get the create sequence bean with the matching internal sequence id
+        CreateSeqBean createSeqFindBean = new CreateSeqBean();
+        createSeqFindBean.setInternalSequenceID(internalSequenceID);
+
+        CreateSeqBean createSeqBean = storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
+
+        if (createSeqBean == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exit: WSRMParentProcessor::setupOutMessage Sequence doesn't exist");
+            }
+
+            throw new SandeshaException(SandeshaMessageHelper.getMessage(
+                    SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));
+        }
+
+        if (createSeqBean.getSequenceID() != null) {
+            sequenceExists = true;
+            outSequenceID = createSeqBean.getSequenceID();
+        } else {
+            outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;
+        }
+
+        // Assign to the field: a local declaration here would shadow it and
+        // leave getRMVersion() returning null.
+        rmVersion = SandeshaUtil.getRMVersion(internalSequenceID, storageManager);
+        if (rmVersion == null) {
+            throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: WSRMParentProcessor::setupOutMessage");
+        }
+    }
+
+    /**
+     * Prepares the given message for the out flow, stores it under a freshly
+     * generated key and inserts a {@link SenderBean} that refers to it.
+     *
+     * Relies on the state cached by {@link #setupOutMessage(RMMsgContext)}.
+     *
+     * @param rmMsgCtx the message to send
+     * @param msgType  the message type recorded on the sender bean
+     * @param delay    milliseconds added to the current time to form the
+     *                 bean's time-to-send
+     * @throws AxisFault declared by this method and propagated from the
+     *                   helpers it calls
+     */
+    protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay) throws AxisFault {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: WSRMParentProcessor::sendOutgoingMessage " + msgType + ", " + delay);
+        }
+
+        rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
+        msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+        rmMsgCtx.setTo(new EndpointReference(toAddress));
+
+        // Use the stored transport address for this sequence, when there is one.
+        String transportTo = SandeshaUtil.getSequenceProperty(internalSequenceID,
+                Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
+        if (transportTo != null) {
+            rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
+        }
+
+        // Setting msg context properties
+        rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
+        rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
+        rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY, sequenceKey);
+
+        rmMsgCtx.addSOAPEnvelope();
+
+        // Ensure the outbound message is secured using the correct token
+        String tokenData = SandeshaUtil.getSequenceProperty(internalSequenceID,
+                Sandesha2Constants.SequenceProperties.SECURITY_TOKEN,
+                storageManager);
+        if (tokenData != null) {
+            SecurityManager secMgr = SandeshaUtil.getSecurityManager(configurationContext);
+            SecurityToken token = secMgr.recoverSecurityToken(tokenData);
+            secMgr.applySecurityToken(token, msgContext);
+        }
+
+        String key = SandeshaUtil.getUUID();
+
+        SenderBean senderBean = new SenderBean();
+        senderBean.setMessageType(msgType);
+        senderBean.setMessageContextRefKey(key);
+        senderBean.setTimeToSend(System.currentTimeMillis() + delay);
+        senderBean.setMessageID(msgContext.getMessageID());
+
+        // Set the internal sequence id and, when known, the outgoing sequence id.
+        // The bean is only marked for sending once the sequence id is known.
+        senderBean.setInternalSequenceID(internalSequenceID);
+        if (sequenceExists) {
+            senderBean.setSend(true);
+            senderBean.setSequenceID(outSequenceID);
+        } else {
+            senderBean.setSend(false);
+        }
+
+        EndpointReference to = msgContext.getTo();
+        if (to != null) {
+            senderBean.setToAddress(to.getAddress());
+        }
+
+        msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+        senderBean.setReSend(false);
+
+        SenderBeanMgr retransmitterMgr = storageManager.getRetransmitterBeanMgr();
+
+        // Store the message first, then insert the bean that refers to it by key.
+        SandeshaUtil.executeAndStore(rmMsgCtx, key);
+
+        retransmitterMgr.insert(senderBean);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: WSRMParentProcessor::sendOutgoingMessage");
+        }
+    }
+
+    /** @return the storage manager resolved by setupOutMessage, or null before setup */
+    public final StorageManager getStorageManager() {
+        return storageManager;
+    }
+
+    /** @return the internal sequence id derived from the "To" address and sequence key */
+    public final String getInternalSequenceID() {
+        return internalSequenceID;
+    }
+
+    /** @return the Axis2 message context taken from the RM message context */
+    public final MessageContext getMsgContext() {
+        return msgContext;
+    }
+
+    /** @return the create sequence bean's sequence id, or the temporary sequence id when it has none */
+    public final String getOutSequenceID() {
+        return outSequenceID;
+    }
+
+    /** @return true if the create sequence bean already carried a sequence id */
+    public final boolean isSequenceExists() {
+        return sequenceExists;
+    }
+
+    /** @return the sequence key read from the message options (may be null) */
+    public final String getSequenceKey() {
+        return sequenceKey;
+    }
+
+    /** @return the address of the message's "To" endpoint reference */
+    public final String getToAddress() {
+        return toAddress;
+    }
+
+    /** @return the configuration context of the message being processed */
+    public final ConfigurationContext getConfigurationContext() {
+        return configurationContext;
+    }
+
+    /** @return the RM version resolved by setupOutMessage, or null before a successful setup */
+    public final String getRMVersion() {
+        return rmVersion;
+    }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org