You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2008/07/30 21:20:58 UTC

svn commit: r681179 - /webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java

Author: mckierna
Date: Wed Jul 30 12:20:57 2008
New Revision: 681179

URL: http://svn.apache.org/viewvc?rev=681179&view=rev
Log:
See https://issues.apache.org/jira/browse/SANDESHA2-171, Thanks Sara

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=681179&r1=681178&r2=681179&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Wed Jul 30 12:20:57 2008
@@ -36,6 +36,7 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.Sandesha2Constants.MessageTypes;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.policy.SandeshaPolicyBean;
@@ -68,10 +69,14 @@
 	boolean processedMessage = false;
 
 	long lastHousekeeping = 0;
+	
+	long lastRanCleanup = 0;
 
 	private static int HOUSEKEEPING_INTERVAL = 20000;
 
 	private ConcurrentHashMap ackMap = new ConcurrentHashMap();
+	
+	private ConcurrentHashMap warnedAlreadyOrphans = new ConcurrentHashMap();
 
 	private static class AckHolder {
 		public long tts = 0;
@@ -563,19 +568,57 @@
 					String message = null;
 					String internalSequenceID = bean.getInternalSequenceID();
 					String sequenceID = bean.getSequenceID();
-					if (bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION)
-						message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, internalSequenceID);
-					else {
-						String messageType = Integer.toString(bean.getMessageType());
-						message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPollingProtocol, messageType, sequenceID, internalSequenceID);
+				
+					if(!warnedAlreadyOrphans.containsKey(sequenceID)){ // we only want to do log.warn once per orphaned sequenceId
+					
+						if (bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION)					
+							message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, internalSequenceID);				
+						else
+						{
+							String messageType = Integer.toString(bean.getMessageType());
+							message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPollingProtocol, messageType, sequenceID, internalSequenceID);
+						}
+						warnedAlreadyOrphans.put(sequenceID, System.currentTimeMillis());
+						log.warn(message);
+ 					}
+ 				}
+				
+				// If client shuts down too quickly, termination messages get orphaned and this has an impact on performance.  
+				// Will delete these once they have been recognised as orphans.
+				int messageType = bean.getMessageType();
+				if(MessageTypes.TERMINATE_SEQ ==  messageType || MessageTypes.TERMINATE_SEQ_RESPONSE ==  messageType){
+					String id = bean.getSequenceID(); // get this again as it is an error case
+					if(log.isDebugEnabled()) log.debug("Sender::checkForOrphanMessages.  Orphaned message of type TERMINATE_SEQ or TERMINATE_SEQ_RESPONSE found.  Deleting this message with a sequence ID of : " + id);
+					manager.getSenderBeanMgr().delete(id);
+					
+					
+				} else {					
+					// Update the bean so that we won't emit another message for another TRANSPORT_WAIT_TIME
+					bean.setTimeToSend(System.currentTimeMillis());
+					manager.getSenderBeanMgr().update(bean);
+				}
+			
+				// Clean up the warnedAlreadyOrphans map when it grows beyond a thousand entries, or every 10 minutes:
+				// remove every entry that is more than an hour old.
+				long currentTime = System.currentTimeMillis();
+				if(lastRanCleanup == 0){
+					lastRanCleanup = System.currentTimeMillis();
+				}
+				if( warnedAlreadyOrphans.size() > 1000 || currentTime > (lastRanCleanup + 600000)){
+					if(log.isDebugEnabled()) log.debug("Sender::checkForOrphanMessages.  Cleaning up list of orphans");
+					long timeAnHourAgo = currentTime - 3600000; 
+					Iterator it = warnedAlreadyOrphans.keySet().iterator();
+					while(it.hasNext()){
+						Object key = it.next();
+						long ageOfThisOrphan = ((Long)warnedAlreadyOrphans.get(key)).longValue();
+						if (ageOfThisOrphan < timeAnHourAgo) {
+							warnedAlreadyOrphans.remove(key);
+						}
+						
 					}
-					log.warn(message);
+					lastRanCleanup = System.currentTimeMillis();
 				}
 
-				// Update the bean so that we won't emit another message for
-				// another TRANSPORT_WAIT_TIME
-				bean.setTimeToSend(System.currentTimeMillis());
-				manager.getSenderBeanMgr().update(bean);
 			}
 
 			if (tran != null && tran.isActive())
@@ -595,4 +638,6 @@
 		if (log.isDebugEnabled())
 			log.debug("Exit: Sender::checkForOrphanMessages");
 	}
+
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org