You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ga...@apache.org on 2008/06/24 10:22:55 UTC

svn commit: r671059 - in /webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2: util/AcknowledgementManager.java util/SequenceManager.java workers/Sender.java

Author: gatfora
Date: Tue Jun 24 01:22:54 2008
New Revision: 671059

URL: http://svn.apache.org/viewvc?rev=671059&view=rev
Log:
Applying patches from SANDESHA2-164, thanks David and Sara

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java?rev=671059&r1=671058&r2=671059&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java Tue Jun 24 01:22:54 2008
@@ -42,6 +42,7 @@
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.workers.Sender;
 
 /**
  * Contains logic for managing acknowledgements.
@@ -58,96 +59,102 @@
 	 * @param applicationRMMsgContext
 	 * @throws SandeshaException
 	 */
-	public static void piggybackAcksIfPresent(RMMsgContext rmMessageContext, StorageManager storageManager)
-			throws SandeshaException {
+	public static void piggybackAcksIfPresent(RMMsgContext rmMessageContext, StorageManager storageManager) throws SandeshaException {
 		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
 			log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent");
-		
+
 		SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
 
-		// If this message is going to an anonymous address, and the inbound sequence has
+		// If this message is going to an anonymous address, and the inbound
+		// sequence has
 		// anonymous acksTo, then we add in an ack for the inbound sequence.
 		EndpointReference target = rmMessageContext.getTo();
-		if(target == null || target.hasAnonymousAddress()) {
-			// We have no good indicator of the identity of the destination, so the only sequence
-			// we can ack is the inbound one that caused us to create this response.
+		if (target == null || target.hasAnonymousAddress()) {
+			// We have no good indicator of the identity of the destination, so
+			// the only sequence
+			// we can ack is the inbound one that caused us to create this
+			// response.
 			String inboundSequence = (String) rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
-			if(inboundSequence != null) {
+			if (inboundSequence != null) {
 				RMDBean inboundBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);
-				if(inboundBean != null && !inboundBean.isTerminated()) {
+				if (inboundBean != null && !inboundBean.isTerminated()) {
 					EndpointReference acksToEPR = inboundBean.getAcksToEndpointReference();
 
-					if(acksToEPR == null || acksToEPR.hasAnonymousAddress()) {
-						if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Piggybacking ack for inbound sequence: " + inboundSequence);
+					if (acksToEPR == null || acksToEPR.hasAnonymousAddress()) {
+						if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+							log.debug("Piggybacking ack for inbound sequence: " + inboundSequence);
 						RMMsgCreator.addAckMessage(rmMessageContext, inboundSequence, inboundBean, false);
 					}
 				}
 			}
-			if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, anon");
+			if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+				log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, anon");
 			return;
-		}
-		else{
-			//an addressable EPR
-			if(SandeshaUtil.hasReferenceParameters(target)){
-				//we should not proceed since we cannot properly compare ref params
-				if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
+		} else {
+			// an addressable EPR
+			if (SandeshaUtil.hasReferenceParameters(target)) {
+				// we should not proceed since we cannot properly compare ref
+				// params
+				if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+					log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
 				return;
 			}
-			
-		    // From here on, we must be dealing with a real address. Piggyback all sequences that have an
-		    // acksTo that matches the To address, and that have an ackMessage queued up for sending. We
-		    // search for RMDBeans first, to avoid a deadlock.
-		    //
-		    // As a special case, if this is a terminate sequence message then add in ack messages for
-		    // any sequences that have an acksTo that matches the target address. This helps to ensure
-		    // that request-response sequence pairs end cleanly.
-		    RMDBean findRMDBean = new RMDBean();
-		    findRMDBean.setAcksToEndpointReference(target);
-		    findRMDBean.setTerminated(false);
-		    Collection rmdBeans = storageManager.getRMDBeanMgr().find(findRMDBean);
-		    Iterator sequences = rmdBeans.iterator();
-		    while(sequences.hasNext()) {
-		      RMDBean sequence = (RMDBean) sequences.next();
-		      if(SandeshaUtil.hasReferenceParameters(sequence.getAcksToEndpointReference())){
-		    	  //we should not piggy back if there are reference parameters in the acksTo EPR since we cannot compare them
-		    	  if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
-		    	  break;
-		      }
-					
-		      String sequenceId = sequence.getSequenceID();
-		      
-		      // Look for the SenderBean that carries the ack, there should be at most one
-		      SenderBean findBean = new SenderBean();
-		      findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-		      findBean.setSend(true);
-		      findBean.setSequenceID(sequenceId);
-		      findBean.setToAddress(target.getAddress());
-		      
-		      SenderBean ackBean = retransmitterBeanMgr.findUnique(findBean);
-		      
-				// Piggybacking will happen only if the end of ack interval (timeToSend) is not reached.
-				long timeNow = System.currentTimeMillis();
-			    if (ackBean != null && ackBean.getTimeToSend() > timeNow) {
-					// Delete the beans that would have sent the ack
-					retransmitterBeanMgr.delete(ackBean.getMessageID());
-					storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
-
-				if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
-		        RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence, false);
-
-
-			    } else if(rmMessageContext.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
-			        if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Adding extra acks, as this is a terminate");
-			          
-			        if(sequence.getHighestInMessageNumber() > 0) {
-						  if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
 
-					RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence, false);
+			String inboundSequence = (String) rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
+			// If there's an inbound sequence (i.e. we're provider side) we'll
+			// use that, otherwise
+			// we'll go to the expense of looking the sequence up by the acksTo
+			// address.
+			if (inboundSequence != null) {
+				// We used to look for an ack sender bean before piggybacking an
+				// ack, but in the high-througput
+				// scenarios there always was one, and in the low thoughput
+				// scenarios it's less of an issue if
+				// we piggyback when we don't have to. so for now, lets mimic
+				// the old high-throughout behaviour
+				// in a cheap way by always piggybacking.
+				if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+					log.debug("Piggybacking ack for sequence: " + inboundSequence);
+				RMDBean sequence = storageManager.getRMDBeanMgr().retrieve(inboundSequence);
+				RMMsgCreator.addAckMessage(rmMessageContext, inboundSequence, sequence, false);
+				((Sender) storageManager.getSender()).removeScheduledAcknowledgement(inboundSequence);
+			} else {
+				RMDBean findRMDBean = new RMDBean();
+				findRMDBean.setAcksToEndpointReference(target);
+				findRMDBean.setTerminated(false);
+				Collection rmdBeans = storageManager.getRMDBeanMgr().find(findRMDBean);
+				Iterator sequences = rmdBeans.iterator();
+				while (sequences.hasNext()) {
+					RMDBean sequence = (RMDBean) sequences.next();
+					if (SandeshaUtil.hasReferenceParameters(sequence.getAcksToEndpointReference())) {
+						// we should not piggy back if there are reference
+						// parameters in the acksTo EPR since we cannot compare
+						// them
+						if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+							log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
+						break;
 					}
+
+					String sequenceId = sequence.getSequenceID();
+
+					// We used to look for an ack sender bean before
+					// piggybacking an ack, but in the high-througput
+					// scenarios there always was one, and in the low thoughput
+					// scenarios it's less of an issue if
+					// we piggyback when we don't have to. so for now, lets
+					// mimic the old high-throughout behaviour
+					// in a cheap way by always piggybacking.
+					if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+						log.debug("Piggybacking ack for sequence: " + sequenceId);
+
+					RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence, false);
+
+					((Sender) storageManager.getSender()).removeScheduledAcknowledgement(sequenceId);
+
 				}
 			}
 		}
-		
+
 		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
 			log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent");
 		return;
@@ -159,20 +166,18 @@
 	 * @param sequencePropertyKey
 	 * @param sequenceId
 	 * @param storageManager
-	 * @param makeResponse Some work will be done to make the new ack message the response of the reference message.
+	 * @param makeResponse
+	 *            Some work will be done to make the new ack message the
+	 *            response of the reference message.
 	 * @return
 	 * @throws AxisFault
 	 */
 	public static RMMsgContext generateAckMessage(
-			
-			RMMsgContext referenceRMMessage,
-			RMDBean rmdBean,
-			String sequenceId,
-			StorageManager storageManager, 
-			boolean serverSide
-			
-			) throws AxisFault {
-		
+
+	RMMsgContext referenceRMMessage, RMDBean rmdBean, String sequenceId, StorageManager storageManager, boolean serverSide
+
+	) throws AxisFault {
+
 		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
 			log.debug("Enter: AcknowledgementManager::generateAckMessage " + rmdBean);
 
@@ -180,16 +185,14 @@
 
 		EndpointReference acksTo = rmdBean.getAcksToEndpointReference();
 
-		if (acksTo==null || acksTo.getAddress() == null)
+		if (acksTo == null || acksTo.getAddress() == null)
 			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
 
-		AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
-				Sandesha2Constants.MessageTypes.ACK,
-				rmdBean.getRMVersion(),
-				referenceMsg.getAxisService());
+		AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.ACK, rmdBean.getRMVersion(), referenceMsg
+				.getAxisService());
 
 		MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);
-		
+
 		ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
 
 		RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
@@ -198,8 +201,7 @@
 
 		ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
 
-		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
-				.getSOAPVersion(referenceMsg.getEnvelope()));
+		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(referenceMsg.getEnvelope()));
 
 		// Setting new envelope
 		SOAPEnvelope envelope = factory.getDefaultEnvelope();
@@ -207,7 +209,7 @@
 		ackMsgCtx.setEnvelope(envelope);
 
 		ackMsgCtx.setTo(acksTo);
-		
+
 		ackMsgCtx.setServerSide(serverSide);
 
 		// adding the SequenceAcknowledgement part.
@@ -218,16 +220,13 @@
 		return ackRMMsgCtx;
 	}
 
-	
-	
-
 	public static boolean verifySequenceCompletion(RangeString ackRanges, long lastMessageNo) {
 		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
 			log.debug("Enter: AcknowledgementManager::verifySequenceCompletion");
 
 		boolean result = false;
 		Range complete = new Range(1, lastMessageNo);
-		if(ackRanges.isRangeCompleted(complete)) {
+		if (ackRanges.isRangeCompleted(complete)) {
 			result = true;
 		}
 
@@ -235,17 +234,14 @@
 			log.debug("Exit: AcknowledgementManager::verifySequenceCompletion " + result);
 		return result;
 	}
-	
-	public static void addAckBeanEntry (
-			RMMsgContext ackRMMsgContext,
-			String sequenceId, 
-			long timeToSend,
-			StorageManager storageManager) throws AxisFault {
-		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: AcknowledgementManager::addAckBeanEntry");
+
+	public static void addAckBeanEntry(RMMsgContext ackRMMsgContext, String sequenceId, long timeToSend, StorageManager storageManager) throws AxisFault {
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementManager::addAckBeanEntry");
 
 		// Write the acks into the envelope
 		ackRMMsgContext.addSOAPEnvelope();
-		
+
 		MessageContext ackMsgContext = ackRMMsgContext.getMessageContext();
 
 		SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
@@ -258,7 +254,7 @@
 		ackBean.setReSend(false);
 		ackBean.setSequenceID(sequenceId);
 		EndpointReference to = ackMsgContext.getTo();
-		if (to!=null)
+		if (to != null)
 			ackBean.setToAddress(to.getAddress());
 
 		ackBean.setSend(true);
@@ -275,9 +271,9 @@
 		Collection coll = retransmitterBeanMgr.find(findBean);
 		Iterator it = coll.iterator();
 
-		while(it.hasNext()) {
+		while (it.hasNext()) {
 			SenderBean oldAckBean = (SenderBean) it.next();
-			if(oldAckBean.getTimeToSend() < timeToSend)
+			if (oldAckBean.getTimeToSend() < timeToSend)
 				timeToSend = oldAckBean.getTimeToSend();
 
 			// removing the retransmitted entry for the oldAck
@@ -290,42 +286,44 @@
 		ackBean.setTimeToSend(timeToSend);
 
 		ackMsgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-		
+
 		// passing the message through sandesha2sender
 		ackMsgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
-		
-	    SandeshaUtil.executeAndStore(ackRMMsgContext, key, storageManager);
+
+		SandeshaUtil.executeAndStore(ackRMMsgContext, key, storageManager);
 
 		// inserting the new ack.
 		retransmitterBeanMgr.insert(ackBean);
 
-		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::addAckBeanEntry");
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Exit: AcknowledgementManager::addAckBeanEntry");
 	}
-	
-	public static void sendAckNow (RMMsgContext ackRMMsgContext) throws AxisFault {
+
+	public static void sendAckNow(RMMsgContext ackRMMsgContext) throws AxisFault {
 		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
 			log.debug("Enter: AcknowledgementManager::sendAckNow");
 
 		// Write the acks into the envelope
 		ackRMMsgContext.addSOAPEnvelope();
-		
+
 		MessageContext ackMsgContext = ackRMMsgContext.getMessageContext();
-		
+
 		// setting CONTEXT_WRITTEN since acksto is anonymous
 		if (ackRMMsgContext.getMessageContext().getOperationContext() == null) {
 			// operation context will be null when doing in a GLOBAL
 			// handler.
 			AxisOperation op = ackMsgContext.getAxisOperation();
 
-			OperationContext opCtx = OperationContextFactory.createOperationContext(op.getAxisSpecificMEPConstant(), op, ackRMMsgContext.getMessageContext().getServiceContext());
+			OperationContext opCtx = OperationContextFactory.createOperationContext(op.getAxisSpecificMEPConstant(), op, ackRMMsgContext.getMessageContext()
+					.getServiceContext());
 			ackRMMsgContext.getMessageContext().setOperationContext(opCtx);
 		}
 
 		ackRMMsgContext.getMessageContext().setServerSide(true);
-		
+
 		AxisEngine.send(ackMsgContext);
-		
+
 		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
-			log.debug("Exit: AcknowledgementManager::sendAckNow");		
-	}	
+			log.debug("Exit: AcknowledgementManager::sendAckNow");
+	}
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java?rev=671059&r1=671058&r2=671059&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java Tue Jun 24 01:22:54 2008
@@ -19,10 +19,15 @@
 
 package org.apache.sandesha2.util;
 
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMFactory;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.Parameter;
@@ -204,10 +209,11 @@
 			// Server side, we want the replyTo and AcksTo EPRs to point into this server.
 			// We can work that out by looking at the RMD bean that pulled the message in,
 			// and copying its 'ReplyTo' address.
-			if(inboundBean != null && inboundBean.getReplyToEndpointReference() != null) {
-				acksToEPR = inboundBean.getReplyToEndpointReference();
-				replyToEPR = inboundBean.getReplyToEndpointReference();
-			} else {
+			EndpointReference strippedReplyToEpr = stripAddress(inboundBean.getReplyToEndpointReference());
+			if(inboundBean != null && strippedReplyToEpr != null) {
+				acksToEPR = strippedReplyToEpr;
+				replyToEPR = strippedReplyToEpr;
+		} else {
 				String beanInfo = (inboundBean == null) ? "null" : inboundBean.toString();
 				String message = SandeshaMessageHelper.getMessage(
 						SandeshaMessageKeys.cannotChooseAcksTo, inboundSequence, beanInfo);
@@ -384,4 +390,40 @@
 
 		return specVersion;
 	}
+	
+	/* becuase RM reuses the incoming EPRs.  Need to use only the address.
+	 */
+	private static EndpointReference stripAddress(EndpointReference eprIn){
+		if(log.isDebugEnabled()) log.debug("stripAddress from EndpointReference : " + eprIn);
+		EndpointReference epr = new EndpointReference(eprIn.getAddress());
+		return epr;
+/**		
+		String schemaNs = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd";
+		String securityNs = "http://schemas.xmlsoap.org/ws/2003/06/utility";
+		
+		Iterator it = eprOut.getAttributes().iterator();
+		while(it.hasNext()){
+			OMAttribute attribute = (OMAttribute)it.next();
+			String ns = attribute.getNamespace().getNamespaceURI();
+			String name = attribute.getLocalName();
+			if((schemaNs.equals(ns) || securityNs.equals(ns)) && "Id".equals(name)){
+				//delete attribute
+				it.remove();
+			} 
+		}
+		Iterator it2 = eprOut.getAddressAttributes().iterator();
+		while(it2.hasNext()){
+			OMAttribute attribute = (OMAttribute)it2.next();
+			String ns = attribute.getNamespace().getNamespaceURI();
+			String name = attribute.getLocalName();
+			if((schemaNs.equals(ns) || securityNs.equals(ns)) && "Id".equals(name)){
+				//delete attribute
+				it2.remove();
+			} 
+		}
+		
+		return eprOut;
+		**/
+}
+
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=671059&r1=671058&r2=671059&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Tue Jun 24 01:22:54 2008
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.MessageContext;
@@ -63,17 +64,39 @@
 	// If this sender is working for several sequences, we use round-robin to
 	// try and give them all a chance to invoke messages.
 	int nextIndex = 0;
+
 	boolean processedMessage = false;
+
 	long lastHousekeeping = 0;
-	
+
 	private static int HOUSEKEEPING_INTERVAL = 20000;
-	
-	public Sender () {
+
+	private ConcurrentHashMap ackMap = new ConcurrentHashMap();
+
+	private static class AckHolder {
+		public long tts = 0;
+
+		public RMMsgContext refMsg;
+	}
+
+	public Sender() {
 		super(Sandesha2Constants.SENDER_SLEEP_TIME);
 	}
 
+	public void scheduleAddressableAcknowledgement(String sequenceId, long ackInterval, RMMsgContext ref) {
+		AckHolder ackH = new AckHolder();
+		ackH.tts = System.currentTimeMillis() + ackInterval;
+		ackH.refMsg = ref;
+		ackMap.putIfAbsent(sequenceId, ackH);
+	}
+
+	public void removeScheduledAcknowledgement(String sequenceId) {
+		ackMap.remove(sequenceId);
+	}
+
 	protected boolean internalRun() {
-		if (log.isDebugEnabled()) log.debug("Enter: Sender::internalRun");
+		if (log.isDebugEnabled())
+			log.debug("Enter: Sender::internalRun");
 
 		Transaction transaction = null;
 		boolean sleep = false;
@@ -85,33 +108,38 @@
 
 			if (log.isDebugEnabled())
 				log.debug("Choosing one from " + size + " sequences");
-			if(nextIndex >= size) {
+			if (nextIndex >= size) {
 				nextIndex = 0;
 
-				// We just looped over the set of sequences. If we didn't process any
+				// We just looped over the set of sequences. If we didn't
+				// process any
 				// messages on this loop then we sleep before the next one
-				if(size == 0 || !processedMessage) {
+				if (size == 0 || !processedMessage) {
 					sleep = true;
 				}
 				processedMessage = false;
-				
-				if(System.currentTimeMillis()-lastHousekeeping > HOUSEKEEPING_INTERVAL){
-					// At this point - delete any sequences that have timed out, or been terminated.
+
+				if (System.currentTimeMillis() - lastHousekeeping > HOUSEKEEPING_INTERVAL) {
+					// At this point - delete any sequences that have timed out,
+					// or been terminated.
 					deleteTerminatedSequences(storageManager);
 
-					// Also clean up and sender beans that are not yet eligible for sending, but
+					// Also clean up and sender beans that are not yet eligible
+					// for sending, but
 					// are blocking the transport threads.
 					unblockTransportThreads(storageManager);
 
-					// Finally, check for messages that can only be serviced by polling, and warn
+					// Finally, check for messages that can only be serviced by
+					// polling, and warn
 					// the user if they are too old
 					checkForOrphanMessages(storageManager);
 					lastHousekeeping = System.currentTimeMillis();
 				}
-				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
+				if (log.isDebugEnabled())
+					log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
 				return sleep;
 			}
-			
+
 			transaction = storageManager.getTransaction();
 
 			SequenceEntry entry = (SequenceEntry) allSequencesList.get(nextIndex++);
@@ -122,62 +150,78 @@
 			String rmVersion = null;
 			// Check that the sequence is still valid
 			boolean found = false;
-			if(entry.isRmSource()) {
+			if (entry.isRmSource()) {
 				RMSBean matcher = new RMSBean();
 				matcher.setInternalSequenceID(sequenceId);
 				matcher.setTerminated(false);
 				RMSBean rms = storageManager.getRMSBeanMgr().findUnique(matcher);
-				if(rms != null && !rms.isTerminated() && !rms.isTimedOut()) {
-					sequenceId = rms.getSequenceID();					
-					if (SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))					
+				if (rms != null && !rms.isTerminated() && !rms.isTimedOut()) {
+					sequenceId = rms.getSequenceID();
+					if (SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))
 						SequenceManager.finalizeTimedOutSequence(rms.getInternalSequenceID(), null, storageManager);
 					else
 						found = true;
 					rmVersion = rms.getRMVersion();
 				}
-				
+
 			} else {
 				RMDBean matcher = new RMDBean();
 				matcher.setSequenceID(sequenceId);
 				matcher.setTerminated(false);
 				RMDBean rmd = storageManager.getRMDBeanMgr().findUnique(matcher);
-				if(rmd != null) {
+				if (rmd != null) {
 					found = true;
 					rmVersion = rmd.getRMVersion();
 				}
 			}
 			if (!found) {
 				stopThreadForSequence(sequenceId, entry.isRmSource());
-				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, sequence has ended");
-				
-				if(transaction != null && transaction.isActive()) {
+				if (log.isDebugEnabled())
+					log.debug("Exit: Sender::internalRun, sequence has ended");
+
+				if (transaction != null && transaction.isActive()) {
 					transaction.commit();
 					transaction = null;
 				}
-				
+
 				return false;
 			}
-			
+			if (sequenceId != null) {
+				AckHolder acktts = (AckHolder) ackMap.get(sequenceId);
+				if (acktts != null && acktts.tts < System.currentTimeMillis()) {
+					ackMap.remove(sequenceId);
+					RMDBean rmd = storageManager.getRMDBeanMgr().retrieve(sequenceId);
+					if (rmd != null) {
+						RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(acktts.refMsg, rmd, sequenceId, storageManager, true);
+
+						AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, sequenceId, acktts.tts, storageManager);
+						transaction.commit();
+						transaction = storageManager.getTransaction();
+					}
+
+				}
+			}
 			SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
 			SenderBean senderBean = mgr.getNextMsgToSend(sequenceId);
-			
+
 			if (senderBean == null) {
-				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, no message for this sequence");
-				
-				if(transaction != null && transaction.isActive()) {
+				if (log.isDebugEnabled())
+					log.debug("Exit: Sender::internalRun, no message for this sequence");
+
+				if (transaction != null && transaction.isActive()) {
 					transaction.commit();
 					transaction = null;
 				}
-				
+
 				return false; // Move on to the next sequence in the list
 			}
 
 			// work Id is used to define the piece of work that will be
 			// assigned to the Worker thread,
 			// to handle this Sender bean.
-			
-			//workId contains a timeTiSend part to cater for retransmissions.
-			//This will cause retransmissions to be treated as new work.
+
+			// workId contains a timeTiSend part to cater for retransmissions.
+			// This will cause retransmissions to be treated as new work.
 			String workId = senderBean.getMessageID() + senderBean.getTimeToSend();
 
 			// check weather the bean is already assigned to a worker.
@@ -185,51 +229,51 @@
 				// As there is already a worker running we are probably looping
 				// too fast, so sleep on the next loop.
 				if (log.isDebugEnabled()) {
-					String message = SandeshaMessageHelper.getMessage(
-									SandeshaMessageKeys.workAlreadyAssigned,
-									workId);
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
 					log.debug("Exit: Sender::internalRun, " + message + ", sleeping");
 				}
-				
-				if(transaction != null && transaction.isActive()) {
+
+				if (transaction != null && transaction.isActive()) {
 					transaction.commit();
 					transaction = null;
 				}
-				
+
 				return true;
 			}
 
-			//commiting the transaction here to release resources early.
-			if(transaction != null && transaction.isActive()) transaction.commit();
+			// commiting the transaction here to release resources early.
+			if (transaction != null && transaction.isActive())
+				transaction.commit();
 			transaction = null;
 
 			// start a worker which will work on this messages.
 			SenderWorker worker = new SenderWorker(context, senderBean, rmVersion);
 			worker.setLock(getWorkerLock());
 			worker.setWorkId(workId);
-			
+
 			try {
 				// Set the lock up before we start the thread, but roll it back
 				// if we hit any problems
 				getWorkerLock().addWork(workId, worker);
 				threadPool.execute(worker);
-			} catch(Exception e) {
+			} catch (Exception e) {
 				getWorkerLock().removeWork(workId);
-			}			
+			}
 
-			// If we got to here then we found work to do on the sequence, so we should
+			// If we got to here then we found work to do on the sequence, so we
+			// should
 			// remember not to sleep at the end of the list of sequences.
 			processedMessage = true;
-			
+
 		} catch (Exception e) {
 
 			// TODO : when this is the client side throw the exception to
 			// the client when necessary.
 
-			
-			//TODO rollback only if a SandeshaStorageException.
-			//This allows the other Exceptions to be used within the Normal flow.
-			
+			// TODO rollback only if a SandeshaStorageException.
+			// This allows the other Exceptions to be used within the Normal
+			// flow.
+
 			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
 			log.debug(message, e);
 		} finally {
@@ -238,111 +282,113 @@
 					transaction.rollback();
 					transaction = null;
 				} catch (Exception e) {
-					String message = SandeshaMessageHelper
-							.getMessage(SandeshaMessageKeys.rollbackError, e.toString());
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e.toString());
 					log.debug(message, e);
 				}
 			}
 		}
-		if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, not sleeping");
+		if (log.isDebugEnabled())
+			log.debug("Exit: Sender::internalRun, not sleeping");
 		return false;
 	}
 
 	/**
-	 * Finds any RMDBeans that have not been used inside the set InnactivityTimeoutInterval
+	 * Finds any RMDBeans that have not been used inside the set
+	 * InnactivityTimeoutInterval
+	 * 
+	 * Iterates through RMSBeans and RMDBeans that have been terminated or timed
+	 * out and deletes them.
 	 * 
-	 * Iterates through RMSBeans and RMDBeans that have been terminated or timed out and 
-	 * deletes them.
-	 *
 	 */
 	private void deleteTerminatedSequences(StorageManager storageManager) {
-		if (log.isDebugEnabled()) 
+		if (log.isDebugEnabled())
 			log.debug("Enter: Sender::deleteTerminatedSequences");
 
 		RMSBean finderBean = new RMSBean();
 		finderBean.setTerminated(true);
-		
+
 		Transaction transaction = null;
-		
+
 		try {
 			transaction = storageManager.getTransaction();
-			
-			SandeshaPolicyBean propertyBean = 
-				SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());			
-
-    	long deleteTime = propertyBean.getSequenceRemovalTimeoutInterval();
-    	if (deleteTime < 0)
-    		deleteTime = 0;
 
-    	if (deleteTime > 0) {
+			SandeshaPolicyBean propertyBean = SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
+
+			long deleteTime = propertyBean.getSequenceRemovalTimeoutInterval();
+			if (deleteTime < 0)
+				deleteTime = 0;
+
+			if (deleteTime > 0) {
 				// Find terminated sequences.
-		    List rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
-		    
-		    deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
-		    
-		    finderBean.setTerminated(false);
-		    finderBean.setTimedOut(true);
-		    
-		    // Find timed out sequences
-		    rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
-		    	    
-		    deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
-		    
-		    // Remove any terminated RMDBeans.
-		    RMDBean finderRMDBean = new RMDBean();
-		    finderRMDBean.setTerminated(true);
-		    
-		    List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
-	
-		    Iterator beans = rmdBeans.iterator();
-		    while (beans.hasNext()) {
-		    	RMDBean rmdBean = (RMDBean)beans.next();
-		    	
-		    	long timeNow = System.currentTimeMillis();
-		    	long lastActivated = rmdBean.getLastActivatedTime();
-	
-		    	// delete sequences that have been timedout or deleted for more than 
-		    	// the SequenceRemovalTimeoutInterval
-		    	if ((lastActivated + deleteTime) < timeNow) {
-		    		if (log.isDebugEnabled())
-		    			log.debug("Deleting RMDBean " + deleteTime + " : " + rmdBean);
-		    		storageManager.getRMDBeanMgr().delete(rmdBean.getSequenceID());
-		    	}	    		    	
-		    }
-    	}
+				List rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+
+				deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
+
+				finderBean.setTerminated(false);
+				finderBean.setTimedOut(true);
+
+				// Find timed out sequences
+				rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+
+				deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
+
+				// Remove any terminated RMDBeans.
+				RMDBean finderRMDBean = new RMDBean();
+				finderRMDBean.setTerminated(true);
+
+				List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+				Iterator beans = rmdBeans.iterator();
+				while (beans.hasNext()) {
+					RMDBean rmdBean = (RMDBean) beans.next();
 
-	    // Terminate RMD Sequences that have been inactive.			
+					long timeNow = System.currentTimeMillis();
+					long lastActivated = rmdBean.getLastActivatedTime();
+
+					// delete sequences that have been timedout or deleted for
+					// more than
+					// the SequenceRemovalTimeoutInterval
+					if ((lastActivated + deleteTime) < timeNow) {
+						if (log.isDebugEnabled())
+							log.debug("Deleting RMDBean " + deleteTime + " : " + rmdBean);
+						storageManager.getRMDBeanMgr().delete(rmdBean.getSequenceID());
+					}
+				}
+			}
+
+			// Terminate RMD Sequences that have been inactive.
 			if (propertyBean.getInactivityTimeoutInterval() > 0) {
-		    RMDBean finderRMDBean = new RMDBean();
-		    finderRMDBean.setTerminated(false);
-				
-		    List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
-			
-		    Iterator beans = rmdBeans.iterator();
-		    while (beans.hasNext()) {
-		    	RMDBean rmdBean = (RMDBean)beans.next();
-		    	
-		    	long timeNow = System.currentTimeMillis();
-		    	long lastActivated = rmdBean.getLastActivatedTime();
-		    	
-		    	if ((lastActivated + propertyBean.getInactivityTimeoutInterval()) < timeNow) {
-		    		// Terminate
-		    		rmdBean.setTerminated(true);
-		    		rmdBean.setLastActivatedTime(timeNow);
-		    		if (log.isDebugEnabled())
-		    			log.debug(System.currentTimeMillis() + "Marking RMDBean as terminated " + rmdBean);
-		    		storageManager.getRMDBeanMgr().update(rmdBean);
-		    	}	    		    	
-		    }
-			} 	    
-	    
-			if(transaction != null && transaction.isActive()) transaction.commit();
-			
+				RMDBean finderRMDBean = new RMDBean();
+				finderRMDBean.setTerminated(false);
+
+				List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+				Iterator beans = rmdBeans.iterator();
+				while (beans.hasNext()) {
+					RMDBean rmdBean = (RMDBean) beans.next();
+
+					long timeNow = System.currentTimeMillis();
+					long lastActivated = rmdBean.getLastActivatedTime();
+
+					if ((lastActivated + propertyBean.getInactivityTimeoutInterval()) < timeNow) {
+						// Terminate
+						rmdBean.setTerminated(true);
+						rmdBean.setLastActivatedTime(timeNow);
+						if (log.isDebugEnabled())
+							log.debug(System.currentTimeMillis() + "Marking RMDBean as terminated " + rmdBean);
+						storageManager.getRMDBeanMgr().update(rmdBean);
+					}
+				}
+			}
+
+			if (transaction != null && transaction.isActive())
+				transaction.commit();
+
 		} catch (SandeshaException e) {
 			if (log.isErrorEnabled())
 				log.error(e);
 		} finally {
-			if(transaction != null && transaction.isActive()) {
+			if (transaction != null && transaction.isActive()) {
 				try {
 					transaction.rollback();
 				} catch (SandeshaStorageException e) {
@@ -351,67 +397,72 @@
 				}
 			}
 		}
-		
-		if (log.isDebugEnabled()) 
+
+		if (log.isDebugEnabled())
 			log.debug("Exit: Sender::deleteTerminatedSequences");
 	}
-	
-	private void deleteRMSBeans(List rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime) 
 
-	throws SandeshaStorageException {		
-		if (log.isDebugEnabled()) 
+	private void deleteRMSBeans(List rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime)
+
+	throws SandeshaStorageException {
+		if (log.isDebugEnabled())
 			log.debug("Enter: Sender::deleteRMSBeans");
 
-    Iterator beans = rmsBeans.iterator();
-    
-    while (beans.hasNext())
-    {
-    	RMSBean rmsBean = (RMSBean)beans.next();
-    	long timeNow = System.currentTimeMillis();
-    	long lastActivated = rmsBean.getLastActivatedTime();
-    	// delete sequences that have been timedout or deleted for more than 
-    	// the SequenceRemovalTimeoutInterval
-   	
-    	if ((lastActivated + deleteTime) < timeNow) {
-    		if (log.isDebugEnabled())
-    			log.debug("Removing RMSBean " + rmsBean);
-    		storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
-    		storageManager.removeMessageContext( rmsBean.getReferenceMessageStoreKey() );
-    	}	    	
-    }
+		Iterator beans = rmsBeans.iterator();
 
-		if (log.isDebugEnabled()) 
+		while (beans.hasNext()) {
+			RMSBean rmsBean = (RMSBean) beans.next();
+			long timeNow = System.currentTimeMillis();
+			long lastActivated = rmsBean.getLastActivatedTime();
+			// delete sequences that have been timedout or deleted for more than
+			// the SequenceRemovalTimeoutInterval
+
+			if ((lastActivated + deleteTime) < timeNow) {
+				if (log.isDebugEnabled())
+					log.debug("Removing RMSBean " + rmsBean);
+				storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+				storageManager.removeMessageContext(rmsBean.getReferenceMessageStoreKey());
+			}
+		}
+
+		if (log.isDebugEnabled())
 			log.debug("Exit: Sender::deleteRMSBeans");
 	}
 
-	private void unblockTransportThreads(StorageManager manager)
-	throws SandeshaStorageException
-	{
-		if (log.isDebugEnabled()) log.debug("Enter: Sender::unblockTransportThreads");
+	private void unblockTransportThreads(StorageManager manager) throws SandeshaStorageException {
+		if (log.isDebugEnabled())
+			log.debug("Enter: Sender::unblockTransportThreads");
 
 		Transaction transaction = null;
 		try {
 			transaction = manager.getTransaction();
-			
-			// This finder will look for beans that have been locking the transport for longer than
-			// the TRANSPORT_WAIT_TIME. The match method for SenderBeans does the time comparison
+
+			// This finder will look for beans that have been locking the
+			// transport for longer than
+			// the TRANSPORT_WAIT_TIME. The match method for SenderBeans does
+			// the time comparison
 			// for us.
 			SenderBean finder = new SenderBean();
 			finder.setSend(false);
 			finder.setTransportAvailable(true);
 			finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
-			
+
 			List beans = manager.getSenderBeanMgr().find(finder);
 			Iterator beanIter = beans.iterator();
-			while(beanIter.hasNext()) {
-				// The beans we have found are assigned to an internal sequence id, but the create
-				// sequence has not completed yet (and perhaps never will). Server-side, most of the
-				// info that we can usefully print is associated with the inbound sequence that generated
+			while (beanIter.hasNext()) {
+				// The beans we have found are assigned to an internal sequence
+				// id, but the create
+				// sequence has not completed yet (and perhaps never will).
+				// Server-side, most of the
+				// info that we can usefully print is associated with the
+				// inbound sequence that generated
 				// this message.
 				SenderBean bean = (SenderBean) beanIter.next();
-				
-				// Load the message, so that we can free the transport (if there is one there). The
-				// case we are trying to free up is when there is a request-response transport, and
+
+				// Load the message, so that we can free the transport (if there
+				// is one there). The
+				// case we are trying to free up is when there is a
+				// request-response transport, and
 				// it's still there waiting.
 				MessageContext msgCtx = manager.retrieveMessageContext(bean.getMessageContextRefKey(), context);
 
@@ -422,111 +473,126 @@
 					inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
 				if (inMsg != null)
 					t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
-	
-				if((t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus()))) {
-					if(log.isWarnEnabled()) {
+
+				if ((t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus()))) {
+					if (log.isWarnEnabled()) {
 						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.freeingTransport);
 						log.warn(message);
 					}
-					// If the message is a reply, then the request may need to be acked. Rather
-					// than just return a HTTP 202, we should try to send an ack.
+					// If the message is a reply, then the request may need to
+					// be acked. Rather
+					// than just return a HTTP 202, we should try to send an
+					// ack.
 					boolean sendAck = false;
 					RMDBean inbound = null;
 					String inboundSeq = bean.getInboundSequenceId();
-					if(inboundSeq != null) 
+					if (inboundSeq != null)
 						inbound = SandeshaUtil.getRMDBeanFromSequenceId(manager, inboundSeq);
-					
-					if(inbound != null) {
+
+					if (inbound != null) {
 						EndpointReference acksToEPR = inbound.getAcksToEndpointReference();
-						if(acksToEPR!=null && acksToEPR.hasAnonymousAddress())
+						if (acksToEPR != null && acksToEPR.hasAnonymousAddress())
 							sendAck = true;
 					}
-					
-					if(sendAck) {
+
+					if (sendAck) {
 						RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
-						RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(
-								rmMsgCtx, inbound, inbound.getSequenceID(), storageManager, true);
+						RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, inbound, inbound.getSequenceID(), storageManager, true);
 						AcknowledgementManager.sendAckNow(ackRMMsgCtx);
 						TransportUtils.setResponseWritten(msgCtx, true);
 					} else {
 						TransportUtils.setResponseWritten(msgCtx, false);
 					}
-	
-					// Mark the bean so that we know the transport is missing, and reset the send time
+
+					// Mark the bean so that we know the transport is missing,
+					// and reset the send time
 					bean.setTransportAvailable(false);
 					bean.setTimeToSend(System.currentTimeMillis());
-					
+
 					// Update the bean
 					manager.getSenderBeanMgr().update(bean);
 				}
 			}
-	
-			if(transaction != null && transaction.isActive()) transaction.commit();
+
+			if (transaction != null && transaction.isActive())
+				transaction.commit();
 			transaction = null;
-			
-		} catch(Exception e) {
-			// There isn't much we can do here, so log the exception and continue.
-			if(log.isDebugEnabled()) log.debug("Exception", e);
+
+		} catch (Exception e) {
+			// There isn't much we can do here, so log the exception and
+			// continue.
+			if (log.isDebugEnabled())
+				log.debug("Exception", e);
 		} finally {
-			if(transaction != null && transaction.isActive()) transaction.rollback();
+			if (transaction != null && transaction.isActive())
+				transaction.rollback();
 		}
-		
-		if (log.isDebugEnabled()) log.debug("Exit: Sender::unblockTransportThreads");
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: Sender::unblockTransportThreads");
 	}
-		
-	private void checkForOrphanMessages(StorageManager manager)
-	throws SandeshaStorageException
-	{
-		if(log.isDebugEnabled()) log.debug("Enter: Sender::checkForOrphanMessages");
-		
+
+	private void checkForOrphanMessages(StorageManager manager) throws SandeshaStorageException {
+		if (log.isDebugEnabled())
+			log.debug("Enter: Sender::checkForOrphanMessages");
+
 		Transaction tran = null;
 		try {
 			tran = manager.getTransaction();
-	
-			// This finder will look for beans that should have been sent, but could not be sent
-			// because they need a MakeConnection message to come in to pick it up. We also factor
-			// in TRANSPORT_WAIT_TIME to give the MakeConnection a chance to arrive.
+
+			// This finder will look for beans that should have been sent, but
+			// could not be sent
+			// because they need a MakeConnection message to come in to pick it
+			// up. We also factor
+			// in TRANSPORT_WAIT_TIME to give the MakeConnection a chance to
+			// arrive.
 			SenderBean finder = new SenderBean();
 			finder.setSend(true);
 			finder.setTransportAvailable(false);
 			finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
-			
+
 			List beans = manager.getSenderBeanMgr().find(finder);
 			Iterator beanIter = beans.iterator();
-			while(beanIter.hasNext()) {
+			while (beanIter.hasNext()) {
 				SenderBean bean = (SenderBean) beanIter.next();
-				
-				// Emit a message to warn the user that MakeConnections are not arriving to pick
+
+				// Emit a message to warn the user that MakeConnections are not
+				// arriving to pick
 				// messages up
-				if(log.isWarnEnabled()) {
+				if (log.isWarnEnabled()) {
 					String message = null;
 					String internalSequenceID = bean.getInternalSequenceID();
 					String sequenceID = bean.getSequenceID();
-					if (bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION)					
-						message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, internalSequenceID);				
-					else
-					{
+					if (bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION)
+						message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, internalSequenceID);
+					else {
 						String messageType = Integer.toString(bean.getMessageType());
 						message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPollingProtocol, messageType, sequenceID, internalSequenceID);
 					}
 					log.warn(message);
 				}
-				
-				// Update the bean so that we won't emit another message for another TRANSPORT_WAIT_TIME
+
+				// Update the bean so that we won't emit another message for
+				// another TRANSPORT_WAIT_TIME
 				bean.setTimeToSend(System.currentTimeMillis());
 				manager.getSenderBeanMgr().update(bean);
 			}
-	
-			if(tran != null && tran.isActive()) tran.commit();
+
+			if (tran != null && tran.isActive())
+				tran.commit();
 			tran = null;
-	
-		} catch(Exception e) {
-			// There isn't much we can do here, so log the exception and continue.
-			if(log.isDebugEnabled()) log.debug("Exception", e);
+
+		} catch (Exception e) {
+			// There isn't much we can do here, so log the exception and
+			// continue.
+			if (log.isDebugEnabled())
+				log.debug("Exception", e);
 		} finally {
-			if(tran != null && tran.isActive()) tran.rollback();
+			if (tran != null && tran.isActive())
+				tran.rollback();
 		}
-		
-		if(log.isDebugEnabled()) log.debug("Exit: Sender::checkForOrphanMessages");
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: Sender::checkForOrphanMessages");
 	}
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org