You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by de...@apache.org on 2007/09/26 18:20:27 UTC

svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java

Author: deepal
Date: Wed Sep 26 09:20:24 2007
New Revision: 579708

URL: http://svn.apache.org/viewvc?rev=579708&view=rev
Log:
- Fixing issue when we send a CS request and receive non create sequence response , in that case we need to stop sending CS req and need to notify client abt that
- When the timeout happen it never going to notify to the client 

- This commit fix both of the above , Chamikara or someone who expert about the code base please validate the patch 

 

Modified:
    webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
    webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java

Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Wed Sep 26 09:20:24 2007
@@ -247,7 +247,6 @@
 				// with the same internal sequenceid
 				// Check that someone hasn't created the bean
 				rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
-				
 				// if first message - setup the sending side sequence - both for the
 				// server and the client sides.
 				if (rmsBean == null) {
@@ -363,7 +362,8 @@
 		}
 
 		// Update the rmsBean
-		storageManager.getRMSBeanMgr().update(rmsBean);
+        rmsBean.setApplicationMessageMessageId(msgContext.getMessageID());
+        storageManager.getRMSBeanMgr().update(rmsBean);
 		
 		if(startPolling) {
 			SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), rmsBean);

Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java (original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java Wed Sep 26 09:20:24 2007
@@ -144,7 +144,10 @@
 	 */
 	private boolean avoidAutoTermination = false;
 
-	/**
+    //To store the message id if the outgoing appliction message
+    private String applicationMessageMessageId ;
+
+    /**
 	 * Flags that are used to check if the primitive types on this bean
 	 * have been set. If a primitive type has not been set then it will
 	 * be ignored within the match method.
@@ -512,4 +515,12 @@
 		return match;
 	}
 
+
+    public String getApplicationMessageMessageId() {
+        return applicationMessageMessageId;
+    }
+
+    public void setApplicationMessageMessageId(String applicationMessageMessageId) {
+        this.applicationMessageMessageId = applicationMessageMessageId;
+    }
 }

Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Wed Sep 26 09:20:24 2007
@@ -7,6 +7,9 @@
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
+import org.apache.axis2.client.async.Callback;
+import org.apache.axis2.client.async.AxisCallback;
+import org.apache.axis2.util.CallbackReceiver;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
@@ -17,6 +20,7 @@
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.OutOnlyAxisOperation;
 import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.MessageReceiver;
 import org.apache.axis2.engine.Handler.InvocationResponse;
 import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.axis2.transport.TransportUtils;
@@ -45,13 +49,7 @@
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CloseSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.sandesha2.wsrm.*;
 
 public class SenderWorker extends SandeshaWorker implements Runnable {
 
@@ -82,7 +80,7 @@
 		try {
 			StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext, configurationContext.getAxisConfiguration());
 			SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
-			
+
 			transaction = storageManager.getTransaction();
 
 			String key = senderBean.getMessageContextRefKey();
@@ -195,8 +193,8 @@
 			}
 			
 			//if the message belong to the Replay Model, it will be send out only if 
-			
-			
+
+
 			boolean continueSending = updateMessage(rmMsgCtx,senderBean,storageManager);
 			//save changes done @ updateMessage -> MessageRetransmissionAdjuster.adjustRetransmittion
 			storageManager.getSenderBeanMgr().update(senderBean);
@@ -210,7 +208,7 @@
 					transaction.commit();
 					transaction = null;
 				}
-				
+				invokeCallBackObject(storageManager,msgCtx ,"Exit: SenderWorker::run, !continueSending");
 				return;
 			}
 
@@ -236,7 +234,7 @@
 					senderBeanMgr.update(bean2);
 				}
 			}
-			
+
 			// have to commit the transaction before sending. This may
 			// get changed when WS-AT is available.
 			if(transaction != null) {
@@ -335,10 +333,15 @@
 			
 			transaction = null;
 
-			if ((processResponseForFaults || successfullySent) && !msgCtx.isServerSide()) 
-				checkForSyncResponses(msgCtx);
-			
-			if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
+            if ((processResponseForFaults || successfullySent) && !msgCtx.isServerSide()) {
+                boolean  validCs = checkForSyncResponses(msgCtx );
+                if (!validCs) {
+                    invokeCallBackObject(storageManager,msgCtx ,
+                            "Sandesha2 sender thread has not received a valid CreateSequnceResponse");
+                }
+            }
+
+            if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
 					&&
 					 (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue()))) {
 				try {
@@ -505,8 +508,13 @@
 			log.debug("Exit: SenderWorker::isAckPiggybackableMsgType, " + piggybackable);
 		return piggybackable;
 	}
-	
-	private void checkForSyncResponses(MessageContext msgCtx) {
+
+    /**
+     * return value will be false if the create sequence fails else it will be true
+     * @param msgCtx
+     * @return
+     */
+    private boolean checkForSyncResponses(MessageContext msgCtx ) {
 		if (log.isDebugEnabled())
 			log.debug("Enter: SenderWorker::checkForSyncResponses, " + msgCtx.getEnvelope().getHeader());
 
@@ -522,7 +530,7 @@
 			boolean transportInPresent = (msgCtx.getProperty(MessageContext.TRANSPORT_IN) != null);
 			if (!transportInPresent && (responseMessageContext==null || responseMessageContext.getEnvelope()==null)) {
 				if(log.isDebugEnabled()) log.debug("Exit: SenderWorker::checkForSyncResponses, no response present");
-				return;
+				return true;
 			}
 			
 			//to find out weather the response was built by me.
@@ -592,7 +600,7 @@
 					log.error ("Caught exception", e);
 					}
 				
-					return;
+					return true;
 				}
 				
 				//If addressing is disabled we will be adding this message simply as the application response of the request message.
