You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2008/07/30 21:20:58 UTC
svn commit: r681179 -
/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
Author: mckierna
Date: Wed Jul 30 12:20:57 2008
New Revision: 681179
URL: http://svn.apache.org/viewvc?rev=681179&view=rev
Log:
See https://issues.apache.org/jira/browse/SANDESHA2-171, Thanks Sara
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=681179&r1=681178&r2=681179&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Wed Jul 30 12:20:57 2008
@@ -36,6 +36,7 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.Sandesha2Constants.MessageTypes;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.policy.SandeshaPolicyBean;
@@ -68,10 +69,14 @@
boolean processedMessage = false;
long lastHousekeeping = 0;
+
+ long lastRanCleanup = 0;
private static int HOUSEKEEPING_INTERVAL = 20000;
private ConcurrentHashMap ackMap = new ConcurrentHashMap();
+
+ private ConcurrentHashMap warnedAlreadyOrphans = new ConcurrentHashMap();
private static class AckHolder {
public long tts = 0;
@@ -563,19 +568,57 @@
String message = null;
String internalSequenceID = bean.getInternalSequenceID();
String sequenceID = bean.getSequenceID();
- if (bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION)
- message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, internalSequenceID);
- else {
- String messageType = Integer.toString(bean.getMessageType());
- message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPollingProtocol, messageType, sequenceID, internalSequenceID);
+
+ if(!warnedAlreadyOrphans.containsKey(sequenceID)){ // we only want to do log.warn once per orphaned sequenceId
+
+ if (bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION)
+ message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, internalSequenceID);
+ else
+ {
+ String messageType = Integer.toString(bean.getMessageType());
+ message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPollingProtocol, messageType, sequenceID, internalSequenceID);
+ }
+ warnedAlreadyOrphans.put(sequenceID, System.currentTimeMillis());
+ log.warn(message);
+ }
+ }
+
+ // If client shuts down too quickly, termination messages get orphaned and this has an impact on performance.
+ // Will delete these once they have been recognised as orphans.
+ int messageType = bean.getMessageType();
+ if(MessageTypes.TERMINATE_SEQ == messageType || MessageTypes.TERMINATE_SEQ_RESPONSE == messageType){
+ String id = bean.getSequenceID(); // get this again as it is an error case
+ if(log.isDebugEnabled()) log.debug("Sender::checkForOrphanMessages. Orphaned message of type TERMINATE_SEQ or TERMINATE_SEQ_RESPONSE found. Deleting this message with a sequence ID of : " + id);
+ manager.getSenderBeanMgr().delete(id);
+
+
+ } else {
+ // Update the bean so that we won't emit another message for another TRANSPORT_WAIT_TIME
+ bean.setTimeToSend(System.currentTimeMillis());
+ manager.getSenderBeanMgr().update(bean);
+ }
+
+ // clean up warnedAlreadyOrphans list when it gets big - currently over a thousand entries, or every 10 minutes
+ // delete everything over an hour old
+ long currentTime = System.currentTimeMillis();
+ if(lastRanCleanup == 0){
+ lastRanCleanup = System.currentTimeMillis();
+ }
+ if( warnedAlreadyOrphans.size() > 1000 || currentTime > (lastRanCleanup + 600000)){
+ if(log.isDebugEnabled()) log.debug("Sender::checkForOrphanMessages. Cleaning up list of orphans");
+ long timeAnHourAgo = currentTime - 3600000;
+ Iterator it = warnedAlreadyOrphans.keySet().iterator();
+ while(it.hasNext()){
+ Object key = it.next();
+ long ageOfThisOrphan = ((Long)warnedAlreadyOrphans.get(key)).longValue();
+ if (ageOfThisOrphan < timeAnHourAgo) {
+ warnedAlreadyOrphans.remove(key);
+ }
+
}
- log.warn(message);
+ lastRanCleanup = System.currentTimeMillis();
}
- // Update the bean so that we won't emit another message for
- // another TRANSPORT_WAIT_TIME
- bean.setTimeToSend(System.currentTimeMillis());
- manager.getSenderBeanMgr().update(bean);
}
if (tran != null && tran.isActive())
@@ -595,4 +638,6 @@
if (log.isDebugEnabled())
log.debug("Exit: Sender::checkForOrphanMessages");
}
+
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org