You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ga...@apache.org on 2007/02/21 11:55:08 UTC

svn commit: r509965 - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2: msgprocessors/SequenceProcessor.java storage/beans/SenderBean.java

Author: gatfora
Date: Wed Feb 21 02:55:08 2007
New Revision: 509965

URL: http://svn.apache.org/viewvc?view=rev&rev=509965
Log:
Modify SequenceProcessor so that ACKS are sent to non anonymous acks-to addresses when processing a duplicate message

Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=509965&r1=509964&r2=509965
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Wed Feb 21 02:55:08 2007
@@ -223,37 +223,32 @@
 			if((replyTo==null || replyTo.hasAnonymousAddress()) &&
 			   (specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0))) {
 
-			    SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
-			    SenderBean findSenderBean = new SenderBean ();
-			    findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
-			    findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
-			    findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
-			    findSenderBean.setSend(true);
+			  SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+			  SenderBean findSenderBean = new SenderBean ();
+			  findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+			  findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
+			  findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
+			  findSenderBean.setSend(true);
 		
-			    SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
+			  SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
 			    
-			    // this is effectively a poll for the replyMessage, wo re-use the logic in the MakeConnection
-			    // processor. This will use this thread to re-send the reply, writing it into the transport.
-			    // As the reply is now written we do not want to continue processing, or suspend, so we abort.
-			    if(replyMessageBean != null) {
-			    	if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
-			    	MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null);
+			  // this is effectively a poll for the replyMessage, wo re-use the logic in the MakeConnection
+			  // processor. This will use this thread to re-send the reply, writing it into the transport.
+			  // As the reply is now written we do not want to continue processing, or suspend, so we abort.
+			  if(replyMessageBean != null) {
+			  	if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
+			   	MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null);
 					result = InvocationResponse.ABORT;
 					if (log.isDebugEnabled())
 						log.debug("Exit: SequenceProcessor::processReliableMessage, replayed message: " + result);
 					return result;
-			    }
-		    }
+			  }
+		  }
+			
 			EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
-			if (acksTo.hasAnonymousAddress()) {
-				RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId, storageManager,false,true);
-				msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
-				AcknowledgementManager.sendAckNow(ackRMMsgContext);
-				result = InvocationResponse.ABORT;
-				if (log.isDebugEnabled())
-					log.debug("Exit: SequenceProcessor::processReliableMessage, acking duplicate message: " + result);
-				return result;
-			}
+			
+			// Send an Ack if needed.
+			sendAckIfNeeded(sequenceId, rmMsgCtx, storageManager, true, acksTo.hasAnonymousAddress());			
 			
 			result = InvocationResponse.ABORT;
 			if (log.isDebugEnabled())
@@ -383,59 +378,28 @@
 	}
 
 
-	public static void sendAckIfNeeded(RMMsgContext rmMsgCtx, StorageManager storageManager, boolean serverSide)
+	private static void sendAckIfNeeded(String sequenceId, RMMsgContext rmMsgCtx, 
+			StorageManager storageManager, boolean serverSide, boolean anonymousAcksTo)
 					throws AxisFault {
 
 		if (log.isDebugEnabled())
-			log.debug("Enter: SequenceProcessor::sendAckIfNeeded");
-		
-		Sequence sequence = (Sequence) rmMsgCtx
-				.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-		
-		if(sequence!=null){
-			String sequenceId = sequence.getIdentifier().getIdentifier();
-			ConfigurationContext configCtx = rmMsgCtx.getMessageContext()
-					.getConfigurationContext();
-			if (configCtx == null) {
-				String message = SandeshaMessageHelper
-						.getMessage(SandeshaMessageKeys.configContextNotSet);
-				if (log.isDebugEnabled())
-					log.debug(message);
-				throw new SandeshaException(message);
-			}
+			log.debug("Enter: SequenceProcessor::sendAckIfNeeded " + sequenceId);
 
 			RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(
 					rmMsgCtx , sequenceId, storageManager,
 					false, serverSide);
-			MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
-
-			EndpointReference acksTo = ackRMMsgCtx.getTo();
-			EndpointReference replyTo = rmMsgCtx.getReplyTo();
-			boolean anonAck = (acksTo == null) || acksTo.hasAnonymousAddress();
-			boolean anonReply = (replyTo == null) || replyTo.hasAnonymousAddress();
-
-			// Only use the backchannel for ack messages if we are sure that the
-			// application
-			// doesn't need it. A 1-way MEP should be complete by now.
-			boolean complete = ackMsgCtx.getOperationContext().isComplete();
-			if (anonAck && anonReply && !complete) {
-				if (log.isDebugEnabled())
-					log
-							.debug("Exit: SequenceProcessor::sendAckIfNeeded, avoiding using backchannel");
-				return;
-			}
-
-			long ackInterval = SandeshaUtil.getPropertyBean(
-					rmMsgCtx.getMessageContext().getAxisService())
-					.getAcknowledgementInterval();
 
-			long timeToSend = System.currentTimeMillis() + ackInterval;
-			if (anonAck) {
+			if (anonymousAcksTo) {
+				rmMsgCtx.getMessageContext().getOperationContext().
+					setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
 				AcknowledgementManager.sendAckNow(ackRMMsgCtx);
-			} else if (!anonAck) {
+			} else {				
+				long ackInterval = SandeshaUtil.getPropertyBean(
+						rmMsgCtx.getMessageContext().getAxisService())
+						.getAcknowledgementInterval();
+				long timeToSend = System.currentTimeMillis() + ackInterval;
 				AcknowledgementManager.addAckBeanEntry(ackRMMsgCtx, sequenceId, timeToSend, storageManager);
 			}			
-		}
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: SequenceProcessor::sendAckIfNeeded");

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java?view=diff&rev=509965&r1=509964&r2=509965
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java Wed Feb 21 02:55:08 2007
@@ -268,6 +268,7 @@
 		result.append(this.getClass().getName());
 		result.append("\nSequence Id    : "); result.append(sequenceID);
 		result.append("\nInternal Seq Id: "); result.append(internalSequenceID);
+		result.append("\nTo             : "); result.append(toAddress);
 		result.append("\nMessage Number : "); result.append(messageNumber);
 		result.append("\nMessage Type   : "); result.append(messageType);
 		result.append("\nMessage Key    : "); result.append(messageContextRefKey);



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org