@@ -631,7 +639,7 @@
 			//if the syncResponseWas not built here and the client was not expecting a sync response. We will not try to execute 
 			//here. Doing so will cause a double invocation for a async message. 
 			if (msgCtx.getOptions().isUseSeparateListener()==true &&  !syncResponseBuilt) {
-				return;
+				return true;
 			}
 			
 			
@@ -650,13 +658,22 @@
 			}
 
 		} catch (Exception e) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
-			if (log.isWarnEnabled())
-				log.warn(message, e);
+
+            String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
+            if (msgCtx != null &&! msgCtx.isServerSide() &&
+                    (Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())
+                            || Sandesha2Constants.SPEC_2007_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())) ){
+                // We have not received a valid createSequnce reponse for the request we send so we need to terminate the seunce here
+                return false;
+            } else {
+                if (log.isWarnEnabled())
+                    log.warn(message, e);
+            }
 		}
 		if (log.isDebugEnabled())
 			log.debug("Exit: SenderWorker::checkForSyncResponses");
-	}
+        return true;
+    }
 	
 	private void recordError (Exception e, RMMsgContext outRMMsg, StorageManager storageManager) throws SandeshaStorageException {
 		// Store the Exception as a sequence property to enable the client to lookup the last 
@@ -702,5 +719,60 @@
 			}
 		}
 	}
-	
+
+    private void invokeCallBackObject(StorageManager storageManager,
+                                      MessageContext msgCtx,
+                                      String message) throws SandeshaStorageException {
+        Transaction transaction = null;
+        if (msgCtx.isServerSide()) {
+            return;
+        }
+        try {
+            transaction = storageManager.getTransaction();
+            //terminate message sent using the SandeshaClient. Since the terminate message will simply get the
+            //InFlow of the reference message get called which could be zero sized (OutOnly operations).
+
+            // terminate sending side if this is the WSRM 1.0 spec.
+            // If the WSRM versoion is 1.1 termination will happen in the terminate sequence response message.
+
+            String internalSequenceId = (String) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
+            if (internalSequenceId == null) internalSequenceId = senderBean.getInternalSequenceID();
+            if (internalSequenceId != null) {
+                // Create a new Transaction
+                transaction = storageManager.getTransaction();
+                RMSBean bean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+                TerminateManager.terminateSendingSide(bean, storageManager);
+
+                OperationContext opCtx =
+                        configurationContext.getOperationContext(bean.getApplicationMessageMessageId());
+                if (opCtx != null) {
+                    AxisOperation applicationAxisOperation = opCtx.getAxisOperation();
+                    if (applicationAxisOperation != null) {
+                        MessageReceiver msgReceiver = applicationAxisOperation.getMessageReceiver();
+                        if ((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver)) {
+                            Object callback = ((CallbackReceiver) msgReceiver)
+                                    .lookupCallback(bean.getApplicationMessageMessageId());
+                            if (callback != null) {
+                                AxisCallback axisCallback = ((AxisCallback) callback);
+                                axisCallback.onError(new Exception(message));
+                                axisCallback.onComplete();
+                            }
+                        }
+                    }
+                }
+                if (transaction != null && transaction.isActive()) transaction.commit();
+                transaction = null;
+            }
+
+        } catch (Exception e) {
+            if (log.isWarnEnabled())
+                log.warn(e);
+        } finally {
+            if (transaction != null && transaction.isActive()) {
+                transaction.rollback();
+                transaction = null;
+            }
+        }
+    }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org


