You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ml...@apache.org on 2007/05/21 18:14:09 UTC
svn commit: r540189 - in
/webservices/sandesha/trunk/java/modules/core/src/main:
java/org/apache/sandesha2/ java/org/apache/sandesha2/i18n/
java/org/apache/sandesha2/util/ java/org/apache/sandesha2/workers/
resources/org/apache/sandesha2/i18n/
Author: mlovett
Date: Mon May 21 09:14:08 2007
New Revision: 540189
URL: http://svn.apache.org/viewvc?view=rev&rev=540189
Log:
Release the transport if it is blocked too long. This frees up transport resources when doing sync-2-way messaging with RM.
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java Mon May 21 09:14:08 2007
@@ -490,6 +490,8 @@
int CLIENT_SLEEP_TIME = 10000;
int TERMINATE_DELAY = 100;
+
+ static final int TRANSPORT_WAIT_TIME = 60000;
static final String TEMP_SEQUENCE_ID = "uuid:tempID";
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Mon May 21 09:14:08 2007
@@ -230,4 +230,7 @@
public final static String referenceMessageNotSetForSequence = "referenceMessageNotSetForSequence";
public final static String moduleNotSet = "moduleNotSet";
public final static String cannotSetPolicyBeanServiceNull = "cannotSetPolicyBeanServiceNull";
+ public final static String noPolling="noPolling";
+ public final static String freeingTransport="freeingTransport";
+
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java Mon May 21 09:14:08 2007
@@ -198,7 +198,7 @@
AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
Sandesha2Constants.MessageTypes.ACK,
- referenceRMMessage.getRMSpecVersion(),
+ rmdBean.getRMVersion(),
referenceMsg.getAxisService());
MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Mon May 21 09:14:08 2007
@@ -21,8 +21,15 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.transport.RequestResponseTransport;
+import org.apache.axis2.transport.RequestResponseTransport.RequestResponseTransportStatus;
+import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
@@ -35,6 +42,8 @@
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
@@ -81,7 +90,15 @@
// At this point - delete any sequences that have timed out, or been terminated.
deleteTerminatedSequences(storageManager);
+
+ // Also clean up any sender beans that are not yet eligible for sending, but
+ // are blocking the transport threads.
+ unblockTransportThreads(storageManager);
+ // Finally, check for messages that can only be serviced by polling, and warn
+ // the user if they are too old
+ checkForOrphanMessages(storageManager);
+
if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
return sleep;
}
@@ -339,5 +356,146 @@
if (log.isDebugEnabled())
log.debug("Exit: Sender::deleteRMSBeans");
+ }
+
+    /**
+     * Frees request-response transports that have been held for too long by sender
+     * beans which are not yet eligible for sending.
+     *
+     * Looks up sender beans with send=false and transportAvailable=true whose time
+     * to send is older than Sandesha2Constants.TRANSPORT_WAIT_TIME. For each bean whose
+     * inbound message still carries a RequestResponseTransport in the WAITING state,
+     * the transport is released: with an acknowledgement if the inbound sequence's
+     * AcksTo address is absent or anonymous, otherwise via acknowledgeMessage. The
+     * bean is then marked as having no transport and its send time is reset.
+     *
+     * Failures inside the main try block are logged at debug level and not rethrown;
+     * the transaction is rolled back if it is still active.
+     *
+     * @param manager the storage manager used to find, load and update sender beans
+     * @throws SandeshaStorageException declared for the storage operations
+     */
+    private void unblockTransportThreads(StorageManager manager)
+    throws SandeshaStorageException
+    {
+        if (log.isDebugEnabled()) log.debug("Enter: Sender::unblockTransportThreads");
+
+        Transaction transaction = null;
+        try {
+            transaction = manager.getTransaction();
+
+            // This finder will look for beans that have been locking the transport for longer than
+            // the TRANSPORT_WAIT_TIME. The match method for SenderBeans does the time comparison
+            // for us.
+            SenderBean finder = new SenderBean();
+            finder.setSend(false);
+            finder.setTransportAvailable(true);
+            finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
+
+            List beans = manager.getSenderBeanMgr().find(finder);
+            Iterator beanIter = beans.iterator();
+            while(beanIter.hasNext()) {
+                // The beans we have found are assigned to an internal sequence id, but the create
+                // sequence has not completed yet (and perhaps never will). Server-side, most of the
+                // info that we can usefully print is associated with the inbound sequence that generated
+                // this message.
+                SenderBean bean = (SenderBean) beanIter.next();
+
+                // Load the message, so that we can free the transport (if there is one there). The
+                // case we are trying to free up is when there is a request-response transport, and
+                // it's still there waiting.
+                MessageContext msgCtx = manager.retrieveMessageContext(bean.getMessageContextRefKey(), context);
+
+                RequestResponseTransport t = null;
+                MessageContext inMsg = null;
+                OperationContext op = msgCtx.getOperationContext();
+                if (op != null)
+                    inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+                if (inMsg != null)
+                    t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+                // Only act when a transport is present AND still waiting. The constant-first
+                // equals avoids dereferencing a null status, and the && guards the null transport
+                // (the previous '||' dereferenced t exactly when it was null).
+                if(t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
+                    if(log.isWarnEnabled()) {
+                        String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.freeingTransport);
+                        log.warn(message);
+                    }
+                    // If the message is a reply, then the request may need to be acked. Rather
+                    // than just return a HTTP 202, we should try to send an ack.
+                    boolean sendAck = false;
+                    RMDBean inbound = null;
+                    String inboundSeq = bean.getInboundSequenceId();
+                    if(inboundSeq != null)
+                        inbound = SandeshaUtil.getRMDBeanFromSequenceId(manager, inboundSeq);
+
+                    if(inbound != null) {
+                        // Acks travel on the back channel when AcksTo is missing or anonymous.
+                        // The EPR is only built once we know the address is non-null.
+                        String acksTo = inbound.getAcksToEPR();
+                        if(acksTo == null || new EndpointReference(acksTo).hasAnonymousAddress())
+                            sendAck = true;
+                    }
+
+                    if(sendAck) {
+                        RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+                        // Use the manager handed to this method, consistent with every other
+                        // storage access in this block.
+                        RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(
+                                rmMsgCtx, inbound, inbound.getSequenceID(), manager, true);
+                        AcknowledgementManager.sendAckNow(ackRMMsgCtx);
+                        msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+                        t.signalResponseReady();
+                    } else {
+                        msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+                        t.acknowledgeMessage(msgCtx);
+                    }
+
+                    // Mark the bean so that we know the transport is missing, and reset the send time
+                    bean.setTransportAvailable(false);
+                    bean.setTimeToSend(System.currentTimeMillis());
+
+                    // Update the bean
+                    manager.getSenderBeanMgr().update(bean);
+                }
+            }
+
+            if(transaction != null && transaction.isActive()) transaction.commit();
+            transaction = null;
+
+        } catch(Exception e) {
+            // There isn't much we can do here, so log the exception and continue.
+            if(log.isDebugEnabled()) log.debug("Exception", e);
+        } finally {
+            if(transaction != null && transaction.isActive()) transaction.rollback();
+        }
+
+        if (log.isDebugEnabled()) log.debug("Exit: Sender::unblockTransportThreads");
+    }
+
+    /**
+     * Warns about sender beans that are flagged for sending but have no transport
+     * available, and whose time to send is older than
+     * Sandesha2Constants.TRANSPORT_WAIT_TIME.
+     *
+     * One warning is logged per matching bean (when warn logging is enabled), and
+     * the bean's send time is reset to now so the warning is not repeated until
+     * another TRANSPORT_WAIT_TIME has passed. Failures inside the main try block
+     * are logged at debug level and not rethrown; the transaction is rolled back
+     * if it is still active.
+     *
+     * @param manager the storage manager used to find and update sender beans
+     * @throws SandeshaStorageException declared for the storage operations
+     */
+    private void checkForOrphanMessages(StorageManager manager)
+    throws SandeshaStorageException
+    {
+        if(log.isDebugEnabled()) log.debug("Enter: Sender::checkForOrphanMessages");
+
+        Transaction orphanTx = null;
+        try {
+            orphanTx = manager.getTransaction();
+
+            // Template bean: should have been sent, but has no transport and so must wait
+            // for a MakeConnection to collect it. TRANSPORT_WAIT_TIME gives the
+            // MakeConnection a chance to arrive before we complain.
+            SenderBean template = new SenderBean();
+            template.setSend(true);
+            template.setTransportAvailable(false);
+            template.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
+
+            List orphans = manager.getSenderBeanMgr().find(template);
+            for(Iterator it = orphans.iterator(); it.hasNext(); ) {
+                SenderBean orphan = (SenderBean) it.next();
+
+                // Tell the user that MakeConnections are not arriving to pick messages up
+                if(log.isWarnEnabled()) {
+                    String typeText = Integer.toString(orphan.getMessageType());
+                    log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, typeText));
+                }
+
+                // Push the send time forward so the next warning is a full wait period away
+                orphan.setTimeToSend(System.currentTimeMillis());
+                manager.getSenderBeanMgr().update(orphan);
+            }
+
+            if(orphanTx != null && orphanTx.isActive()) orphanTx.commit();
+            orphanTx = null;
+
+        } catch(Exception e) {
+            // There isn't much we can do here, so log the exception and continue.
+            if(log.isDebugEnabled()) log.debug("Exception", e);
+        } finally {
+            if(orphanTx != null && orphanTx.isActive()) orphanTx.rollback();
+        }
+
+        if(log.isDebugEnabled()) log.debug("Exit: Sender::checkForOrphanMessages");
     }
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties Mon May 21 09:14:08 2007
@@ -78,6 +78,8 @@
rmEnforceFailure=The message with MessageID ''{0}'' is not WSRM enabled but the service enforces WSRM.
referenceMessageNotSetForSequence=ReferenceMessage has not been set for the sequence ''{0}''
moduleNotSet=Sandesha Module has not been set at the initiation
+noPolling=A message has been waiting for a MakeConnection call. The message will continue to wait, but there may be a problem with the client configuration. Sandesha message type {0}.
+freeingTransport=Freeing transport resources. A message has held the transport for too long, check the log for other failures.
#-------------------------------------
#
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org