You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ga...@apache.org on 2008/06/24 10:22:55 UTC
svn commit: r671059 - in
/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2:
util/AcknowledgementManager.java util/SequenceManager.java
workers/Sender.java
Author: gatfora
Date: Tue Jun 24 01:22:54 2008
New Revision: 671059
URL: http://svn.apache.org/viewvc?rev=671059&view=rev
Log:
Applying patches from SANDESHA2-164, thanks David and Sara
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java?rev=671059&r1=671058&r2=671059&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java Tue Jun 24 01:22:54 2008
@@ -42,6 +42,7 @@
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.workers.Sender;
/**
* Contains logic for managing acknowledgements.
@@ -58,96 +59,102 @@
* @param applicationRMMsgContext
* @throws SandeshaException
*/
- public static void piggybackAcksIfPresent(RMMsgContext rmMessageContext, StorageManager storageManager)
- throws SandeshaException {
+ public static void piggybackAcksIfPresent(RMMsgContext rmMessageContext, StorageManager storageManager) throws SandeshaException {
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent");
-
+
SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
- // If this message is going to an anonymous address, and the inbound sequence has
+ // If this message is going to an anonymous address, and the inbound
+ // sequence has
// anonymous acksTo, then we add in an ack for the inbound sequence.
EndpointReference target = rmMessageContext.getTo();
- if(target == null || target.hasAnonymousAddress()) {
- // We have no good indicator of the identity of the destination, so the only sequence
- // we can ack is the inbound one that caused us to create this response.
+ if (target == null || target.hasAnonymousAddress()) {
+ // We have no good indicator of the identity of the destination, so
+ // the only sequence
+ // we can ack is the inbound one that caused us to create this
+ // response.
String inboundSequence = (String) rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
- if(inboundSequence != null) {
+ if (inboundSequence != null) {
RMDBean inboundBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);
- if(inboundBean != null && !inboundBean.isTerminated()) {
+ if (inboundBean != null && !inboundBean.isTerminated()) {
EndpointReference acksToEPR = inboundBean.getAcksToEndpointReference();
- if(acksToEPR == null || acksToEPR.hasAnonymousAddress()) {
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Piggybacking ack for inbound sequence: " + inboundSequence);
+ if (acksToEPR == null || acksToEPR.hasAnonymousAddress()) {
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Piggybacking ack for inbound sequence: " + inboundSequence);
RMMsgCreator.addAckMessage(rmMessageContext, inboundSequence, inboundBean, false);
}
}
}
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, anon");
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, anon");
return;
- }
- else{
- //an addressable EPR
- if(SandeshaUtil.hasReferenceParameters(target)){
- //we should not proceed since we cannot properly compare ref params
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
+ } else {
+ // an addressable EPR
+ if (SandeshaUtil.hasReferenceParameters(target)) {
+ // we should not proceed since we cannot properly compare ref
+ // params
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
return;
}
-
- // From here on, we must be dealing with a real address. Piggyback all sequences that have an
- // acksTo that matches the To address, and that have an ackMessage queued up for sending. We
- // search for RMDBeans first, to avoid a deadlock.
- //
- // As a special case, if this is a terminate sequence message then add in ack messages for
- // any sequences that have an acksTo that matches the target address. This helps to ensure
- // that request-response sequence pairs end cleanly.
- RMDBean findRMDBean = new RMDBean();
- findRMDBean.setAcksToEndpointReference(target);
- findRMDBean.setTerminated(false);
- Collection rmdBeans = storageManager.getRMDBeanMgr().find(findRMDBean);
- Iterator sequences = rmdBeans.iterator();
- while(sequences.hasNext()) {
- RMDBean sequence = (RMDBean) sequences.next();
- if(SandeshaUtil.hasReferenceParameters(sequence.getAcksToEndpointReference())){
- //we should not piggy back if there are reference parameters in the acksTo EPR since we cannot compare them
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
- break;
- }
-
- String sequenceId = sequence.getSequenceID();
-
- // Look for the SenderBean that carries the ack, there should be at most one
- SenderBean findBean = new SenderBean();
- findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
- findBean.setSend(true);
- findBean.setSequenceID(sequenceId);
- findBean.setToAddress(target.getAddress());
-
- SenderBean ackBean = retransmitterBeanMgr.findUnique(findBean);
-
- // Piggybacking will happen only if the end of ack interval (timeToSend) is not reached.
- long timeNow = System.currentTimeMillis();
- if (ackBean != null && ackBean.getTimeToSend() > timeNow) {
- // Delete the beans that would have sent the ack
- retransmitterBeanMgr.delete(ackBean.getMessageID());
- storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
-
- if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
- RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence, false);
-
-
- } else if(rmMessageContext.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Adding extra acks, as this is a terminate");
-
- if(sequence.getHighestInMessageNumber() > 0) {
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
- RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence, false);
+ String inboundSequence = (String) rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
+ // If there's an inbound sequence (i.e. we're provider side) we'll
+ // use that, otherwise
+ // we'll go to the expense of looking the sequence up by the acksTo
+ // address.
+ if (inboundSequence != null) {
+ // We used to look for an ack sender bean before piggybacking an
+ // ack, but in the high-throughput
+ // scenarios there always was one, and in the low-throughput
+ // scenarios it's less of an issue if
+ // we piggyback when we don't have to. So for now, let's mimic
+ // the old high-throughput behaviour
+ // in a cheap way by always piggybacking.
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Piggybacking ack for sequence: " + inboundSequence);
+ RMDBean sequence = storageManager.getRMDBeanMgr().retrieve(inboundSequence);
+ RMMsgCreator.addAckMessage(rmMessageContext, inboundSequence, sequence, false);
+ ((Sender) storageManager.getSender()).removeScheduledAcknowledgement(inboundSequence);
+ } else {
+ RMDBean findRMDBean = new RMDBean();
+ findRMDBean.setAcksToEndpointReference(target);
+ findRMDBean.setTerminated(false);
+ Collection rmdBeans = storageManager.getRMDBeanMgr().find(findRMDBean);
+ Iterator sequences = rmdBeans.iterator();
+ while (sequences.hasNext()) {
+ RMDBean sequence = (RMDBean) sequences.next();
+ if (SandeshaUtil.hasReferenceParameters(sequence.getAcksToEndpointReference())) {
+ // we should not piggy back if there are reference
+ // parameters in the acksTo EPR since we cannot compare
+ // them
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
+ break;
}
+
+ String sequenceId = sequence.getSequenceID();
+
+ // We used to look for an ack sender bean before
+ // piggybacking an ack, but in the high-throughput
+ // scenarios there always was one, and in the low-throughput
+ // scenarios it's less of an issue if
+ // we piggyback when we don't have to. So for now, let's
+ // mimic the old high-throughput behaviour
+ // in a cheap way by always piggybacking.
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Piggybacking ack for sequence: " + sequenceId);
+
+ RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence, false);
+
+ ((Sender) storageManager.getSender()).removeScheduledAcknowledgement(sequenceId);
+
}
}
}
-
+
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent");
return;
@@ -159,20 +166,18 @@
* @param sequencePropertyKey
* @param sequenceId
* @param storageManager
- * @param makeResponse Some work will be done to make the new ack message the response of the reference message.
+ * @param makeResponse
+ * Some work will be done to make the new ack message the
+ * response of the reference message.
* @return
* @throws AxisFault
*/
public static RMMsgContext generateAckMessage(
-
- RMMsgContext referenceRMMessage,
- RMDBean rmdBean,
- String sequenceId,
- StorageManager storageManager,
- boolean serverSide
-
- ) throws AxisFault {
-
+
+ RMMsgContext referenceRMMessage, RMDBean rmdBean, String sequenceId, StorageManager storageManager, boolean serverSide
+
+ ) throws AxisFault {
+
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Enter: AcknowledgementManager::generateAckMessage " + rmdBean);
@@ -180,16 +185,14 @@
EndpointReference acksTo = rmdBean.getAcksToEndpointReference();
- if (acksTo==null || acksTo.getAddress() == null)
+ if (acksTo == null || acksTo.getAddress() == null)
throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
- AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
- Sandesha2Constants.MessageTypes.ACK,
- rmdBean.getRMVersion(),
- referenceMsg.getAxisService());
+ AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.ACK, rmdBean.getRMVersion(), referenceMsg
+ .getAxisService());
MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);
-
+
ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
@@ -198,8 +201,7 @@
ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
- SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
- .getSOAPVersion(referenceMsg.getEnvelope()));
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(referenceMsg.getEnvelope()));
// Setting new envelope
SOAPEnvelope envelope = factory.getDefaultEnvelope();
@@ -207,7 +209,7 @@
ackMsgCtx.setEnvelope(envelope);
ackMsgCtx.setTo(acksTo);
-
+
ackMsgCtx.setServerSide(serverSide);
// adding the SequenceAcknowledgement part.
@@ -218,16 +220,13 @@
return ackRMMsgCtx;
}
-
-
-
public static boolean verifySequenceCompletion(RangeString ackRanges, long lastMessageNo) {
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Enter: AcknowledgementManager::verifySequenceCompletion");
boolean result = false;
Range complete = new Range(1, lastMessageNo);
- if(ackRanges.isRangeCompleted(complete)) {
+ if (ackRanges.isRangeCompleted(complete)) {
result = true;
}
@@ -235,17 +234,14 @@
log.debug("Exit: AcknowledgementManager::verifySequenceCompletion " + result);
return result;
}
-
- public static void addAckBeanEntry (
- RMMsgContext ackRMMsgContext,
- String sequenceId,
- long timeToSend,
- StorageManager storageManager) throws AxisFault {
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: AcknowledgementManager::addAckBeanEntry");
+
+ public static void addAckBeanEntry(RMMsgContext ackRMMsgContext, String sequenceId, long timeToSend, StorageManager storageManager) throws AxisFault {
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Enter: AcknowledgementManager::addAckBeanEntry");
// Write the acks into the envelope
ackRMMsgContext.addSOAPEnvelope();
-
+
MessageContext ackMsgContext = ackRMMsgContext.getMessageContext();
SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
@@ -258,7 +254,7 @@
ackBean.setReSend(false);
ackBean.setSequenceID(sequenceId);
EndpointReference to = ackMsgContext.getTo();
- if (to!=null)
+ if (to != null)
ackBean.setToAddress(to.getAddress());
ackBean.setSend(true);
@@ -275,9 +271,9 @@
Collection coll = retransmitterBeanMgr.find(findBean);
Iterator it = coll.iterator();
- while(it.hasNext()) {
+ while (it.hasNext()) {
SenderBean oldAckBean = (SenderBean) it.next();
- if(oldAckBean.getTimeToSend() < timeToSend)
+ if (oldAckBean.getTimeToSend() < timeToSend)
timeToSend = oldAckBean.getTimeToSend();
// removing the retransmitted entry for the oldAck
@@ -290,42 +286,44 @@
ackBean.setTimeToSend(timeToSend);
ackMsgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
+
// passing the message through sandesha2sender
ackMsgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
-
- SandeshaUtil.executeAndStore(ackRMMsgContext, key, storageManager);
+
+ SandeshaUtil.executeAndStore(ackRMMsgContext, key, storageManager);
// inserting the new ack.
retransmitterBeanMgr.insert(ackBean);
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::addAckBeanEntry");
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::addAckBeanEntry");
}
-
- public static void sendAckNow (RMMsgContext ackRMMsgContext) throws AxisFault {
+
+ public static void sendAckNow(RMMsgContext ackRMMsgContext) throws AxisFault {
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Enter: AcknowledgementManager::sendAckNow");
// Write the acks into the envelope
ackRMMsgContext.addSOAPEnvelope();
-
+
MessageContext ackMsgContext = ackRMMsgContext.getMessageContext();
-
+
// setting CONTEXT_WRITTEN since acksto is anonymous
if (ackRMMsgContext.getMessageContext().getOperationContext() == null) {
// operation context will be null when doing in a GLOBAL
// handler.
AxisOperation op = ackMsgContext.getAxisOperation();
- OperationContext opCtx = OperationContextFactory.createOperationContext(op.getAxisSpecificMEPConstant(), op, ackRMMsgContext.getMessageContext().getServiceContext());
+ OperationContext opCtx = OperationContextFactory.createOperationContext(op.getAxisSpecificMEPConstant(), op, ackRMMsgContext.getMessageContext()
+ .getServiceContext());
ackRMMsgContext.getMessageContext().setOperationContext(opCtx);
}
ackRMMsgContext.getMessageContext().setServerSide(true);
-
+
AxisEngine.send(ackMsgContext);
-
+
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
- log.debug("Exit: AcknowledgementManager::sendAckNow");
- }
+ log.debug("Exit: AcknowledgementManager::sendAckNow");
+ }
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java?rev=671059&r1=671058&r2=671059&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java Tue Jun 24 01:22:54 2008
@@ -19,10 +19,15 @@
package org.apache.sandesha2.util;
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMFactory;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.Parameter;
@@ -204,10 +209,11 @@
// Server side, we want the replyTo and AcksTo EPRs to point into this server.
// We can work that out by looking at the RMD bean that pulled the message in,
// and copying its 'ReplyTo' address.
- if(inboundBean != null && inboundBean.getReplyToEndpointReference() != null) {
- acksToEPR = inboundBean.getReplyToEndpointReference();
- replyToEPR = inboundBean.getReplyToEndpointReference();
- } else {
+ EndpointReference strippedReplyToEpr = stripAddress(inboundBean.getReplyToEndpointReference());
+ if(inboundBean != null && strippedReplyToEpr != null) {
+ acksToEPR = strippedReplyToEpr;
+ replyToEPR = strippedReplyToEpr;
+ } else {
String beanInfo = (inboundBean == null) ? "null" : inboundBean.toString();
String message = SandeshaMessageHelper.getMessage(
SandeshaMessageKeys.cannotChooseAcksTo, inboundSequence, beanInfo);
@@ -384,4 +390,40 @@
return specVersion;
}
+
+ /* Because RM reuses the incoming EPRs, we need to use only the address.
+ */
+ private static EndpointReference stripAddress(EndpointReference eprIn){
+ if(log.isDebugEnabled()) log.debug("stripAddress from EndpointReference : " + eprIn);
+ EndpointReference epr = new EndpointReference(eprIn.getAddress());
+ return epr;
+/**
+ String schemaNs = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd";
+ String securityNs = "http://schemas.xmlsoap.org/ws/2003/06/utility";
+
+ Iterator it = eprOut.getAttributes().iterator();
+ while(it.hasNext()){
+ OMAttribute attribute = (OMAttribute)it.next();
+ String ns = attribute.getNamespace().getNamespaceURI();
+ String name = attribute.getLocalName();
+ if((schemaNs.equals(ns) || securityNs.equals(ns)) && "Id".equals(name)){
+ //delete attribute
+ it.remove();
+ }
+ }
+ Iterator it2 = eprOut.getAddressAttributes().iterator();
+ while(it2.hasNext()){
+ OMAttribute attribute = (OMAttribute)it2.next();
+ String ns = attribute.getNamespace().getNamespaceURI();
+ String name = attribute.getLocalName();
+ if((schemaNs.equals(ns) || securityNs.equals(ns)) && "Id".equals(name)){
+ //delete attribute
+ it2.remove();
+ }
+ }
+
+ return eprOut;
+ **/
+}
+
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=671059&r1=671058&r2=671059&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Tue Jun 24 01:22:54 2008
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.MessageContext;
@@ -63,17 +64,39 @@
// If this sender is working for several sequences, we use round-robin to
// try and give them all a chance to invoke messages.
int nextIndex = 0;
+
boolean processedMessage = false;
+
long lastHousekeeping = 0;
-
+
private static int HOUSEKEEPING_INTERVAL = 20000;
-
- public Sender () {
+
+ private ConcurrentHashMap ackMap = new ConcurrentHashMap();
+
+ private static class AckHolder {
+ public long tts = 0;
+
+ public RMMsgContext refMsg;
+ }
+
+ public Sender() {
super(Sandesha2Constants.SENDER_SLEEP_TIME);
}
+ public void scheduleAddressableAcknowledgement(String sequenceId, long ackInterval, RMMsgContext ref) {
+ AckHolder ackH = new AckHolder();
+ ackH.tts = System.currentTimeMillis() + ackInterval;
+ ackH.refMsg = ref;
+ ackMap.putIfAbsent(sequenceId, ackH);
+ }
+
+ public void removeScheduledAcknowledgement(String sequenceId) {
+ ackMap.remove(sequenceId);
+ }
+
protected boolean internalRun() {
- if (log.isDebugEnabled()) log.debug("Enter: Sender::internalRun");
+ if (log.isDebugEnabled())
+ log.debug("Enter: Sender::internalRun");
Transaction transaction = null;
boolean sleep = false;
@@ -85,33 +108,38 @@
if (log.isDebugEnabled())
log.debug("Choosing one from " + size + " sequences");
- if(nextIndex >= size) {
+ if (nextIndex >= size) {
nextIndex = 0;
- // We just looped over the set of sequences. If we didn't process any
+ // We just looped over the set of sequences. If we didn't
+ // process any
// messages on this loop then we sleep before the next one
- if(size == 0 || !processedMessage) {
+ if (size == 0 || !processedMessage) {
sleep = true;
}
processedMessage = false;
-
- if(System.currentTimeMillis()-lastHousekeeping > HOUSEKEEPING_INTERVAL){
- // At this point - delete any sequences that have timed out, or been terminated.
+
+ if (System.currentTimeMillis() - lastHousekeeping > HOUSEKEEPING_INTERVAL) {
+ // At this point - delete any sequences that have timed out,
+ // or been terminated.
deleteTerminatedSequences(storageManager);
- // Also clean up and sender beans that are not yet eligible for sending, but
+ // Also clean up any sender beans that are not yet eligible
+ // for sending, but
// are blocking the transport threads.
unblockTransportThreads(storageManager);
- // Finally, check for messages that can only be serviced by polling, and warn
+ // Finally, check for messages that can only be serviced by
+ // polling, and warn
// the user if they are too old
checkForOrphanMessages(storageManager);
lastHousekeeping = System.currentTimeMillis();
}
- if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
return sleep;
}
-
+
transaction = storageManager.getTransaction();
SequenceEntry entry = (SequenceEntry) allSequencesList.get(nextIndex++);
@@ -122,62 +150,78 @@
String rmVersion = null;
// Check that the sequence is still valid
boolean found = false;
- if(entry.isRmSource()) {
+ if (entry.isRmSource()) {
RMSBean matcher = new RMSBean();
matcher.setInternalSequenceID(sequenceId);
matcher.setTerminated(false);
RMSBean rms = storageManager.getRMSBeanMgr().findUnique(matcher);
- if(rms != null && !rms.isTerminated() && !rms.isTimedOut()) {
- sequenceId = rms.getSequenceID();
- if (SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))
+ if (rms != null && !rms.isTerminated() && !rms.isTimedOut()) {
+ sequenceId = rms.getSequenceID();
+ if (SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))
SequenceManager.finalizeTimedOutSequence(rms.getInternalSequenceID(), null, storageManager);
else
found = true;
rmVersion = rms.getRMVersion();
}
-
+
} else {
RMDBean matcher = new RMDBean();
matcher.setSequenceID(sequenceId);
matcher.setTerminated(false);
RMDBean rmd = storageManager.getRMDBeanMgr().findUnique(matcher);
- if(rmd != null) {
+ if (rmd != null) {
found = true;
rmVersion = rmd.getRMVersion();
}
}
if (!found) {
stopThreadForSequence(sequenceId, entry.isRmSource());
- if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, sequence has ended");
-
- if(transaction != null && transaction.isActive()) {
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::internalRun, sequence has ended");
+
+ if (transaction != null && transaction.isActive()) {
transaction.commit();
transaction = null;
}
-
+
return false;
}
-
+ if (sequenceId != null) {
+ AckHolder acktts = (AckHolder) ackMap.get(sequenceId);
+ if (acktts != null && acktts.tts < System.currentTimeMillis()) {
+ ackMap.remove(sequenceId);
+ RMDBean rmd = storageManager.getRMDBeanMgr().retrieve(sequenceId);
+ if (rmd != null) {
+ RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(acktts.refMsg, rmd, sequenceId, storageManager, true);
+
+ AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, sequenceId, acktts.tts, storageManager);
+ transaction.commit();
+ transaction = storageManager.getTransaction();
+ }
+
+ }
+ }
SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
SenderBean senderBean = mgr.getNextMsgToSend(sequenceId);
-
+
if (senderBean == null) {
- if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, no message for this sequence");
-
- if(transaction != null && transaction.isActive()) {
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::internalRun, no message for this sequence");
+
+ if (transaction != null && transaction.isActive()) {
transaction.commit();
transaction = null;
}
-
+
return false; // Move on to the next sequence in the list
}
// work Id is used to define the piece of work that will be
// assigned to the Worker thread,
// to handle this Sender bean.
-
- //workId contains a timeTiSend part to cater for retransmissions.
- //This will cause retransmissions to be treated as new work.
+
+ // workId contains a timeToSend part to cater for retransmissions.
+ // This will cause retransmissions to be treated as new work.
String workId = senderBean.getMessageID() + senderBean.getTimeToSend();
// check weather the bean is already assigned to a worker.
@@ -185,51 +229,51 @@
// As there is already a worker running we are probably looping
// too fast, so sleep on the next loop.
if (log.isDebugEnabled()) {
- String message = SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.workAlreadyAssigned,
- workId);
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
log.debug("Exit: Sender::internalRun, " + message + ", sleeping");
}
-
- if(transaction != null && transaction.isActive()) {
+
+ if (transaction != null && transaction.isActive()) {
transaction.commit();
transaction = null;
}
-
+
return true;
}
- //commiting the transaction here to release resources early.
- if(transaction != null && transaction.isActive()) transaction.commit();
+ // committing the transaction here to release resources early.
+ if (transaction != null && transaction.isActive())
+ transaction.commit();
transaction = null;
// start a worker which will work on this messages.
SenderWorker worker = new SenderWorker(context, senderBean, rmVersion);
worker.setLock(getWorkerLock());
worker.setWorkId(workId);
-
+
try {
// Set the lock up before we start the thread, but roll it back
// if we hit any problems
getWorkerLock().addWork(workId, worker);
threadPool.execute(worker);
- } catch(Exception e) {
+ } catch (Exception e) {
getWorkerLock().removeWork(workId);
- }
+ }
- // If we got to here then we found work to do on the sequence, so we should
+ // If we got to here then we found work to do on the sequence, so we
+ // should
// remember not to sleep at the end of the list of sequences.
processedMessage = true;
-
+
} catch (Exception e) {
// TODO : when this is the client side throw the exception to
// the client when necessary.
-
- //TODO rollback only if a SandeshaStorageException.
- //This allows the other Exceptions to be used within the Normal flow.
-
+ // TODO rollback only if a SandeshaStorageException.
+ // This allows the other Exceptions to be used within the Normal
+ // flow.
+
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
log.debug(message, e);
} finally {
@@ -238,111 +282,113 @@
transaction.rollback();
transaction = null;
} catch (Exception e) {
- String message = SandeshaMessageHelper
- .getMessage(SandeshaMessageKeys.rollbackError, e.toString());
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e.toString());
log.debug(message, e);
}
}
}
- if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, not sleeping");
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::internalRun, not sleeping");
return false;
}
/**
- * Finds any RMDBeans that have not been used inside the set InnactivityTimeoutInterval
+ * Finds any RMDBeans that have not been used inside the set
+ * InnactivityTimeoutInterval
+ *
+ * Iterates through RMSBeans and RMDBeans that have been terminated or timed
+ * out and deletes them.
*
- * Iterates through RMSBeans and RMDBeans that have been terminated or timed out and
- * deletes them.
- *
*/
private void deleteTerminatedSequences(StorageManager storageManager) {
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled())
log.debug("Enter: Sender::deleteTerminatedSequences");
RMSBean finderBean = new RMSBean();
finderBean.setTerminated(true);
-
+
Transaction transaction = null;
-
+
try {
transaction = storageManager.getTransaction();
-
- SandeshaPolicyBean propertyBean =
- SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
-
- long deleteTime = propertyBean.getSequenceRemovalTimeoutInterval();
- if (deleteTime < 0)
- deleteTime = 0;
- if (deleteTime > 0) {
+ SandeshaPolicyBean propertyBean = SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
+
+ long deleteTime = propertyBean.getSequenceRemovalTimeoutInterval();
+ if (deleteTime < 0)
+ deleteTime = 0;
+
+ if (deleteTime > 0) {
// Find terminated sequences.
- List rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
-
- deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
-
- finderBean.setTerminated(false);
- finderBean.setTimedOut(true);
-
- // Find timed out sequences
- rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
-
- deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
-
- // Remove any terminated RMDBeans.
- RMDBean finderRMDBean = new RMDBean();
- finderRMDBean.setTerminated(true);
-
- List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
-
- Iterator beans = rmdBeans.iterator();
- while (beans.hasNext()) {
- RMDBean rmdBean = (RMDBean)beans.next();
-
- long timeNow = System.currentTimeMillis();
- long lastActivated = rmdBean.getLastActivatedTime();
-
- // delete sequences that have been timedout or deleted for more than
- // the SequenceRemovalTimeoutInterval
- if ((lastActivated + deleteTime) < timeNow) {
- if (log.isDebugEnabled())
- log.debug("Deleting RMDBean " + deleteTime + " : " + rmdBean);
- storageManager.getRMDBeanMgr().delete(rmdBean.getSequenceID());
- }
- }
- }
+ List rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+
+ deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
+
+ finderBean.setTerminated(false);
+ finderBean.setTimedOut(true);
+
+ // Find timed out sequences
+ rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+
+ deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
+
+ // Remove any terminated RMDBeans.
+ RMDBean finderRMDBean = new RMDBean();
+ finderRMDBean.setTerminated(true);
+
+ List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+ Iterator beans = rmdBeans.iterator();
+ while (beans.hasNext()) {
+ RMDBean rmdBean = (RMDBean) beans.next();
- // Terminate RMD Sequences that have been inactive.
+ long timeNow = System.currentTimeMillis();
+ long lastActivated = rmdBean.getLastActivatedTime();
+
+ // delete sequences that have been timed out or deleted for
+ // more than
+ // the SequenceRemovalTimeoutInterval
+ if ((lastActivated + deleteTime) < timeNow) {
+ if (log.isDebugEnabled())
+ log.debug("Deleting RMDBean " + deleteTime + " : " + rmdBean);
+ storageManager.getRMDBeanMgr().delete(rmdBean.getSequenceID());
+ }
+ }
+ }
+
+ // Terminate RMD Sequences that have been inactive.
if (propertyBean.getInactivityTimeoutInterval() > 0) {
- RMDBean finderRMDBean = new RMDBean();
- finderRMDBean.setTerminated(false);
-
- List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
-
- Iterator beans = rmdBeans.iterator();
- while (beans.hasNext()) {
- RMDBean rmdBean = (RMDBean)beans.next();
-
- long timeNow = System.currentTimeMillis();
- long lastActivated = rmdBean.getLastActivatedTime();
-
- if ((lastActivated + propertyBean.getInactivityTimeoutInterval()) < timeNow) {
- // Terminate
- rmdBean.setTerminated(true);
- rmdBean.setLastActivatedTime(timeNow);
- if (log.isDebugEnabled())
- log.debug(System.currentTimeMillis() + "Marking RMDBean as terminated " + rmdBean);
- storageManager.getRMDBeanMgr().update(rmdBean);
- }
- }
- }
-
- if(transaction != null && transaction.isActive()) transaction.commit();
-
+ RMDBean finderRMDBean = new RMDBean();
+ finderRMDBean.setTerminated(false);
+
+ List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+ Iterator beans = rmdBeans.iterator();
+ while (beans.hasNext()) {
+ RMDBean rmdBean = (RMDBean) beans.next();
+
+ long timeNow = System.currentTimeMillis();
+ long lastActivated = rmdBean.getLastActivatedTime();
+
+ if ((lastActivated + propertyBean.getInactivityTimeoutInterval()) < timeNow) {
+ // Terminate
+ rmdBean.setTerminated(true);
+ rmdBean.setLastActivatedTime(timeNow);
+ if (log.isDebugEnabled())
+ log.debug(System.currentTimeMillis() + "Marking RMDBean as terminated " + rmdBean);
+ storageManager.getRMDBeanMgr().update(rmdBean);
+ }
+ }
+ }
+
+ if (transaction != null && transaction.isActive())
+ transaction.commit();
+
} catch (SandeshaException e) {
if (log.isErrorEnabled())
log.error(e);
} finally {
- if(transaction != null && transaction.isActive()) {
+ if (transaction != null && transaction.isActive()) {
try {
transaction.rollback();
} catch (SandeshaStorageException e) {
@@ -351,67 +397,72 @@
}
}
}
-
- if (log.isDebugEnabled())
+
+ if (log.isDebugEnabled())
log.debug("Exit: Sender::deleteTerminatedSequences");
}
-
- private void deleteRMSBeans(List rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime)
- throws SandeshaStorageException {
- if (log.isDebugEnabled())
+ private void deleteRMSBeans(List rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime)
+
+ throws SandeshaStorageException {
+ if (log.isDebugEnabled())
log.debug("Enter: Sender::deleteRMSBeans");
- Iterator beans = rmsBeans.iterator();
-
- while (beans.hasNext())
- {
- RMSBean rmsBean = (RMSBean)beans.next();
- long timeNow = System.currentTimeMillis();
- long lastActivated = rmsBean.getLastActivatedTime();
- // delete sequences that have been timedout or deleted for more than
- // the SequenceRemovalTimeoutInterval
-
- if ((lastActivated + deleteTime) < timeNow) {
- if (log.isDebugEnabled())
- log.debug("Removing RMSBean " + rmsBean);
- storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
- storageManager.removeMessageContext( rmsBean.getReferenceMessageStoreKey() );
- }
- }
+ Iterator beans = rmsBeans.iterator();
- if (log.isDebugEnabled())
+ while (beans.hasNext()) {
+ RMSBean rmsBean = (RMSBean) beans.next();
+ long timeNow = System.currentTimeMillis();
+ long lastActivated = rmsBean.getLastActivatedTime();
+ // delete sequences that have been timed out or deleted for more than
+ // the SequenceRemovalTimeoutInterval
+
+ if ((lastActivated + deleteTime) < timeNow) {
+ if (log.isDebugEnabled())
+ log.debug("Removing RMSBean " + rmsBean);
+ storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+ storageManager.removeMessageContext(rmsBean.getReferenceMessageStoreKey());
+ }
+ }
+
+ if (log.isDebugEnabled())
log.debug("Exit: Sender::deleteRMSBeans");
}
- private void unblockTransportThreads(StorageManager manager)
- throws SandeshaStorageException
- {
- if (log.isDebugEnabled()) log.debug("Enter: Sender::unblockTransportThreads");
+ private void unblockTransportThreads(StorageManager manager) throws SandeshaStorageException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: Sender::unblockTransportThreads");
Transaction transaction = null;
try {
transaction = manager.getTransaction();
-
- // This finder will look for beans that have been locking the transport for longer than
- // the TRANSPORT_WAIT_TIME. The match method for SenderBeans does the time comparison
+
+ // This finder will look for beans that have been locking the
+ // transport for longer than
+ // the TRANSPORT_WAIT_TIME. The match method for SenderBeans does
+ // the time comparison
// for us.
SenderBean finder = new SenderBean();
finder.setSend(false);
finder.setTransportAvailable(true);
finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
-
+
List beans = manager.getSenderBeanMgr().find(finder);
Iterator beanIter = beans.iterator();
- while(beanIter.hasNext()) {
- // The beans we have found are assigned to an internal sequence id, but the create
- // sequence has not completed yet (and perhaps never will). Server-side, most of the
- // info that we can usefully print is associated with the inbound sequence that generated
+ while (beanIter.hasNext()) {
+ // The beans we have found are assigned to an internal sequence
+ // id, but the create
+ // sequence has not completed yet (and perhaps never will).
+ // Server-side, most of the
+ // info that we can usefully print is associated with the
+ // inbound sequence that generated
// this message.
SenderBean bean = (SenderBean) beanIter.next();
-
- // Load the message, so that we can free the transport (if there is one there). The
- // case we are trying to free up is when there is a request-response transport, and
+
+ // Load the message, so that we can free the transport (if there
+ // is one there). The
+ // case we are trying to free up is when there is a
+ // request-response transport, and
// it's still there waiting.
MessageContext msgCtx = manager.retrieveMessageContext(bean.getMessageContextRefKey(), context);
@@ -422,111 +473,126 @@
inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
if (inMsg != null)
t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
-
- if((t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus()))) {
- if(log.isWarnEnabled()) {
+
+ if ((t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus()))) {
+ if (log.isWarnEnabled()) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.freeingTransport);
log.warn(message);
}
- // If the message is a reply, then the request may need to be acked. Rather
- // than just return a HTTP 202, we should try to send an ack.
+ // If the message is a reply, then the request may need to
+ // be acked. Rather
+ // than just return a HTTP 202, we should try to send an
+ // ack.
boolean sendAck = false;
RMDBean inbound = null;
String inboundSeq = bean.getInboundSequenceId();
- if(inboundSeq != null)
+ if (inboundSeq != null)
inbound = SandeshaUtil.getRMDBeanFromSequenceId(manager, inboundSeq);
-
- if(inbound != null) {
+
+ if (inbound != null) {
EndpointReference acksToEPR = inbound.getAcksToEndpointReference();
- if(acksToEPR!=null && acksToEPR.hasAnonymousAddress())
+ if (acksToEPR != null && acksToEPR.hasAnonymousAddress())
sendAck = true;
}
-
- if(sendAck) {
+
+ if (sendAck) {
RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
- RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(
- rmMsgCtx, inbound, inbound.getSequenceID(), storageManager, true);
+ RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, inbound, inbound.getSequenceID(), storageManager, true);
AcknowledgementManager.sendAckNow(ackRMMsgCtx);
TransportUtils.setResponseWritten(msgCtx, true);
} else {
TransportUtils.setResponseWritten(msgCtx, false);
}
-
- // Mark the bean so that we know the transport is missing, and reset the send time
+
+ // Mark the bean so that we know the transport is missing,
+ // and reset the send time
bean.setTransportAvailable(false);
bean.setTimeToSend(System.currentTimeMillis());
-
+
// Update the bean
manager.getSenderBeanMgr().update(bean);
}
}
-
- if(transaction != null && transaction.isActive()) transaction.commit();
+
+ if (transaction != null && transaction.isActive())
+ transaction.commit();
transaction = null;
-
- } catch(Exception e) {
- // There isn't much we can do here, so log the exception and continue.
- if(log.isDebugEnabled()) log.debug("Exception", e);
+
+ } catch (Exception e) {
+ // There isn't much we can do here, so log the exception and
+ // continue.
+ if (log.isDebugEnabled())
+ log.debug("Exception", e);
} finally {
- if(transaction != null && transaction.isActive()) transaction.rollback();
+ if (transaction != null && transaction.isActive())
+ transaction.rollback();
}
-
- if (log.isDebugEnabled()) log.debug("Exit: Sender::unblockTransportThreads");
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::unblockTransportThreads");
}
-
- private void checkForOrphanMessages(StorageManager manager)
- throws SandeshaStorageException
- {
- if(log.isDebugEnabled()) log.debug("Enter: Sender::checkForOrphanMessages");
-
+
+ private void checkForOrphanMessages(StorageManager manager) throws SandeshaStorageException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: Sender::checkForOrphanMessages");
+
Transaction tran = null;
try {
tran = manager.getTransaction();
-
- // This finder will look for beans that should have been sent, but could not be sent
- // because they need a MakeConnection message to come in to pick it up. We also factor
- // in TRANSPORT_WAIT_TIME to give the MakeConnection a chance to arrive.
+
+ // This finder will look for beans that should have been sent, but
+ // could not be sent
+ // because they need a MakeConnection message to come in to pick it
+ // up. We also factor
+ // in TRANSPORT_WAIT_TIME to give the MakeConnection a chance to
+ // arrive.
SenderBean finder = new SenderBean();
finder.setSend(true);
finder.setTransportAvailable(false);
finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
-
+
List beans = manager.getSenderBeanMgr().find(finder);
Iterator beanIter = beans.iterator();
- while(beanIter.hasNext()) {
+ while (beanIter.hasNext()) {
SenderBean bean = (SenderBean) beanIter.next();
-
- // Emit a message to warn the user that MakeConnections are not arriving to pick
+
+ // Emit a message to warn the user that MakeConnections are not
+ // arriving to pick
// messages up
- if(log.isWarnEnabled()) {
+ if (log.isWarnEnabled()) {
String message = null;
String internalSequenceID = bean.getInternalSequenceID();
String sequenceID = bean.getSequenceID();
- if (bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION)
- message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, internalSequenceID);
- else
- {
+ if (bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION)
+ message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, internalSequenceID);
+ else {
String messageType = Integer.toString(bean.getMessageType());
message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPollingProtocol, messageType, sequenceID, internalSequenceID);
}
log.warn(message);
}
-
- // Update the bean so that we won't emit another message for another TRANSPORT_WAIT_TIME
+
+ // Update the bean so that we won't emit another message for
+ // another TRANSPORT_WAIT_TIME
bean.setTimeToSend(System.currentTimeMillis());
manager.getSenderBeanMgr().update(bean);
}
-
- if(tran != null && tran.isActive()) tran.commit();
+
+ if (tran != null && tran.isActive())
+ tran.commit();
tran = null;
-
- } catch(Exception e) {
- // There isn't much we can do here, so log the exception and continue.
- if(log.isDebugEnabled()) log.debug("Exception", e);
+
+ } catch (Exception e) {
+ // There isn't much we can do here, so log the exception and
+ // continue.
+ if (log.isDebugEnabled())
+ log.debug("Exception", e);
} finally {
- if(tran != null && tran.isActive()) tran.rollback();
+ if (tran != null && tran.isActive())
+ tran.rollback();
}
-
- if(log.isDebugEnabled()) log.debug("Exit: Sender::checkForOrphanMessages");
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::checkForOrphanMessages");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org