Re: svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java

Posted by Chamikara Jayalath <ch...@gmail.com>.
Yes, we hv to consider all other cases. checkForSyncResponse() is a general
method which checks for any synchronous response.
Also what u need to check for is the CREATE_SEQUENCE_RESPONSE action. Not
the CREATE_SEQUENCE action.

Chamikara


On 9/28/07, Deepal Jayasinghe <de...@gmail.com> wrote:
>
> Well , I added code to terminate the sequence when we send CS and
> receive non CS response. All the other cases I have ignored , if you
> think we need to consider other one as well then I can add the code
> for that.
>
> Thanks
> Deepal
>
> On 9/28/07, Chamikara Jayalath <ch...@gmail.com> wrote:
> > Hi Deepal,
> >
> > Did you assume that checkForSyncResponse() only checks for a
> > CreateSequenceResponse messages.
> > It's not the case. It can be anything ( an Ack, CSR, application
> response
> > etc.)
> >
> > Chamikara
> >
> >
> >
> > On 9/27/07, Deepal jayasinghe <de...@gmail.com> wrote:
> > >
> > > Andrew K Gatford wrote:
> > > > To check the fix, I'd like to understand what response is being
> returned
> > > > when sending out a CreateSequence message that isn't a
> > > > CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or
> is
> > > it
> > > > something else ?
> > > >
> > > It was something else. It is like we send a CS req to a server which
> > > does not have either addressing or rm support. In that case it will
> just
> > > process the request and send an application response. So the reply was
> > > neither a CS response nor a SOAF fault.
> > >
> > > Thanks
> > > Deepal
> > > > Andrew Gatford
> > > >
> > > > Hursley MP211
> > > > Telephone :
> > > > Internal (7) 245743
> > > > External 01962 815743
> > > > Internet : gatfora@uk.ibm.com
> > > >
> > > >
> > > >
> > > > deepal@apache.org
> > > > 26/09/2007 17:20
> > > >
> > > > To
> > > > sandesha-cvs@ws.apache.org
> > > > cc
> > > >
> > > > Subject
> > > > svn commit: r579708 - in
> > > >
> > >
> >
> /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2:
> > > > msgprocessors/ApplicationMsgProcessor.java
> storage/beans/RMSBean.java
> > > > workers/SenderWorker.java
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
> > > For additional commands, e-mail: sandesha-dev-help@ws.apache.org
> > >
> > >
> >
>
>
> --
>
>
>
>
>
> ============================
> Deepal Jayasinghe
> Lanka Software Foundation
>

Re: svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java

Posted by Deepal Jayasinghe <de...@gmail.com>.
Well , I added code to terminate the sequence when we send CS and
receive non CS response. All the other cases I have ignored , if you
think we need to consider other one as well then I can add the code
for that.

Thanks
Deepal

On 9/28/07, Chamikara Jayalath <ch...@gmail.com> wrote:
> Hi Deepal,
>
> Did you assume that checkForSyncResponse() only checks for a
> CreateSequenceResponse messages.
> It's not the case. It can be anything ( an Ack, CSR, application response
> etc.)
>
> Chamikara
>
>
>
> On 9/27/07, Deepal jayasinghe <de...@gmail.com> wrote:
> >
> > Andrew K Gatford wrote:
> > > To check the fix, I'd like to understand what response is being returned
> > > when sending out a CreateSequence message that isn't a
> > > CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or is
> > it
> > > something else ?
> > >
> > It was something else. It is like we send a CS req to a server which
> > does not have either addressing or rm support. In that case it will just
> > process the request and send an application response. So the reply was
> > neither a CS response nor a SOAF fault.
> >
> > Thanks
> > Deepal
> > > Andrew Gatford
> > >
> > > Hursley MP211
> > > Telephone :
> > > Internal (7) 245743
> > > External 01962 815743
> > > Internet : gatfora@uk.ibm.com
> > >
> > >
> > >
> > > deepal@apache.org
> > > 26/09/2007 17:20
> > >
> > > To
> > > sandesha-cvs@ws.apache.org
> > > cc
> > >
> > > Subject
> > > svn commit: r579708 - in
> > >
> >
> /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2:
> > > msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java
> > > workers/SenderWorker.java
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> >
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
> > For additional commands, e-mail: sandesha-dev-help@ws.apache.org
> >
> >
>


