You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2008/10/22 17:28:53 UTC

svn commit: r707102 - in /webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/AcknowledgementProcessor.java util/FaultManager.java

Author: mckierna
Date: Wed Oct 22 08:28:52 2008
New Revision: 707102

URL: http://svn.apache.org/viewvc?rev=707102&view=rev
Log:
More refactoring of invalid ack processing

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=707102&r1=707101&r2=707102&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Wed Oct 22 08:28:52 2008
@@ -132,7 +132,8 @@
 		}
 		
 		EndpointReference replyTo = rmsBean.getReplyToEndpointReference();
-		boolean anonReplyTo = replyTo==null || replyTo.hasAnonymousAddress();
+		boolean anonReplyTo = replyTo==null || replyTo.isWSAddressingAnonymous(); //if this is wsa anonymous
+																				//then we might be using replay
 		
 		String rmVersion = rmMsgCtx.getRMSpecVersion();
 		
@@ -140,6 +141,11 @@
 		// is any new information in this ack message
 		RangeString completedMessages = rmsBean.getClientCompletedMessages();
 		long numberOfNewMessagesAcked = 0;
+		
+		boolean ackNeedsToSendInvalidFault = false; //if this ack includes a msg that we have not sent then
+													//we should try to send a fault back to the client
+		Range firstInvalidRange = null;				//If there is a single invalid range then we set it here.
+													//If there is more than one we report the first invalid range
 
 		while(ackRangeIterator.hasNext()) {
 			Range ackRange = (Range) ackRangeIterator.next();
@@ -162,35 +168,50 @@
 						if (retransmitterBean != null && retransmitterBean.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION) {
 							// Check we haven't got an Ack for an application message that hasn't been sent yet !
 							if (retransmitterBean.getSentCount() == 0 ) {
-								FaultManager.makeInvalidAcknowledgementFault(rmMsgCtx, sequenceAck, ackRange,
-										storageManager, piggybackedAck, null); //do not want to send the fault to acksTo in this case
-								if (log.isDebugEnabled())
-									log.debug("Exit: AcknowledgementProcessor::processAckHeader, Invalid Ack as message has not been sent");
-								return;
-							}
-							
-							String storageKey = retransmitterBean.getMessageContextRefKey();
-							
-							boolean syncResponseNeeded = false;
-							if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && anonReplyTo) {
-								MessageContext applicationMessage = storageManager.retrieveMessageContext(storageKey, configCtx);
-								AxisOperation operation = applicationMessage.getAxisOperation();
-								if(operation!= null) {
-									int mep = operation.getAxisSpecificMEPConstant();
-									syncResponseNeeded = (mep == WSDLConstants.MEP_CONSTANT_OUT_IN);
+								//invalid ack range
+								if(!ackNeedsToSendInvalidFault){
+									ackNeedsToSendInvalidFault = true;
+									firstInvalidRange = newRanges[rangeIndex];
+									if (log.isDebugEnabled())
+										log.debug("unsent msg has been acked " + retransmitterBean);
 								}
 							}
+							else{
+								//delete the sender bean that has been validly acknowledged (unless
+								//we use replay model)
+								String storageKey = retransmitterBean.getMessageContextRefKey();
+								
+								boolean syncResponseNeeded = false;
+								if (anonReplyTo) {
+									MessageContext applicationMessage = storageManager.retrieveMessageContext(storageKey, configCtx);
+									AxisOperation operation = applicationMessage.getAxisOperation();
+									if(operation!= null) {
+										int mep = operation.getAxisSpecificMEPConstant();
+										syncResponseNeeded = (mep == WSDLConstants.MEP_CONSTANT_OUT_IN);
+									}
+								}
+
 
-							if (!syncResponseNeeded) {
-								// removing the application message from the storage.
-								retransmitterMgr.delete(retransmitterBean.getMessageID());
-								storageManager.removeMessageContext(storageKey);
+								if (!syncResponseNeeded) {
+									// removing the application message from the storage if there is no replay model
+									retransmitterMgr.delete(retransmitterBean.getMessageID());
+									storageManager.removeMessageContext(storageKey);
+								}								
 							}
 						}
 					}//end for
 				}//end for
 			} //end while
 		}
+		
+		if(ackNeedsToSendInvalidFault){
+			//try to send an invalid ack
+			FaultManager.makeInvalidAcknowledgementFault(rmMsgCtx, sequenceAck, firstInvalidRange,
+					storageManager, piggybackedAck, null); //do not want to send the fault to acksTo in this case
+			if (log.isDebugEnabled())
+				log.debug("Exit: AcknowledgementProcessor::processAckHeader, Invalid Ack as message has not been sent");
+			return;
+		}
 
 		// updating the last activated time of the sequence.
 		rmsBean.setLastActivatedTime(System.currentTimeMillis());
@@ -220,7 +241,8 @@
 
 		// Try and terminate the sequence
 		if (!rmsBean.isAvoidAutoTermination()) 
-			TerminateManager.checkAndTerminate(rmMsgCtx.getConfigurationContext(), storageManager, rmsBean);
+			TerminateManager.checkAndTerminate(rmMsgCtx.getConfigurationContext(), storageManager, rmsBean);			
+
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: AcknowledgementProcessor::processAckHeader");

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?rev=707102&r1=707101&r2=707102&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Wed Oct 22 08:28:52 2008
@@ -269,6 +269,9 @@
 			data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
 		else
 			data.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+		
+		if (log.isDebugEnabled())
+			log.debug("makingInvalidAck piggy=" + piggybackedMessage + ": soap=" + SOAPVersion);
 
 		data.setType(Sandesha2Constants.SOAPFaults.FaultType.INVALID_ACKNOWLEDGEMENT);
 		data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMsgCtx.getRMNamespaceValue(), 
@@ -276,10 +279,7 @@
 		data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidAckFault));
 		
 		data.setDetail(sequenceAcknowledgement.getOriginalSequenceAckElement());
-							
-		if (log.isDebugEnabled())
-			log.debug("Exit: FaultManager::checkForInvalidAcknowledgement, invalid ACK");
-		
+									
 		boolean throwable = !piggybackedMessage;
 		getOrSendFault(rmMsgCtx, data, throwable, acksToEPR);
   }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org