You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ga...@apache.org on 2007/02/21 11:55:08 UTC
svn commit: r509965 - in
/webservices/sandesha/trunk/java/src/org/apache/sandesha2:
msgprocessors/SequenceProcessor.java storage/beans/SenderBean.java
Author: gatfora
Date: Wed Feb 21 02:55:08 2007
New Revision: 509965
URL: http://svn.apache.org/viewvc?view=rev&rev=509965
Log:
Modify SequenceProcessor so that ACKS are sent to non anonymous acks-to addresses when processing a duplicate message
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=509965&r1=509964&r2=509965
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Wed Feb 21 02:55:08 2007
@@ -223,37 +223,32 @@
if((replyTo==null || replyTo.hasAnonymousAddress()) &&
(specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0))) {
- SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
- SenderBean findSenderBean = new SenderBean ();
- findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
- findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
- findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
- findSenderBean.setSend(true);
+ SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+ SenderBean findSenderBean = new SenderBean ();
+ findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+ findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
+ findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
+ findSenderBean.setSend(true);
- SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
+ SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
- // this is effectively a poll for the replyMessage, wo re-use the logic in the MakeConnection
- // processor. This will use this thread to re-send the reply, writing it into the transport.
- // As the reply is now written we do not want to continue processing, or suspend, so we abort.
- if(replyMessageBean != null) {
- if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
- MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null);
+ // this is effectively a poll for the replyMessage, wo re-use the logic in the MakeConnection
+ // processor. This will use this thread to re-send the reply, writing it into the transport.
+ // As the reply is now written we do not want to continue processing, or suspend, so we abort.
+ if(replyMessageBean != null) {
+ if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
+ MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null);
result = InvocationResponse.ABORT;
if (log.isDebugEnabled())
log.debug("Exit: SequenceProcessor::processReliableMessage, replayed message: " + result);
return result;
- }
- }
+ }
+ }
+
EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
- if (acksTo.hasAnonymousAddress()) {
- RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId, storageManager,false,true);
- msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
- AcknowledgementManager.sendAckNow(ackRMMsgContext);
- result = InvocationResponse.ABORT;
- if (log.isDebugEnabled())
- log.debug("Exit: SequenceProcessor::processReliableMessage, acking duplicate message: " + result);
- return result;
- }
+
+ // Send an Ack if needed.
+ sendAckIfNeeded(sequenceId, rmMsgCtx, storageManager, true, acksTo.hasAnonymousAddress());
result = InvocationResponse.ABORT;
if (log.isDebugEnabled())
@@ -383,59 +378,28 @@
}
- public static void sendAckIfNeeded(RMMsgContext rmMsgCtx, StorageManager storageManager, boolean serverSide)
+ private static void sendAckIfNeeded(String sequenceId, RMMsgContext rmMsgCtx,
+ StorageManager storageManager, boolean serverSide, boolean anonymousAcksTo)
throws AxisFault {
if (log.isDebugEnabled())
- log.debug("Enter: SequenceProcessor::sendAckIfNeeded");
-
- Sequence sequence = (Sequence) rmMsgCtx
- .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-
- if(sequence!=null){
- String sequenceId = sequence.getIdentifier().getIdentifier();
- ConfigurationContext configCtx = rmMsgCtx.getMessageContext()
- .getConfigurationContext();
- if (configCtx == null) {
- String message = SandeshaMessageHelper
- .getMessage(SandeshaMessageKeys.configContextNotSet);
- if (log.isDebugEnabled())
- log.debug(message);
- throw new SandeshaException(message);
- }
+ log.debug("Enter: SequenceProcessor::sendAckIfNeeded " + sequenceId);
RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(
rmMsgCtx , sequenceId, storageManager,
false, serverSide);
- MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
-
- EndpointReference acksTo = ackRMMsgCtx.getTo();
- EndpointReference replyTo = rmMsgCtx.getReplyTo();
- boolean anonAck = (acksTo == null) || acksTo.hasAnonymousAddress();
- boolean anonReply = (replyTo == null) || replyTo.hasAnonymousAddress();
-
- // Only use the backchannel for ack messages if we are sure that the
- // application
- // doesn't need it. A 1-way MEP should be complete by now.
- boolean complete = ackMsgCtx.getOperationContext().isComplete();
- if (anonAck && anonReply && !complete) {
- if (log.isDebugEnabled())
- log
- .debug("Exit: SequenceProcessor::sendAckIfNeeded, avoiding using backchannel");
- return;
- }
-
- long ackInterval = SandeshaUtil.getPropertyBean(
- rmMsgCtx.getMessageContext().getAxisService())
- .getAcknowledgementInterval();
- long timeToSend = System.currentTimeMillis() + ackInterval;
- if (anonAck) {
+ if (anonymousAcksTo) {
+ rmMsgCtx.getMessageContext().getOperationContext().
+ setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
AcknowledgementManager.sendAckNow(ackRMMsgCtx);
- } else if (!anonAck) {
+ } else {
+ long ackInterval = SandeshaUtil.getPropertyBean(
+ rmMsgCtx.getMessageContext().getAxisService())
+ .getAcknowledgementInterval();
+ long timeToSend = System.currentTimeMillis() + ackInterval;
AcknowledgementManager.addAckBeanEntry(ackRMMsgCtx, sequenceId, timeToSend, storageManager);
}
- }
if (log.isDebugEnabled())
log.debug("Exit: SequenceProcessor::sendAckIfNeeded");
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java?view=diff&rev=509965&r1=509964&r2=509965
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java Wed Feb 21 02:55:08 2007
@@ -268,6 +268,7 @@
result.append(this.getClass().getName());
result.append("\nSequence Id : "); result.append(sequenceID);
result.append("\nInternal Seq Id: "); result.append(internalSequenceID);
+ result.append("\nTo : "); result.append(toAddress);
result.append("\nMessage Number : "); result.append(messageNumber);
result.append("\nMessage Type : "); result.append(messageType);
result.append("\nMessage Key : "); result.append(messageContextRefKey);
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org