-- 





============================
Deepal Jayasinghe
Lanka Software Foundation

---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org


Re: svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java

Posted by Chamikara Jayalath <ch...@gmail.com>.
Hi Deepal,

Did you assume that checkForSyncResponse() only checks for a
CreateSequenceResponse messages.
It's not the case. It can be anything ( an Ack, CSR, application response
etc.)

Chamikara



On 9/27/07, Deepal jayasinghe <de...@gmail.com> wrote:
>
> Andrew K Gatford wrote:
> > To check the fix, I'd like to understand what response is being returned
> > when sending out a CreateSequence message that isn't a
> > CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or is
> it
> > something else ?
> >
> It was something else. It is like we send a CS req to a server which
> does not have either addressing or rm support. In that case it will just
> process the request and send an application response. So the reply was
> neither a CS response nor a SOAF fault.
>
> Thanks
> Deepal
> > Andrew Gatford
> >
> > Hursley MP211
> > Telephone :
> > Internal (7) 245743
> > External 01962 815743
> > Internet : gatfora@uk.ibm.com
> >
> >
> >
> > deepal@apache.org
> > 26/09/2007 17:20
> >
> > To
> > sandesha-cvs@ws.apache.org
> > cc
> >
> > Subject
> > svn commit: r579708 - in
> >
> /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2:
> > msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java
> > workers/SenderWorker.java
> >
> >
> >
> >
> >
> >
> >
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
> For additional commands, e-mail: sandesha-dev-help@ws.apache.org
>
>

Re: svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java

Posted by Deepal jayasinghe <de...@gmail.com>.
Andrew K Gatford wrote:
> To check the fix, I'd like to understand what response is being returned 
> when sending out a CreateSequence message that isn't a 
> CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or is it 
> something else ?
>   
It was something else. It is like we send a CS req to a server which
does not have either addressing or rm support. In that case it will just
process the request and send an application response. So the reply was
neither a CS response nor a SOAF fault.

Thanks
Deepal
> Andrew Gatford
>
> Hursley MP211
> Telephone : 
> Internal (7) 245743 
> External 01962 815743
> Internet : gatfora@uk.ibm.com
>
>
>
> deepal@apache.org 
> 26/09/2007 17:20
>
> To
> sandesha-cvs@ws.apache.org
> cc
>
> Subject
> svn commit: r579708 - in 
> /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: 
> msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java 
> workers/SenderWorker.java
>
>
>
>
>
>
>   


---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org


Re: svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java

Posted by Andrew K Gatford <GA...@uk.ibm.com>.
To check the fix, I'd like to understand what response is being returned 
when sending out a CreateSequence message that isn't a 
CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or is it 
something else ?

Andrew Gatford

Hursley MP211
Telephone : 
Internal (7) 245743 
External 01962 815743
Internet : gatfora@uk.ibm.com



deepal@apache.org 
26/09/2007 17:20

To
sandesha-cvs@ws.apache.org
cc

Subject
svn commit: r579708 - in 
/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: 
msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java 
workers/SenderWorker.java






Author: deepal
Date: Wed Sep 26 09:20:24 2007
New Revision: 579708

URL: http://svn.apache.org/viewvc?rev=579708&view=rev
Log:
- Fixing issue when we send a CS request and receive non create sequence 
response , in that case we need to stop sending CS req and need to notify 
client abt that
- When the timeout happen it never going to notify to the client 

- This commit fix both of the above , Chamikara or someone who expert 
about the code base please validate the patch 

 

Modified:
 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=579708&r1=579707&r2=579708&view=diff

==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java 
(original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java 
Wed Sep 26 09:20:24 2007
@@ -247,7 +247,6 @@
                                                                 // with 
the same internal sequenceid
                                                                 // Check 
that someone hasn't created the bean
                                                                 rmsBean = 
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, 
internalSequenceId);
- 
                                                                 // if 
first message - setup the sending side sequence - both for the
                                                                 // server 
and the client sides.
                                                                 if 
(rmsBean == null) {
@@ -363,7 +362,8 @@
                                 }
 
                                 // Update the rmsBean
- storageManager.getRMSBeanMgr().update(rmsBean);
+ rmsBean.setApplicationMessageMessageId(msgContext.getMessageID());
+        storageManager.getRMSBeanMgr().update(rmsBean);
 
                                 if(startPolling) {
 
SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), 
rmsBean);

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=579708&r1=579707&r2=579708&view=diff

