You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by am...@apache.org on 2008/10/29 07:35:47 UTC
svn commit: r708804 - in
/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2:
msgprocessors/AcknowledgementProcessor.java workers/WorkerLock.java
Author: amilas
Date: Tue Oct 28 23:35:47 2008
New Revision: 708804
URL: http://svn.apache.org/viewvc?rev=708804&view=rev
Log:
apply the patch for SANDESHA2-179
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=708804&r1=708803&r2=708804&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Tue Oct 28 23:35:47 2008
@@ -103,15 +103,15 @@
log.debug("Exit: AcknowledgementProcessor::processAckHeader, Sequence bean not found");
return;
}
-
+
if (outSequenceId == null || outSequenceId.length()==0) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outSeqIDIsNull);
log.debug(message);
throw new SandeshaException(message);
}
- // If the message type is terminate sequence, then there may be a piggy backed ACK for a
+ // If the message type is terminate sequence, then there may be a piggy backed ACK for a
// sequence that has been terminated
- if (rmMsgCtx.getMessageType()!=Sandesha2Constants.MessageTypes.TERMINATE_SEQ &&
+ if (rmMsgCtx.getMessageType()!=Sandesha2Constants.MessageTypes.TERMINATE_SEQ &&
FaultManager.checkForSequenceTerminated(rmMsgCtx, outSequenceId, rmsBean, piggybackedAck)) {
if (log.isDebugEnabled())
log.debug("Exit: AcknowledgementProcessor::processAckHeader, Sequence terminated");
@@ -121,32 +121,44 @@
// Check that the sender of this Ack holds the correct token
String internalSequenceId = rmsBean.getInternalSequenceID();
SandeshaUtil.assertProofOfPossession(rmsBean, msgCtx, soapHeader);
-
+
if(log.isDebugEnabled()) log.debug("Got Ack for RM Sequence: " + outSequenceId + ", internalSeqId: " + internalSequenceId);
Iterator ackRangeIterator = sequenceAck.getAcknowledgementRanges().iterator();
-
+
if (FaultManager.checkForInvalidAcknowledgement(rmMsgCtx, sequenceAck, storageManager, rmsBean, piggybackedAck)) {
if (log.isDebugEnabled())
log.debug("Exit: AcknowledgementProcessor::processAckHeader, Invalid Ack range ");
return;
}
-
+
EndpointReference replyTo = rmsBean.getReplyToEndpointReference();
boolean anonReplyTo = replyTo==null || replyTo.isWSAddressingAnonymous(); //if this is wsa anonymous
//then we might be using replay
-
+
String rmVersion = rmMsgCtx.getRMSpecVersion();
-
+
// Compare the clientCompletedMessages with the range we just got, to work out if there
// is any new information in this ack message
RangeString completedMessages = rmsBean.getClientCompletedMessages();
long numberOfNewMessagesAcked = 0;
-
+
boolean ackNeedsToSendInvalidFault = false; //if this ack includes a msg that we have not sent then
//we should try to send a fault back to the client
Range firstInvalidRange = null; //If there is a single invalid range then we set it here.
//If there is more than one we report the first invalid range
+ //adding a MakeConnection for the response sequence if needed.
+ if (rmsBean.getOfferedSequence() != null) {
+
+ RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
+ RMDBean rMDBean = rMDBeanMgr.retrieve(outSequenceId);
+
+ if (rMDBean!=null && rMDBean.isPollingMode()) {
+ PollingManager manager = storageManager.getPollingManager();
+ if(manager != null) manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+ }
+ }
+
while(ackRangeIterator.hasNext()) {
Range ackRange = (Range) ackRangeIterator.next();
long lower = ackRange.lowerValue;
@@ -157,14 +169,14 @@
//we now know that this range is complete so we update it. This should aggregate the
//ranges together and tell us which numbers are newly acked
Range[] newRanges = completedMessages.addRange(ackedRange).getRanges();
-
+
// We now take each newly acked message in turn and see if we need to update a sender bean
for (int rangeIndex=0; rangeIndex < newRanges.length; rangeIndex++) {
//now work on each newly acked message in this range
for(long messageNo = newRanges[rangeIndex].lowerValue; messageNo<=newRanges[rangeIndex].upperValue; messageNo++){
-
+
numberOfNewMessagesAcked++;
- SenderBean retransmitterBean = retransmitterMgr.retrieve(outSequenceId, messageNo);
+ SenderBean retransmitterBean = retransmitterMgr.retrieve(outSequenceId, messageNo);
if (retransmitterBean != null && retransmitterBean.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION) {
// Check we haven't got an Ack for an application message that hasn't been sent yet !
if (retransmitterBean.getSentCount() == 0 ) {
@@ -180,7 +192,7 @@
//delete the sender bean that has been validly acknowledged (unless
//we use replay model)
String storageKey = retransmitterBean.getMessageContextRefKey();
-
+
boolean syncResponseNeeded = false;
if (anonReplyTo) {
MessageContext applicationMessage = storageManager.retrieveMessageContext(storageKey, configCtx);
@@ -196,14 +208,14 @@
// removing the application message from the storage if there is no replay model
retransmitterMgr.delete(retransmitterBean.getMessageID());
storageManager.removeMessageContext(storageKey);
- }
+ }
}
}
}//end for
}//end for
} //end while
}
-
+
if(ackNeedsToSendInvalidFault){
//try to send an invalid ack
FaultManager.makeInvalidAcknowledgementFault(rmMsgCtx, sequenceAck, firstInvalidRange,
@@ -216,18 +228,6 @@
// updating the last activated time of the sequence.
rmsBean.setLastActivatedTime(System.currentTimeMillis());
- //adding a MakeConnection for the response sequence if needed.
- if (rmsBean.getOfferedSequence() != null) {
-
- RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
- RMDBean rMDBean = rMDBeanMgr.retrieve(outSequenceId);
-
- if (rMDBean!=null && rMDBean.isPollingMode()) {
- PollingManager manager = storageManager.getPollingManager();
- if(manager != null) manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
- }
- }
-
// We overwrite the previous client completed message ranges with the
// latest view, but only if it is an update i.e. contained a new
// ack range (which is because we do not previous acks arriving late
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java?rev=708804&r1=708803&r2=708804&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java Tue Oct 28 23:35:47 2008
@@ -82,7 +82,9 @@
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Enter: WorkerLock::removeWork " + work);
Holder h = (Holder) locks.remove(work);
- h.release();
+ if (h != null){
+ h.release();
+ }
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Exit: WorkerLock::removeWork");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org