You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by de...@apache.org on 2007/09/26 18:20:27 UTC
svn commit: r579708 - in
/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2:
msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java
workers/SenderWorker.java
Author: deepal
Date: Wed Sep 26 09:20:24 2007
New Revision: 579708
URL: http://svn.apache.org/viewvc?rev=579708&view=rev
Log:
- Fixing issue when we send a CS request and receive non create sequence response , in that case we need to stop sending CS req and need to notify client abt that
- When the timeout happen it never going to notify to the client
- This commit fix both of the above , Chamikara or someone who expert about the code base please validate the patch
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Wed Sep 26 09:20:24 2007
@@ -247,7 +247,6 @@
// with the same internal sequenceid
// Check that someone hasn't created the bean
rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
-
// if first message - setup the sending side sequence - both for the
// server and the client sides.
if (rmsBean == null) {
@@ -363,7 +362,8 @@
}
// Update the rmsBean
- storageManager.getRMSBeanMgr().update(rmsBean);
+ rmsBean.setApplicationMessageMessageId(msgContext.getMessageID());
+ storageManager.getRMSBeanMgr().update(rmsBean);
if(startPolling) {
SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), rmsBean);
Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java (original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java Wed Sep 26 09:20:24 2007
@@ -144,7 +144,10 @@
*/
private boolean avoidAutoTermination = false;
- /**
+ //To store the message id if the outgoing appliction message
+ private String applicationMessageMessageId ;
+
+ /**
* Flags that are used to check if the primitive types on this bean
* have been set. If a primitive type has not been set then it will
* be ignored within the match method.
@@ -512,4 +515,12 @@
return match;
}
+
+ public String getApplicationMessageMessageId() {
+ return applicationMessageMessageId;
+ }
+
+ public void setApplicationMessageMessageId(String applicationMessageMessageId) {
+ this.applicationMessageMessageId = applicationMessageMessageId;
+ }
}
Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Wed Sep 26 09:20:24 2007
@@ -7,6 +7,9 @@
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
+import org.apache.axis2.client.async.Callback;
+import org.apache.axis2.client.async.AxisCallback;
+import org.apache.axis2.util.CallbackReceiver;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
@@ -17,6 +20,7 @@
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.OutOnlyAxisOperation;
import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.MessageReceiver;
import org.apache.axis2.engine.Handler.InvocationResponse;
import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.transport.TransportUtils;
@@ -45,13 +49,7 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CloseSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.sandesha2.wsrm.*;
public class SenderWorker extends SandeshaWorker implements Runnable {
@@ -82,7 +80,7 @@
try {
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext, configurationContext.getAxisConfiguration());
SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
-
+
transaction = storageManager.getTransaction();
String key = senderBean.getMessageContextRefKey();
@@ -195,8 +193,8 @@
}
//if the message belong to the Replay Model, it will be send out only if
-
-
+
+
boolean continueSending = updateMessage(rmMsgCtx,senderBean,storageManager);
//save changes done @ updateMessage -> MessageRetransmissionAdjuster.adjustRetransmittion
storageManager.getSenderBeanMgr().update(senderBean);
@@ -210,7 +208,7 @@
transaction.commit();
transaction = null;
}
-
+ invokeCallBackObject(storageManager,msgCtx ,"Exit: SenderWorker::run, !continueSending");
return;
}
@@ -236,7 +234,7 @@
senderBeanMgr.update(bean2);
}
}
-
+
// have to commit the transaction before sending. This may
// get changed when WS-AT is available.
if(transaction != null) {
@@ -335,10 +333,15 @@
transaction = null;
- if ((processResponseForFaults || successfullySent) && !msgCtx.isServerSide())
- checkForSyncResponses(msgCtx);
-
- if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
+ if ((processResponseForFaults || successfullySent) && !msgCtx.isServerSide()) {
+ boolean validCs = checkForSyncResponses(msgCtx );
+ if (!validCs) {
+ invokeCallBackObject(storageManager,msgCtx ,
+ "Sandesha2 sender thread has not received a valid CreateSequnceResponse");
+ }
+ }
+
+ if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
&&
(Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue()))) {
try {
@@ -505,8 +508,13 @@
log.debug("Exit: SenderWorker::isAckPiggybackableMsgType, " + piggybackable);
return piggybackable;
}
-
- private void checkForSyncResponses(MessageContext msgCtx) {
+
+ /**
+ * return value will be false if the create sequence fails else it will be true
+ * @param msgCtx
+ * @return
+ */
+ private boolean checkForSyncResponses(MessageContext msgCtx ) {
if (log.isDebugEnabled())
log.debug("Enter: SenderWorker::checkForSyncResponses, " + msgCtx.getEnvelope().getHeader());
@@ -522,7 +530,7 @@
boolean transportInPresent = (msgCtx.getProperty(MessageContext.TRANSPORT_IN) != null);
if (!transportInPresent && (responseMessageContext==null || responseMessageContext.getEnvelope()==null)) {
if(log.isDebugEnabled()) log.debug("Exit: SenderWorker::checkForSyncResponses, no response present");
- return;
+ return true;
}
//to find out weather the response was built by me.
@@ -592,7 +600,7 @@
log.error ("Caught exception", e);
}
- return;
+ return true;
}
//If addressing is disabled we will be adding this message simply as the application response of the request message.
@@ -631,7 +639,7 @@
//if the syncResponseWas not built here and the client was not expecting a sync response. We will not try to execute
//here. Doing so will cause a double invocation for a async message.
if (msgCtx.getOptions().isUseSeparateListener()==true && !syncResponseBuilt) {
- return;
+ return true;
}
@@ -650,13 +658,22 @@
}
} catch (Exception e) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
- if (log.isWarnEnabled())
- log.warn(message, e);
+
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
+ if (msgCtx != null &&! msgCtx.isServerSide() &&
+ (Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())
+ || Sandesha2Constants.SPEC_2007_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())) ){
+ // We have not received a valid createSequnce reponse for the request we send so we need to terminate the seunce here
+ return false;
+ } else {
+ if (log.isWarnEnabled())
+ log.warn(message, e);
+ }
}
if (log.isDebugEnabled())
log.debug("Exit: SenderWorker::checkForSyncResponses");
- }
+ return true;
+ }
private void recordError (Exception e, RMMsgContext outRMMsg, StorageManager storageManager) throws SandeshaStorageException {
// Store the Exception as a sequence property to enable the client to lookup the last
@@ -702,5 +719,60 @@
}
}
}
-
+
+ private void invokeCallBackObject(StorageManager storageManager,
+ MessageContext msgCtx,
+ String message) throws SandeshaStorageException {
+ Transaction transaction = null;
+ if (msgCtx.isServerSide()) {
+ return;
+ }
+ try {
+ transaction = storageManager.getTransaction();
+ //terminate message sent using the SandeshaClient. Since the terminate message will simply get the
+ //InFlow of the reference message get called which could be zero sized (OutOnly operations).
+
+ // terminate sending side if this is the WSRM 1.0 spec.
+ // If the WSRM versoion is 1.1 termination will happen in the terminate sequence response message.
+
+ String internalSequenceId = (String) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
+ if (internalSequenceId == null) internalSequenceId = senderBean.getInternalSequenceID();
+ if (internalSequenceId != null) {
+ // Create a new Transaction
+ transaction = storageManager.getTransaction();
+ RMSBean bean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+ TerminateManager.terminateSendingSide(bean, storageManager);
+
+ OperationContext opCtx =
+ configurationContext.getOperationContext(bean.getApplicationMessageMessageId());
+ if (opCtx != null) {
+ AxisOperation applicationAxisOperation = opCtx.getAxisOperation();
+ if (applicationAxisOperation != null) {
+ MessageReceiver msgReceiver = applicationAxisOperation.getMessageReceiver();
+ if ((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver)) {
+ Object callback = ((CallbackReceiver) msgReceiver)
+ .lookupCallback(bean.getApplicationMessageMessageId());
+ if (callback != null) {
+ AxisCallback axisCallback = ((AxisCallback) callback);
+ axisCallback.onError(new Exception(message));
+ axisCallback.onComplete();
+ }
+ }
+ }
+ }
+ if (transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+ }
+
+ } catch (Exception e) {
+ if (log.isWarnEnabled())
+ log.warn(e);
+ } finally {
+ if (transaction != null && transaction.isActive()) {
+ transaction.rollback();
+ transaction = null;
+ }
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org
Re: svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java
Posted by Chamikara Jayalath <ch...@gmail.com>.
Yes, we hv to consider all other cases. checkForSyncResponse() is a general
method which checks for any synchronous response.
Also what u need to check for is the CREATE_SEQUENCE_RESPONSE action. Not
the CREATE_SEQUENCE action.
Chamikara
On 9/28/07, Deepal Jayasinghe <de...@gmail.com> wrote:
>
> Well , I added code to terminate the sequence when we send CS and
> receive non CS response. All the other cases I have ignored , if you
> think we need to consider other one as well then I can add the code
> for that.
>
> Thanks
> Deepal
>
> On 9/28/07, Chamikara Jayalath <ch...@gmail.com> wrote:
> > Hi Deepal,
> >
> > Did you assume that checkForSyncResponse() only checks for a
> > CreateSequenceResponse messages.
> > It's not the case. It can be anything ( an Ack, CSR, application
> response
> > etc.)
> >
> > Chamikara
> >
> >
> >
> > On 9/27/07, Deepal jayasinghe <de...@gmail.com> wrote:
> > >
> > > Andrew K Gatford wrote:
> > > > To check the fix, I'd like to understand what response is being
> returned
> > > > when sending out a CreateSequence message that isn't a
> > > > CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or
> is
> > > it
> > > > something else ?
> > > >
> > > It was something else. It is like we send a CS req to a server which
> > > does not have either addressing or rm support. In that case it will
> just
> > > process the request and send an application response. So the reply was
> > > neither a CS response nor a SOAF fault.
> > >
> > > Thanks
> > > Deepal
> > > > Andrew Gatford
> > > >
> > > > Hursley MP211
> > > > Telephone :
> > > > Internal (7) 245743
> > > > External 01962 815743
> > > > Internet : gatfora@uk.ibm.com
> > > >
> > > >
> > > >
> > > > deepal@apache.org
> > > > 26/09/2007 17:20
> > > >
> > > > To
> > > > sandesha-cvs@ws.apache.org
> > > > cc
> > > >
> > > > Subject
> > > > svn commit: r579708 - in
> > > >
> > >
> >
> /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2:
> > > > msgprocessors/ApplicationMsgProcessor.java
> storage/beans/RMSBean.java
> > > > workers/SenderWorker.java
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
> > > For additional commands, e-mail: sandesha-dev-help@ws.apache.org
> > >
> > >
> >
>
>
> --
>
>
>
>
>
> ============================
> Deepal Jayasinghe
> Lanka Software Foundation
>
Re: svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java
Posted by Deepal Jayasinghe <de...@gmail.com>.
Well , I added code to terminate the sequence when we send CS and
receive non CS response. All the other cases I have ignored , if you
think we need to consider other one as well then I can add the code
for that.
Thanks
Deepal
On 9/28/07, Chamikara Jayalath <ch...@gmail.com> wrote:
> Hi Deepal,
>
> Did you assume that checkForSyncResponse() only checks for a
> CreateSequenceResponse messages.
> It's not the case. It can be anything ( an Ack, CSR, application response
> etc.)
>
> Chamikara
>
>
>
> On 9/27/07, Deepal jayasinghe <de...@gmail.com> wrote:
> >
> > Andrew K Gatford wrote:
> > > To check the fix, I'd like to understand what response is being returned
> > > when sending out a CreateSequence message that isn't a
> > > CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or is
> > it
> > > something else ?
> > >
> > It was something else. It is like we send a CS req to a server which
> > does not have either addressing or rm support. In that case it will just
> > process the request and send an application response. So the reply was
> > neither a CS response nor a SOAF fault.
> >
> > Thanks
> > Deepal
> > > Andrew Gatford
> > >
> > > Hursley MP211
> > > Telephone :
> > > Internal (7) 245743
> > > External 01962 815743
> > > Internet : gatfora@uk.ibm.com
> > >
> > >
> > >
> > > deepal@apache.org
> > > 26/09/2007 17:20
> > >
> > > To
> > > sandesha-cvs@ws.apache.org
> > > cc
> > >
> > > Subject
> > > svn commit: r579708 - in
> > >
> >
> /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2:
> > > msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java
> > > workers/SenderWorker.java
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> >
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
> > For additional commands, e-mail: sandesha-dev-help@ws.apache.org
> >
> >
>
--
============================
Deepal Jayasinghe
Lanka Software Foundation
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org
Re: svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java
Posted by Chamikara Jayalath <ch...@gmail.com>.
Hi Deepal,
Did you assume that checkForSyncResponse() only checks for a
CreateSequenceResponse messages.
It's not the case. It can be anything ( an Ack, CSR, application response
etc.)
Chamikara
On 9/27/07, Deepal jayasinghe <de...@gmail.com> wrote:
>
> Andrew K Gatford wrote:
> > To check the fix, I'd like to understand what response is being returned
> > when sending out a CreateSequence message that isn't a
> > CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or is
> it
> > something else ?
> >
> It was something else. It is like we send a CS req to a server which
> does not have either addressing or rm support. In that case it will just
> process the request and send an application response. So the reply was
> neither a CS response nor a SOAF fault.
>
> Thanks
> Deepal
> > Andrew Gatford
> >
> > Hursley MP211
> > Telephone :
> > Internal (7) 245743
> > External 01962 815743
> > Internet : gatfora@uk.ibm.com
> >
> >
> >
> > deepal@apache.org
> > 26/09/2007 17:20
> >
> > To
> > sandesha-cvs@ws.apache.org
> > cc
> >
> > Subject
> > svn commit: r579708 - in
> >
> /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2:
> > msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java
> > workers/SenderWorker.java
> >
> >
> >
> >
> >
> >
> >
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
> For additional commands, e-mail: sandesha-dev-help@ws.apache.org
>
>
Re: svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2:
msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java
Posted by Deepal jayasinghe <de...@gmail.com>.
Andrew K Gatford wrote:
> To check the fix, I'd like to understand what response is being returned
> when sending out a CreateSequence message that isn't a
> CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or is it
> something else ?
>
It was something else. It is like we send a CS req to a server which
does not have either addressing or rm support. In that case it will just
process the request and send an application response. So the reply was
neither a CS response nor a SOAF fault.
Thanks
Deepal
> Andrew Gatford
>
> Hursley MP211
> Telephone :
> Internal (7) 245743
> External 01962 815743
> Internet : gatfora@uk.ibm.com
>
>
>
> deepal@apache.org
> 26/09/2007 17:20
>
> To
> sandesha-cvs@ws.apache.org
> cc
>
> Subject
> svn commit: r579708 - in
> /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2:
> msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java
> workers/SenderWorker.java
>
>
>
>
>
>
>
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org
Re: svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java
storage/beans/RMSBean.java workers/SenderWorker.java
Posted by Andrew K Gatford <GA...@uk.ibm.com>.
To check the fix, I'd like to understand what response is being returned
when sending out a CreateSequence message that isn't a
CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or is it
something else ?
Andrew Gatford
Hursley MP211
Telephone :
Internal (7) 245743
External 01962 815743
Internet : gatfora@uk.ibm.com
deepal@apache.org
26/09/2007 17:20
To
sandesha-cvs@ws.apache.org
cc
Subject
svn commit: r579708 - in
/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2:
msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java
workers/SenderWorker.java
Author: deepal
Date: Wed Sep 26 09:20:24 2007
New Revision: 579708
URL: http://svn.apache.org/viewvc?rev=579708&view=rev
Log:
- Fixing issue when we send a CS request and receive non create sequence
response , in that case we need to stop sending CS req and need to notify
client abt that
- When the timeout happen it never going to notify to the client
- This commit fix both of the above , Chamikara or someone who expert
about the code base please validate the patch
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
---
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Wed Sep 26 09:20:24 2007
@@ -247,7 +247,6 @@
// with
the same internal sequenceid
// Check
that someone hasn't created the bean
rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
-
// if
first message - setup the sending side sequence - both for the
// server
and the client sides.
if
(rmsBean == null) {
@@ -363,7 +362,8 @@
}
// Update the rmsBean
- storageManager.getRMSBeanMgr().update(rmsBean);
+ rmsBean.setApplicationMessageMessageId(msgContext.getMessageID());
+ storageManager.getRMSBeanMgr().update(rmsBean);
if(startPolling) {
SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(),
rmsBean);
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
---
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
(original)
+++
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
Wed Sep 26 09:20:24 2007
@@ -144,7 +144,10 @@
*/
private boolean avoidAutoTermination = false;
- /**
+ //To store the message id if the outgoing appliction message
+ private String applicationMessageMessageId ;
+
+ /**
* Flags that are used to check if the primitive types on
this bean
* have been set. If a primitive type has not been set
then it will
* be ignored within the match method.
@@ -512,4 +515,12 @@
return match;
}
+
+ public String getApplicationMessageMessageId() {
+ return applicationMessageMessageId;
+ }
+
+ public void setApplicationMessageMessageId(String
applicationMessageMessageId) {
+ this.applicationMessageMessageId = applicationMessageMessageId;
+ }
}
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
---
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
(original)
+++
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Wed Sep 26 09:20:24 2007
@@ -7,6 +7,9 @@
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
+import org.apache.axis2.client.async.Callback;
+import org.apache.axis2.client.async.AxisCallback;
+import org.apache.axis2.util.CallbackReceiver;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
@@ -17,6 +20,7 @@
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.OutOnlyAxisOperation;
import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.MessageReceiver;
import org.apache.axis2.engine.Handler.InvocationResponse;
import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.transport.TransportUtils;
@@ -45,13 +49,7 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CloseSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.sandesha2.wsrm.*;
public class SenderWorker extends SandeshaWorker implements Runnable {
@@ -82,7 +80,7 @@
try {
StorageManager
storageManager =
SandeshaUtil.getSandeshaStorageManager(configurationContext,
configurationContext.getAxisConfiguration());
SenderBeanMgr
senderBeanMgr = storageManager.getSenderBeanMgr();
-
+
transaction =
storageManager.getTransaction();
String key =
senderBean.getMessageContextRefKey();
@@ -195,8 +193,8 @@
}
//if the message belong
to the Replay Model, it will be send out only if
-
-
+
+
boolean continueSending =
updateMessage(rmMsgCtx,senderBean,storageManager);
//save changes done @
updateMessage -> MessageRetransmissionAdjuster.adjustRetransmittion
storageManager.getSenderBeanMgr().update(senderBean);
@@ -210,7 +208,7 @@
transaction.commit();
transaction = null;
}
-
+ invokeCallBackObject(storageManager,msgCtx ,"Exit: SenderWorker::run,
!continueSending");
return;
}
@@ -236,7 +234,7 @@
senderBeanMgr.update(bean2);
}
}
-
+
// have to commit the
transaction before sending. This may
// get changed when WS-AT
is available.
if(transaction != null) {
@@ -335,10 +333,15 @@
transaction = null;
- if
((processResponseForFaults || successfullySent) && !msgCtx.isServerSide())
- checkForSyncResponses(msgCtx);
-
- if
((rmMsgCtx.getMessageType() ==
Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
+ if ((processResponseForFaults || successfullySent) &&
!msgCtx.isServerSide()) {
+ boolean validCs = checkForSyncResponses(msgCtx );
+ if (!validCs) {
+ invokeCallBackObject(storageManager,msgCtx ,
+ "Sandesha2 sender thread has not received a
valid CreateSequnceResponse");
+ }
+ }
+
+ if ((rmMsgCtx.getMessageType() ==
Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
&&
(Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue())))
{
try {
@@ -505,8 +508,13 @@
log.debug("Exit:
SenderWorker::isAckPiggybackableMsgType, " + piggybackable);
return piggybackable;
}
-
- private void checkForSyncResponses(MessageContext msgCtx)
{
+
+ /**
+ * return value will be false if the create sequence fails else it
will be true
+ * @param msgCtx
+ * @return
+ */
+ private boolean checkForSyncResponses(MessageContext msgCtx ) {
if (log.isDebugEnabled())
log.debug("Enter:
SenderWorker::checkForSyncResponses, " +
msgCtx.getEnvelope().getHeader());
@@ -522,7 +530,7 @@
boolean
transportInPresent = (msgCtx.getProperty(MessageContext.TRANSPORT_IN) !=
null);
if (!transportInPresent
&& (responseMessageContext==null ||
responseMessageContext.getEnvelope()==null)) {
if(log.isDebugEnabled()) log.debug("Exit:
SenderWorker::checkForSyncResponses, no response present");
- return;
+ return
true;
}
//to find out weather the
response was built by me.
@@ -592,7 +600,7 @@
log.error ("Caught exception", e);
}
- return;
+ return true;
}
//If
addressing is disabled we will be adding this message simply as the
application response of the request message.
@@ -631,7 +639,7 @@
//if the syncResponseWas
not built here and the client was not expecting a sync response. We will
not try to execute
//here. Doing so will
cause a double invocation for a async message.
if
(msgCtx.getOptions().isUseSeparateListener()==true && !syncResponseBuilt)
{
- return;
+ return
true;
}
@@ -650,13 +658,22 @@
}
} catch (Exception e) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
- if (log.isWarnEnabled())
- log.warn(message, e);
+
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
+ if (msgCtx != null &&! msgCtx.isServerSide() &&
+
(Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())
+ ||
Sandesha2Constants.SPEC_2007_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction()))
){
+ // We have not received a valid createSequnce reponse for
the request we send so we need to terminate the seunce here
+ return false;
+ } else {
+ if (log.isWarnEnabled())
+ log.warn(message, e);
+ }
}
if (log.isDebugEnabled())
log.debug("Exit:
SenderWorker::checkForSyncResponses");
- }
+ return true;
+ }
private void recordError (Exception e, RMMsgContext
outRMMsg, StorageManager storageManager) throws SandeshaStorageException {
// Store the Exception as a sequence
property to enable the client to lookup the last
@@ -702,5 +719,60 @@
}
}
}
-
+
+ private void invokeCallBackObject(StorageManager storageManager,
+ MessageContext msgCtx,
+ String message) throws
SandeshaStorageException {
+ Transaction transaction = null;
+ if (msgCtx.isServerSide()) {
+ return;
+ }
+ try {
+ transaction = storageManager.getTransaction();
+ //terminate message sent using the SandeshaClient. Since the
terminate message will simply get the
+ //InFlow of the reference message get called which could be
zero sized (OutOnly operations).
+
+ // terminate sending side if this is the WSRM 1.0 spec.
+ // If the WSRM versoion is 1.1 termination will happen in the
terminate sequence response message.
+
+ String internalSequenceId = (String)
msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
+ if (internalSequenceId == null) internalSequenceId =
senderBean.getInternalSequenceID();
+ if (internalSequenceId != null) {
+ // Create a new Transaction
+ transaction = storageManager.getTransaction();
+ RMSBean bean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
+ TerminateManager.terminateSendingSide(bean,
storageManager);
+
+ OperationContext opCtx =
+
configurationContext.getOperationContext(bean.getApplicationMessageMessageId());
+ if (opCtx != null) {
+ AxisOperation applicationAxisOperation =
opCtx.getAxisOperation();
+ if (applicationAxisOperation != null) {
+ MessageReceiver msgReceiver =
applicationAxisOperation.getMessageReceiver();
+ if ((msgReceiver != null) && (msgReceiver
instanceof CallbackReceiver)) {
+ Object callback = ((CallbackReceiver)
msgReceiver)
+ .lookupCallback(bean.getApplicationMessageMessageId());
+ if (callback != null) {
+ AxisCallback axisCallback =
((AxisCallback) callback);
+ axisCallback.onError(new
Exception(message));
+ axisCallback.onComplete();
+ }
+ }
+ }
+ }
+ if (transaction != null && transaction.isActive())
transaction.commit();
+ transaction = null;
+ }
+
+ } catch (Exception e) {
+ if (log.isWarnEnabled())
+ log.warn(e);
+ } finally {
+ if (transaction != null && transaction.isActive()) {
+ transaction.rollback();
+ transaction = null;
+ }
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org
Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number
741598.
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org