==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java 
(original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java 
Wed Sep 26 09:20:24 2007
@@ -144,7 +144,10 @@
                  */
                 private boolean avoidAutoTermination = false;
 
-                /**
+    //To store the message id if the outgoing appliction message
+    private String applicationMessageMessageId ;
+
+    /**
                  * Flags that are used to check if the primitive types on 
this bean
                  * have been set. If a primitive type has not been set 
then it will
                  * be ignored within the match method.
@@ -512,4 +515,12 @@
                                 return match;
                 }
 
+
+    public String getApplicationMessageMessageId() {
+        return applicationMessageMessageId;
+    }
+
+    public void setApplicationMessageMessageId(String 
applicationMessageMessageId) {
+        this.applicationMessageMessageId = applicationMessageMessageId;
+    }
 }

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=579708&r1=579707&r2=579708&view=diff

==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java 
(original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java 
Wed Sep 26 09:20:24 2007
@@ -7,6 +7,9 @@
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
+import org.apache.axis2.client.async.Callback;
+import org.apache.axis2.client.async.AxisCallback;
+import org.apache.axis2.util.CallbackReceiver;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
@@ -17,6 +20,7 @@
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.OutOnlyAxisOperation;
 import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.MessageReceiver;
 import org.apache.axis2.engine.Handler.InvocationResponse;
 import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.axis2.transport.TransportUtils;
@@ -45,13 +49,7 @@
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CloseSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.sandesha2.wsrm.*;
 
 public class SenderWorker extends SandeshaWorker implements Runnable {
 
@@ -82,7 +80,7 @@
                                 try {
                                                 StorageManager 
storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext, 
configurationContext.getAxisConfiguration());
                                                 SenderBeanMgr 
senderBeanMgr = storageManager.getSenderBeanMgr();
- 
+
                                                 transaction = 
storageManager.getTransaction();
 
                                                 String key = 
senderBean.getMessageContextRefKey();
@@ -195,8 +193,8 @@
                                                 }
 
                                                 //if the message belong 
to the Replay Model, it will be send out only if 
- 
- 
+
+
                                                 boolean continueSending = 
updateMessage(rmMsgCtx,senderBean,storageManager);
                                                 //save changes done @ 
updateMessage -> MessageRetransmissionAdjuster.adjustRetransmittion
 storageManager.getSenderBeanMgr().update(senderBean);
@@ -210,7 +208,7 @@
  transaction.commit();
  transaction = null;
                                                                 }
- 
+ invokeCallBackObject(storageManager,msgCtx ,"Exit: SenderWorker::run, 
!continueSending");
                                                                 return;
                                                 }
 
@@ -236,7 +234,7 @@
  senderBeanMgr.update(bean2);
                                                                 }
                                                 }
- 
+
                                                 // have to commit the 
transaction before sending. This may
                                                 // get changed when WS-AT 
is available.
                                                 if(transaction != null) {
@@ -335,10 +333,15 @@
 
                                                 transaction = null;
 
-                                                if 
((processResponseForFaults || successfullySent) && !msgCtx.isServerSide()) 

- checkForSyncResponses(msgCtx);
- 
-                                                if 
((rmMsgCtx.getMessageType() == 
Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
+            if ((processResponseForFaults || successfullySent) && 
!msgCtx.isServerSide()) {
+                boolean  validCs = checkForSyncResponses(msgCtx );
+                if (!validCs) {
+                    invokeCallBackObject(storageManager,msgCtx ,
+                            "Sandesha2 sender thread has not received a 
valid CreateSequnceResponse");
+                }
+            }
+
+            if ((rmMsgCtx.getMessageType() == 
Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
  &&
  
(Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue()))) 
{
                                                                 try {
@@ -505,8 +508,13 @@
                                                 log.debug("Exit: 
SenderWorker::isAckPiggybackableMsgType, " + piggybackable);
                                 return piggybackable;
                 }
- 
-                private void checkForSyncResponses(MessageContext msgCtx) 
{
+
+    /**
+     * return value will be false if the create sequence fails else it 
will be true
+     * @param msgCtx
+     * @return
+     */
+    private boolean checkForSyncResponses(MessageContext msgCtx ) {
                                 if (log.isDebugEnabled())
                                                 log.debug("Enter: 
SenderWorker::checkForSyncResponses, " + 
msgCtx.getEnvelope().getHeader());
 
@@ -522,7 +530,7 @@
                                                 boolean 
transportInPresent = (msgCtx.getProperty(MessageContext.TRANSPORT_IN) != 
null);
                                                 if (!transportInPresent 
&& (responseMessageContext==null || 
responseMessageContext.getEnvelope()==null)) {
 if(log.isDebugEnabled()) log.debug("Exit: 
SenderWorker::checkForSyncResponses, no response present");
-                                                                return;
+                                                                return 
true;
                                                 }
 
                                                 //to find out weather the 
response was built by me.
@@ -592,7 +600,7 @@
  log.error ("Caught exception", e);
  }
 
-  return;
+  return true;
                                                                 }
 
                                                                 //If 
addressing is disabled we will be adding this message simply as the 
application response of the request message.
@@ -631,7 +639,7 @@
                                                 //if the syncResponseWas 
not built here and the client was not expecting a sync response. We will 
not try to execute 
                                                 //here. Doing so will 
cause a double invocation for a async message. 
                                                 if 
(msgCtx.getOptions().isUseSeparateListener()==true &&  !syncResponseBuilt) 
{
-                                                                return;
+                                                                return 
true;
                                                 }
 
 
@@ -650,13 +658,22 @@
                                                 }
 
                                 } catch (Exception e) {
-                                                String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
-                                                if (log.isWarnEnabled())
- log.warn(message, e);
+
+            String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
+            if (msgCtx != null &&! msgCtx.isServerSide() &&
+ 
(Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())
+                            || 
Sandesha2Constants.SPEC_2007_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())) 
){
+                // We have not received a valid createSequnce reponse for 
the request we send so we need to terminate the seunce here
+                return false;
+            } else {
+                if (log.isWarnEnabled())
+                    log.warn(message, e);
+            }
                                 }
                                 if (log.isDebugEnabled())
                                                 log.debug("Exit: 
SenderWorker::checkForSyncResponses");
-                }
+        return true;
+    }
 
                 private void recordError (Exception e, RMMsgContext 
outRMMsg, StorageManager storageManager) throws SandeshaStorageException {
                                 // Store the Exception as a sequence 
property to enable the client to lookup the last 
@@ -702,5 +719,60 @@
                                                 }
                                 }
                 }
