You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/02/24 03:14:11 UTC
svn commit: r380307 - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: client/ handlers/
msgprocessors/ transport/ util/ workers/
Author: chamikara
Date: Thu Feb 23 18:14:07 2006
New Revision: 380307
URL: http://svn.apache.org/viewcvs?rev=380307&view=rev
Log:
Bug fixes.
Removed some commented code.
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java?rev=380307&r1=380306&r2=380307&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java Thu Feb 23 18:14:07 2006
@@ -48,7 +48,8 @@
public static String OFFERED_SEQUENCE_ID = "Sandesha2ClientAPIPropertyOfferedSequenceId";
public static String SANDESHA_DEBUG_MODE = "Sandesha2ClientAPIPropertyDebugMode";
public static String SEQUENCE_KEY = "Sandesha2ClientAPIPropertySequenceKey";
-
+ public static String MESSAGE_NUMBER = "Sandesha2ClientAPIPropertyMessageNumber";
+
public static SequenceReport getOutgoingSequenceReport (String to, String sequenceKey,ConfigurationContext configurationContext) throws SandeshaException {
String internalSequenceID = SandeshaUtil.getInternalSequenceID (to,sequenceKey);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=380307&r1=380306&r2=380307&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Thu Feb 23 18:14:07 2006
@@ -33,9 +33,6 @@
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.handlers.AbstractHandler;
-import org.apache.ws.commons.soap.SOAPBody;
-import org.apache.ws.commons.soap.SOAPEnvelope;
-import org.apache.ws.commons.soap.SOAPFactory;
import org.apache.axis2.transport.TransportSender;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,7 +40,6 @@
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.client.Sandesha2ClientAPI;
-import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
@@ -53,11 +49,9 @@
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
-import org.apache.sandesha2.transport.Sandesha2TransportSender;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.PropertyManager;
import org.apache.sandesha2.util.RMMsgCreator;
-import org.apache.sandesha2.util.RMPolicyManager;
import org.apache.sandesha2.util.SOAPAbstractFactory;
import org.apache.sandesha2.util.SandeshaPropertyBean;
import org.apache.sandesha2.util.SandeshaUtil;
@@ -69,10 +63,11 @@
import org.apache.sandesha2.wsrm.MessageNumber;
import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.SequenceOffer;
+import org.apache.ws.commons.soap.SOAPBody;
+import org.apache.ws.commons.soap.SOAPEnvelope;
+import org.apache.ws.commons.soap.SOAPFactory;
import org.apache.wsdl.WSDLConstants;
-import sun.security.action.GetPropertyAction;
-
/**
* This is invoked in the outFlow of an RM endpoint
*
@@ -121,10 +116,6 @@
return;
}
- // Adding the policy bean
- // RMPolicyBean policyBean = RMPolicyManager.getPolicyBean(rmMsgCtx);
- // rmMsgCtx.setProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN,
- // policyBean);
Parameter policyParam = msgCtx
.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
if (policyParam == null) {
@@ -144,7 +135,7 @@
SequencePropertyBeanMgr seqPropMgr = storageManager
.getSequencePropretyBeanMgr();
- //Transaction transaction = storageManager.getTransaction();
+ Transaction outHandlerTransaction = storageManager.getTransaction();
boolean serverSide = msgCtx.isServerSide();
@@ -152,14 +143,14 @@
if (msgCtx.getMessageID() == null) {
msgCtx.setMessageID(SandeshaUtil.getUUID());
}
- // initial work
+
// find internal sequence id
String internalSequenceId = null;
- // Temp sequence id is the one used to refer to the sequence (since
- // actual sequence id is not available when first msg arrives)
- // server side - sequenceId if the incoming sequence
- // client side - wsaTo + SeequenceKey
+ /* Internal sequence id is the one used to refer to the sequence (since
+ the actual sequence id is not available when the first msg arrives)
+ server side - sequenceId of the incoming sequence
+ client side - wsaTo + SequenceKey */
if (serverSide) {
// getting the request message and rmMessage.
@@ -206,15 +197,42 @@
}
- // Strating the sender.
- // SandeshaUtil.startSenderForTheSequence(context,internalSequenceId);
-
- // check if the first message
+ /* checking whether the user has given the messageNumber (in most cases this will not be so).
+ If it was not given, the system will generate the message number */
- Transaction ouHandlerSetupTransaction = storageManager.getTransaction();
+ //User should set it as a long object.
+ Long messageNumberLng = (Long) msgCtx.getProperty(Sandesha2ClientAPI.MESSAGE_NUMBER);
+
+ long givenMessageNumber = -1;
+ if (messageNumberLng!=null) {
+ givenMessageNumber = messageNumberLng.longValue();
+ if (givenMessageNumber<=0) {
+ throw new SandeshaException ("The givem message number value is invalid (has to be larger than zero)");
+ }
+ }
+
+ //the message number that was last used.
+ long systemMessageNumber = getPreviousMsgNo(context, internalSequenceId);
+
+ //The number given by the user has to be larger than the last stored number.
+ if (givenMessageNumber>0 && givenMessageNumber<=systemMessageNumber) {
+ String message = "The given message number is not larger than value of the last sent message.";
+ throw new SandeshaException (message);
+ }
+
+ //Finding the correct message number.
+ long messageNumber = -1;
+ if (givenMessageNumber>0) // if given message number is valid use it. (this is larger than the last stored due to the last check)
+ messageNumber = givenMessageNumber;
+ else if (systemMessageNumber>0) { //if system message number is valid use it.
+ messageNumber = systemMessageNumber+1;
+ } else { //This is the fist message (systemMessageNumber = -1)
+ messageNumber = 1;
+ }
+
+ //saving the used message number
+ setNextMsgNo(context,internalSequenceId,messageNumber);
- long messageNumber = getNextMsgNo(context, internalSequenceId);
-
boolean sendCreateSequence = false;
SequencePropertyBean outSeqBean = seqPropMgr.retrieve(
@@ -234,24 +252,17 @@
}
}
+
if (messageNumber == 1) {
if (outSeqBean == null) {
- sendCreateSequence = true;
+ sendCreateSequence = true; // message number being one and not having an out sequence, implies that a create sequence has to be send.
}
- // if fist message - setup the sending side sequence - both for the
- // server and the client sides
- // if (!serverSide && sendCreateSequence) {
+ // if first message - set up the sending side sequence - both for the server and the client sides
SequenceManager.setupNewClientSequence(msgCtx, internalSequenceId);
}
-
- ouHandlerSetupTransaction.commit();
- // if first message - add create sequence
if (sendCreateSequence) {
-
- Transaction beginCreateSeqTransaction = storageManager.getTransaction();
-
SequencePropertyBean responseCreateSeqAdded = seqPropMgr
.retrieve(
internalSequenceId,
@@ -273,7 +284,6 @@
if (msgCtx.isServerSide()) {
// we do not set acksTo value to anonymous when the create
// sequence is send from the server.
-
MessageContext requestMessage = operationContext
.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
if (requestMessage == null) {
@@ -281,7 +291,6 @@
log.debug(message);
throw new SandeshaException(message);
}
-
acksTo = requestMessage.getTo().getAddress();
} else {
@@ -289,9 +298,7 @@
acksTo = Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
}
- // If acksTo is not anonymous. Start the listner TODO: verify
- if (!Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo)
- && !serverSide) {
+ if (!Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo) && !serverSide) {
String transportIn = (String) context
.getProperty(MessageContext.TRANSPORT_IN);
if (transportIn == null)
@@ -303,8 +310,7 @@
incomingSequencId,
Sandesha2Constants.SequenceProperties.REPLY_TO_EPR);
if (bean != null) {
- EndpointReference acksToEPR = new EndpointReference(
- bean.getValue());
+ EndpointReference acksToEPR = new EndpointReference(bean.getValue());
if (acksToEPR != null)
acksTo = (String) acksToEPR.getAddress();
}
@@ -314,19 +320,14 @@
Object trIn = msgCtx
.getProperty(MessageContext.TRANSPORT_IN);
if (trIn == null) {
-
+ //TODO
}
}
- beginCreateSeqTransaction.commit();
-
addCreateSequenceMessage(rmMsgCtx, internalSequenceId, acksTo);
}
}
- // do response processing
-
-
SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
if (env == null) {
SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(
@@ -346,49 +347,50 @@
rmMsgCtx.setMessageId(messageId1);
}
- if (serverSide) {
-
- // processing the response
- processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber);
+
+
+
+
+
+ if (serverSide) {
+ // let the request end with 202 if a ack has not been
+ // written in the incoming thread.
+
MessageContext reqMsgCtx = msgCtx.getOperationContext()
.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
- RMMsgContext requestRMMsgCtx = MsgInitializer
- .initializeMessage(reqMsgCtx);
- // let the request end with 202 if a ack has not been
- // written in the incoming thread.
if (reqMsgCtx.getProperty(Sandesha2Constants.ACK_WRITTEN) == null
|| !"true".equals(reqMsgCtx
.getProperty(Sandesha2Constants.ACK_WRITTEN)))
reqMsgCtx.getOperationContext().setProperty(
org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
- } else {
- EndpointReference toEPR = msgCtx.getTo();
-
- if (toEPR == null) {
- String message = "To EPR is not found";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String to = toEPR.getAddress();
- String operationName = msgCtx.getOperationContext()
- .getAxisOperation().getName().getLocalPart();
-
- if (msgCtx.getWSAAction() == null) {
- msgCtx.setWSAAction(to + "/" + operationName);
- }
-
- if (msgCtx.getSoapAction() == null) {
- msgCtx.setSoapAction("\"" + to + "/" + operationName + "\"");
- }
-
- // processing the response
- processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber);
+ }
+
+
+ EndpointReference toEPR = msgCtx.getTo();
+ if (toEPR == null) {
+ String message = "To EPR is not found";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+ String to = toEPR.getAddress();
+
+ String operationName = msgCtx.getOperationContext().getAxisOperation().getName().getLocalPart();
+
+ if (msgCtx.getWSAAction() == null) {
+ msgCtx.setWSAAction(to + "/" + operationName);
+ }
+ if (msgCtx.getSoapAction() == null) {
+ msgCtx.setSoapAction("\"" + to + "/" + operationName + "\"");
}
+ // processing the response
+ processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber);
+
+ msgCtx.pause(); // the execution will be stopped.
+ outHandlerTransaction.commit();
}
public void addCreateSequenceMessage(RMMsgContext applicationRMMsg,
@@ -415,18 +417,15 @@
.getSandeshaStorageManager(applicationMsg
.getConfigurationContext());
- Transaction createSeqTransaction = storageManager.getTransaction();
SequencePropertyBeanMgr seqPropMgr = storageManager
.getSequencePropretyBeanMgr();
SequenceOffer offer = createSequencePart.getSequenceOffer();
if (offer != null) {
- // Offer processing
String offeredSequenceId = offer.getIdentifer().getIdentifier();
SequencePropertyBean msgsBean = new SequencePropertyBean();
msgsBean.setSequenceID(offeredSequenceId);
- msgsBean
- .setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+ msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
msgsBean.setValue("");
SequencePropertyBean offeredSequenceBean = new SequencePropertyBean();
@@ -440,8 +439,7 @@
}
MessageContext createSeqMsg = createSeqRMMessage.getMessageContext();
- createSeqMsg.setRelatesTo(null); // create seq msg does not relateTo
- // anything
+ createSeqMsg.setRelatesTo(null); // create seq msg does not relateTo anything
AbstractContext context = applicationRMMsg.getContext();
if (context == null) {
String message = "Context is null";
@@ -481,8 +479,6 @@
storageManager.storeMessageContext(key,createSeqMsg);
- createSeqTransaction.commit();
-
// sending the message once through our sender.
AxisEngine engine = new AxisEngine(createSeqMsg
.getConfigurationContext());
@@ -492,10 +488,8 @@
TransportOutDescription transportOut = createSeqMsg.getTransportOut();
- createSeqMsg.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,
- transportOut);
- createSeqMsg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,
- Sandesha2Constants.VALUE_TRUE);
+ createSeqMsg.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
+ createSeqMsg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
createSeqMsg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, key);
Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc ();
@@ -507,7 +501,6 @@
} catch (AxisFault e) {
throw new SandeshaException (e.getMessage());
}
-
}
private void processResponseMessage(RMMsgContext rmMsg,
@@ -535,8 +528,6 @@
StorageManager storageManager = SandeshaUtil
.getSandeshaStorageManager(msg.getConfigurationContext());
- Transaction processResponseTransaction = storageManager.getTransaction();
-
SequencePropertyBeanMgr sequencePropertyMgr = storageManager
.getSequencePropretyBeanMgr();
@@ -679,9 +670,7 @@
Identifier id1 = new Identifier(factory);
id1.setIndentifer(identifierStr);
sequence.setIdentifier(id1);
- rmMsg
- .setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE,
- sequence);
+ rmMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE,sequence);
if (addAckRequested) {
ackRequested = new AckRequested(factory);
@@ -698,13 +687,9 @@
throw new SandeshaException(e1.getMessage());
}
- // Retransmitter bean entry for the application message
+ //Retransmitter bean entry for the application message
SenderBean appMsgEntry = new SenderBean();
- // String key = storageManager
- // .storeMessageContext(rmMsg.getMessageContext());
String storageKey = SandeshaUtil.getUUID();
- //storageManager.storeMessageContext(storageKey, msg);
-
appMsgEntry.setMessageContextRefKey(storageKey);
appMsgEntry.setTimeToSend(System.currentTimeMillis());
@@ -740,11 +725,22 @@
msg.setTransportOut(sandesha2TransportOutDesc);
}
+
- processResponseTransaction.commit();
- }
-
- private long getNextMsgNo(ConfigurationContext context,
+ //increasing the current handler index, so that the message will not go through the SandeshaOutHandler again.
+ msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex()+1);
+
+ //sending the message through the other handlers and the Sandesha2TransportSender so that it gets dumped to the storage.
+ AxisEngine engine = new AxisEngine (msg.getConfigurationContext());
+ try {
+ engine.resumeSend(msg);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e);
+ }
+
+ }
+
+ private long getPreviousMsgNo(ConfigurationContext context,
String internalSequenceId) throws SandeshaException {
StorageManager storageManager = SandeshaUtil
@@ -755,26 +751,47 @@
SequencePropertyBean nextMsgNoBean = seqPropMgr.retrieve(
internalSequenceId,
Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
- long nextMsgNo = 1;
- boolean update = false;
+
+ long nextMsgNo = -1;
if (nextMsgNoBean != null) {
- update = true;
Long nextMsgNoLng = new Long(nextMsgNoBean.getValue());
nextMsgNo = nextMsgNoLng.longValue();
- } else {
+ }
+
+ return nextMsgNo;
+ }
+
+ private void setNextMsgNo(ConfigurationContext context,
+ String internalSequenceId, long msgNo) throws SandeshaException {
+
+ if (msgNo<=0) {
+ String message = "Message number '" + msgNo + "' is invalid. Has to be larger than zero.";
+ throw new SandeshaException (message);
+ }
+
+ StorageManager storageManager = SandeshaUtil
+ .getSandeshaStorageManager(context);
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager
+ .getSequencePropretyBeanMgr();
+ SequencePropertyBean nextMsgNoBean = seqPropMgr.retrieve(
+ internalSequenceId,
+ Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
+
+ boolean update = true;
+ if (nextMsgNoBean == null) {
+ update = false;
nextMsgNoBean = new SequencePropertyBean();
nextMsgNoBean.setSequenceID(internalSequenceId);
- nextMsgNoBean
- .setName(Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
+ nextMsgNoBean.setName(Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
}
- nextMsgNoBean.setValue(new Long(nextMsgNo + 1).toString());
+ nextMsgNoBean.setValue(new Long(msgNo).toString());
if (update)
seqPropMgr.update(nextMsgNoBean);
else
seqPropMgr.insert(nextMsgNoBean);
- return nextMsgNo;
}
public QName getName() {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=380307&r1=380306&r2=380307&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Thu Feb 23 18:14:07 2006
@@ -223,10 +223,7 @@
lastOutMessageNo);
if (complete) {
- //Transaction terminateTransaction = storageManager.getTransaction();
- addTerminateSequenceMessage(rmMsgCtx, outSequenceId,
- internalSequenceId);
- //terminateTransaction.commit();
+ addTerminateSequenceMessage(rmMsgCtx, outSequenceId,internalSequenceId);
}
}
@@ -346,15 +343,10 @@
//This should be dumped to the storage by the sender
TransportOutDescription transportOut = terminateRMMessage.getMessageContext().getTransportOut();
-
terminateRMMessage.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
-
terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
-
terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
-
terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
-
addTerminateSeqTransaction.commit();
AxisEngine engine = new AxisEngine (incomingAckRMMsg.getMessageContext().getConfigurationContext());
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=380307&r1=380306&r2=380307&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Thu Feb 23 18:14:07 2006
@@ -30,16 +30,12 @@
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.AxisOperationFactory;
import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisEngine;
-import org.apache.ws.commons.soap.SOAPEnvelope;
-import org.apache.ws.commons.soap.SOAPFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
@@ -51,7 +47,6 @@
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
-import org.apache.sandesha2.transport.Sandesha2TransportSender;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.PropertyManager;
import org.apache.sandesha2.util.RMMsgCreator;
@@ -63,6 +58,8 @@
import org.apache.sandesha2.wsrm.LastMessage;
import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.ws.commons.soap.SOAPEnvelope;
+import org.apache.ws.commons.soap.SOAPFactory;
/**
* Responsible for processing an incoming Application message.
@@ -130,13 +127,12 @@
throw new SandeshaException(message);
}
-
-
//setting mustUnderstand to false.
sequence.setMustUnderstand(false);
rmMsgCtx.addSOAPEnvelope();
Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
+
//updating the last activated time of the sequence.
SequenceManager.updateLastActivatedTime(sequenceId,configCtx);
lastUpdatedTimeTransaction.commit();
@@ -160,7 +156,6 @@
&& (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
//this is a duplicate message and the invocation type is
// EXACTLY_ONCE.
-
rmMsgCtx.pause();
}
@@ -174,8 +169,6 @@
seqPropMgr.update(msgsBean);
updataMsgStringTransaction.commit();
-
-
Transaction invokeTransaction = storageManager.getTransaction();
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=380307&r1=380306&r2=380307&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Thu Feb 23 18:14:07 2006
@@ -94,20 +94,7 @@
//removing an entry from the listener
String transport = terminateSeqMsg.getTransportIn().getName().getLocalPart();
-// try {
- //This will throw an exception in the server side. //TODO find a better method.
- //TODO : following causes the SAS to stop withot returning 202. find a better method or correct this
- //ListenerManager.stop(context, transport);
-// } catch (AxisFault e) {
-// // TODO Auto-generated catch block
-// e.printStackTrace();
-// String message = "Cant stop listener...";
-// log.debug(message);
-// }
-
-
-
-
+
Transaction lastUpdatedTransaction = storageManager.getTransaction();
SequenceManager.updateLastActivatedTime(sequenceId,context);
lastUpdatedTransaction.commit();
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java?rev=380307&r1=380306&r2=380307&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java Thu Feb 23 18:14:07 2006
@@ -9,30 +9,21 @@
import org.apache.axis2.description.Parameter;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.transport.TransportSender;
-import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
public class Sandesha2TransportSender implements TransportSender {
- private String messageStoreKey = null;
-
public void invoke(MessageContext msgContext) throws AxisFault {
//setting the correct transport sender.
- //TransportSender sender = (TransportSender) msgContext.getProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_SENDER);
TransportOutDescription transportOut = (TransportOutDescription) msgContext.getProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC);
if (transportOut==null)
throw new SandeshaException ("Original transport sender is not present");
-
- //msgContext.getTransportOut().setSender(sender);
+
msgContext.setTransportOut(transportOut);
String key = (String) msgContext.getProperty(Sandesha2Constants.MESSAGE_STORE_KEY);
@@ -42,27 +33,7 @@
ConfigurationContext configurationContext = msgContext.getConfigurationContext();
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
-
- Transaction messageStoreTransaction = storageManager.getTransaction();
storageManager.updateMessageContext(key,msgContext);
- messageStoreTransaction.commit();
-
- Transaction transaction = storageManager.getTransaction();
- //setting send=true (if requestd) for the message.
- SenderBeanMgr senderBeanMgr = storageManager.getRetransmitterBeanMgr();
-
- RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgContext);
-
- String messageType = SandeshaUtil.getMessageTypeString(rmMsg.getMessageType());
- SenderBean senderBean = senderBeanMgr.retrieveFromMessageRefKey(key);
-
-// String setSendToTrue = (String) msgContext.getProperty(Sandesha2Constants.SET_SEND_TO_TRUE);
-// if (Sandesha2Constants.VALUE_TRUE.equals(setSendToTrue)) {
-// senderBean.setSend(true);
-// senderBeanMgr.update(senderBean);
-// }
-
- transaction.commit();
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,Sandesha2Constants.VALUE_TRUE);
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java?rev=380307&r1=380306&r2=380307&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java Thu Feb 23 18:14:07 2006
@@ -31,13 +31,10 @@
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.AxisOperationFactory;
-import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.description.ParameterImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ws.commons.soap.SOAPEnvelope;
-import org.apache.ws.commons.soap.SOAPFactory;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
@@ -57,6 +54,8 @@
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.sandesha2.wsrm.SequenceOffer;
import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.ws.commons.soap.SOAPEnvelope;
+import org.apache.ws.commons.soap.SOAPFactory;
/**
* Used to create new RM messages.
@@ -68,31 +67,11 @@
private static Log log = LogFactory.getLog(RMMsgCreator.class);
-
- private static void initializeCreation1(MessageContext relatedMessage,
- MessageContext newMessage) {
- }
-
- private static void finalizeCreation1(MessageContext relatedMessage,
- MessageContext newMessage) throws SandeshaException {
-
- }
-
-
-
-
private static void initializeCreation(MessageContext relatedMessage,
MessageContext newMessage) throws SandeshaException {
- // Seting RMPolicyBean
- // if
- // (rmMsgCtx.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN)==null)
- // rmMsgCtx.setProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN,
- // PropertyManager.getInstance().getRMPolicyBean());
Parameter policyParam = relatedMessage
.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
- // if (propertyParam!=null)
- // newMessage.setProperty(propertyParam.getName(),propertyParam.getValue());
if (policyParam != null) {
@@ -157,8 +136,6 @@
String key = (String) keyIter.next();
newOpContext.setProperty(key, oldOpContextProperties
.get(key));
- // newAxisOperation.addParameter(new ParameterImpl
- // (nextParam.getName(),(String) nextParam.getValue()));
}
}
}
@@ -172,8 +149,6 @@
String key = (String) keyIter.next();
newMessage.setProperty(key, oldMsgContextProperties
.get(key));
- // newAxisOperation.addParameter(new ParameterImpl
- // (nextParam.getName(),(String) nextParam.getValue()));
}
}
}
@@ -261,8 +236,6 @@
} catch (AxisFault e) {
throw new SandeshaException(e.getMessage());
}
-
- // setUpMessage(applicationMsgContext, createSeqmsgContext);
AxisOperation appMsgOperationDesc = applicationMsgContext
.getAxisOperation();
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=380307&r1=380306&r2=380307&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Thu Feb 23 18:14:07 2006
@@ -54,19 +54,13 @@
public class InOrderInvoker extends Thread {
private boolean runInvoker = false;
- //private boolean stopInvokerAfterWork = false;
private ArrayList workingSequences = new ArrayList();
-
private ConfigurationContext context = null;
+ private Log log = LogFactory.getLog(getClass());
- Log log = LogFactory.getLog(getClass());
-
- int i = 1;
-
public synchronized void stopInvokerForTheSequence(String sequenceID) {
workingSequences.remove(sequenceID);
if (workingSequences.size()==0) {
-
//runInvoker = false;
}
}
@@ -83,7 +77,6 @@
if (!workingSequences.contains(sequenceID))
workingSequences.add(sequenceID);
-
if (!isInvokerStarted()) {
runInvoker = true; //so that isSenderStarted()=true.
@@ -104,7 +97,6 @@
}
try {
-
StorageManager storageManager = SandeshaUtil
.getSandeshaStorageManager(context);
NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
@@ -135,8 +127,6 @@
currentIteration: while (allSequencesItr.hasNext()) {
String sequenceId = (String) allSequencesItr.next();
-
- //Transaction sequenceInvocationTransaction = storageManager.getTransaction();
Transaction invocationTransaction = storageManager.getTransaction(); //Transaction based invocation
@@ -164,11 +154,7 @@
Iterator stMapIt = storageMapMgr.find(
new InvokerBean(null, nextMsgno, sequenceId))
.iterator();
-
- //sequenceInvocationTransaction.commit();
-
-
while (stMapIt.hasNext()) {
@@ -179,8 +165,7 @@
MessageContext msgToInvoke = storageManager.retrieveMessageContext(key,context);
- //invocationTransaction.commit();
-
+
RMMsgContext rmMsg = MsgInitializer
.initializeMessage(msgToInvoke);
Sequence seq = (Sequence) rmMsg
@@ -190,9 +175,7 @@
try {
//Invoking the message.
-// new AxisEngine(msgToInvoke.getConfigurationContext())
-// .receive(msgToInvoke);
-
+
//currently Transaction based invocation can be supplied only for the in-only case.
if (!AxisOperationFactory.MEP_URI_IN_ONLY.equals(msgToInvoke.getAxisOperation().getMessageExchangePattern())) {
@@ -209,23 +192,15 @@
log.info("Invoker invoking a '" + SandeshaUtil.getMessageTypeString(rmMsg
.getMessageType()) + "' message.");
- //Transaction deleteEntryTransaction = storageManager.getTransaction();
- //deleting the message entry.
+
storageMapMgr.delete(key);
-
- //deleteEntryTransaction.commit();
-
} catch (AxisFault e) {
throw new SandeshaException(e);
}
//Transaction postInvocationTransaction = storageManager.getTransaction();
//updating the next msg to invoke
-// nextMsgno++;
-// stMapIt = storageMapMgr
-// .find(
-// new InvokerBean(null, nextMsgno,
-// sequenceId)).iterator();
+
//terminate (AfterInvocation)
if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
@@ -247,25 +222,13 @@
}
- //Transaction updateNextMsgTransaction = storageManager.getTransaction();
nextMsgno++;
nextMsgBean.setNextMsgNoToProcess(nextMsgno);
nextMsgMgr.update(nextMsgBean);
- //updateNextMsgTransaction.commit();
-
-// i++;
-// if (i==3) {
-// throw new SandeshaException ("test");
-// }
-
invocationTransaction.commit();
-
-
-
}
} catch (SandeshaException e1) {
- // TODO Auto-generated catch block
e1.printStackTrace();
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=380307&r1=380306&r2=380307&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Thu Feb 23 18:14:07 2006
@@ -28,7 +28,6 @@
import org.apache.axis2.description.AxisOperationFactory;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisEngine;
-import org.apache.ws.commons.soap.SOAPEnvelope;
import org.apache.axis2.transport.TransportSender;
import org.apache.axis2.transport.TransportUtils;
import org.apache.commons.logging.Log;
@@ -45,11 +44,11 @@
import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.PropertyManager;
-import org.apache.sandesha2.util.SandeshaPropertyBean;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.ws.commons.soap.SOAPEnvelope;
/**
* This is responsible for sending and re-sending messages of Sandesha2. This
@@ -62,14 +61,10 @@
public class Sender extends Thread {
private boolean runSender = false;
-
private boolean stopSenderAfterWork = false;
-
private ArrayList workingSequences = new ArrayList();
-
private ConfigurationContext context = null;
-
- Log log = LogFactory.getLog(getClass());
+ private Log log = LogFactory.getLog(getClass());
public synchronized void stopSenderForTheSequence(String sequenceID) {
workingSequences.remove(sequenceID);
@@ -104,11 +99,7 @@
throw new SandeshaException(message);
}
- Transaction pickMessagesToSendTransaction = storageManager
- .getTransaction(); // starting
- // a
- // new
- // transaction
+ Transaction pickMessagesToSendTransaction = storageManager.getTransaction();
SenderBeanMgr mgr = storageManager.getRetransmitterBeanMgr();
Collection coll = mgr.findMsgsToSend();
@@ -136,40 +127,29 @@
// set and not true.
// But it will set if it is not set (null)
- // This is used to make sure that the mesage get passed the
- // Sandesha2TransportSender.
+ // This is used to make sure that the message get passed the Sandesha2TransportSender.
String qualifiedForSending = (String) msgCtx
.getProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING);
if (qualifiedForSending != null
- && !qualifiedForSending
- .equals(Sandesha2Constants.VALUE_TRUE)) {
+ && !qualifiedForSending.equals(Sandesha2Constants.VALUE_TRUE)) {
continue;
}
- // try {
-
if (msgCtx == null) {
- log
- .debug("ERROR: Sender has an Unavailable Message entry");
+ log.debug("ERROR: Sender has an Unavailable Message entry");
break;
}
RMMsgContext rmMsgCtx = MsgInitializer
.initializeMessage(msgCtx);
- // rmMsgCtx.addSOAPEnvelope();
// skip sending if this message has been mentioned as a
// message not to send (within sandesha2.properties)
- ArrayList msgsNotToSend = PropertyManager.getInstance()
- .getMessagesNotToSend();
- // SandeshaPropertyBean propertyBean =
- // (SandeshaPropertyBean)
- // messageContext.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
+ ArrayList msgsNotToSend = PropertyManager.getInstance().getMessagesNotToSend();
if (msgsNotToSend != null
- && msgsNotToSend.contains(new Integer(rmMsgCtx
- .getMessageType()))) {
+ && msgsNotToSend.contains(new Integer(rmMsgCtx.getMessageType()))) {
continue;
}
@@ -185,48 +165,29 @@
int messageType = rmMsgCtx.getMessageType();
if (messageType == Sandesha2Constants.MessageTypes.APPLICATION) {
-
- Sequence sequence = (Sequence) rmMsgCtx
- .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- String sequenceID = sequence.getIdentifier()
- .getIdentifier();
+ Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String sequenceID = sequence.getIdentifier().getIdentifier();
// checking whether the sequence has been timed out.
- boolean sequenceTimedOut = SequenceManager
- .hasSequenceTimedOut(sequenceID, rmMsgCtx);
+ boolean sequenceTimedOut = SequenceManager.hasSequenceTimedOut(sequenceID, rmMsgCtx);
if (sequenceTimedOut) {
// sequence has been timed out.
// do time out processing.
// TODO uncomment below line
- TerminateManager.terminateSendingSide(context,
- sequenceID, msgCtx.isServerSide());
+ TerminateManager.terminateSendingSide(context,sequenceID, msgCtx.isServerSide());
String message = "Sequence timed out";
log.debug(message);
throw new SandeshaException(message);
}
- // piggybacking if an ack if available for the same
- // sequence.
+ // piggybacking if an ack is available for the same sequence.
//TODO do piggybacking based on wsa:To
-
AcknowledgementManager.piggybackAckIfPresent(rmMsgCtx);
}
preSendTransaction.commit();
- // every message should be resumed (pause==false) when
- // sending
- // boolean paused = msgCtx.isPaused();
-
- // AxisEngine engine = new AxisEngine(msgCtx
- // .getConfigurationContext());
- // if (paused) {
- // engine.resume(msgCtx);
- // }else {
- // engine.send(msgCtx);
- // }
-
TransportOutDescription transportOutDescription = msgCtx
.getTransportOut();
TransportSender transportSender = transportOutDescription
@@ -235,8 +196,7 @@
transportSender.invoke(msgCtx);
}
- Transaction postSendTransaction = storageManager
- .getTransaction();
+ Transaction postSendTransaction = storageManager.getTransaction();
MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
@@ -261,8 +221,7 @@
mgr.delete(bean1.getMessageID());
}
- postSendTransaction.commit(); // commiting the current
- // transaction
+ postSendTransaction.commit(); // committing the current transaction
if (!msgCtx.isServerSide())
checkForSyncResponses(msgCtx);
@@ -278,8 +237,7 @@
ConfigurationContext configContext = msgCtx
.getConfigurationContext();
- TerminateManager.terminateSendingSide(configContext,
- sequenceID, msgCtx.isServerSide());
+ TerminateManager.terminateSendingSide(configContext,sequenceID, msgCtx.isServerSide());
// removing an entry from the Listener
String transport = msgCtx.getTransportOut().getName()
@@ -348,16 +306,7 @@
}
private void updateMessage(MessageContext msgCtx1) throws SandeshaException {
- // try {
- // RMMsgContext rmMsgCtx1 = MsgInitializer.initializeMessage(msgCtx1);
- // rmMsgCtx1.addSOAPEnvelope();
- //
- // } catch (AxisFault e) {
- // String message = "Exception in updating contexts";
- // log.debug(message);
- // throw new SandeshaException(message);
- // }
-
+ //do updates if required.
}
private void checkForSyncResponses(MessageContext msgCtx)
@@ -383,9 +332,6 @@
responseMessageContext.setProperty(MessageContext.TRANSPORT_IN,
msgCtx.getProperty(MessageContext.TRANSPORT_IN));
- // msgCtx.getAxisOperation().registerOperationContext(responseMessageContext,
- // msgCtx.getOperationContext());
- // responseMessageContext.setServerSide(false);
responseMessageContext
.setServiceContext(msgCtx.getServiceContext());
responseMessageContext.setServiceGroupContext(msgCtx
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org