- 
+
+    private void invokeCallBackObject(StorageManager storageManager,
+                                      MessageContext msgCtx,
+                                      String message) throws 
SandeshaStorageException {
+        Transaction transaction = null;
+        if (msgCtx.isServerSide()) {
+            return;
+        }
+        try {
+            transaction = storageManager.getTransaction();
+            //terminate message sent using the SandeshaClient. Since the 
terminate message will simply get the
+            //InFlow of the reference message get called which could be 
zero sized (OutOnly operations).
+
+            // terminate sending side if this is the WSRM 1.0 spec.
+            // If the WSRM versoion is 1.1 termination will happen in the 
terminate sequence response message.
+
+            String internalSequenceId = (String) 
msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
+            if (internalSequenceId == null) internalSequenceId = 
senderBean.getInternalSequenceID();
+            if (internalSequenceId != null) {
+                // Create a new Transaction
+                transaction = storageManager.getTransaction();
+                RMSBean bean = 
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, 
internalSequenceId);
+                TerminateManager.terminateSendingSide(bean, 
storageManager);
+
+                OperationContext opCtx =
+ 
configurationContext.getOperationContext(bean.getApplicationMessageMessageId());
+                if (opCtx != null) {
+                    AxisOperation applicationAxisOperation = 
opCtx.getAxisOperation();
+                    if (applicationAxisOperation != null) {
+                        MessageReceiver msgReceiver = 
applicationAxisOperation.getMessageReceiver();
+                        if ((msgReceiver != null) && (msgReceiver 
instanceof CallbackReceiver)) {
+                            Object callback = ((CallbackReceiver) 
msgReceiver)
+ .lookupCallback(bean.getApplicationMessageMessageId());
+                            if (callback != null) {
+                                AxisCallback axisCallback = 
((AxisCallback) callback);
+                                axisCallback.onError(new 
Exception(message));
+                                axisCallback.onComplete();
+                            }
+                        }
+                    }
+                }
+                if (transaction != null && transaction.isActive()) 
transaction.commit();
+                transaction = null;
+            }
+
+        } catch (Exception e) {
+            if (log.isWarnEnabled())
+                log.warn(e);
+        } finally {
+            if (transaction != null && transaction.isActive()) {
+                transaction.rollback();
+                transaction = null;
+            }
+        }
+    }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org








Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU







---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org