You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ml...@apache.org on 2007/05/21 18:14:09 UTC

svn commit: r540189 - in /webservices/sandesha/trunk/java/modules/core/src/main: java/org/apache/sandesha2/ java/org/apache/sandesha2/i18n/ java/org/apache/sandesha2/util/ java/org/apache/sandesha2/workers/ resources/org/apache/sandesha2/i18n/

Author: mlovett
Date: Mon May 21 09:14:08 2007
New Revision: 540189

URL: http://svn.apache.org/viewvc?view=rev&rev=540189
Log:
Release the transport if it is blocked too long. This frees up transport resources when doing sync-2-way messaging with RM.

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
    webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java Mon May 21 09:14:08 2007
@@ -490,6 +490,8 @@
 	int CLIENT_SLEEP_TIME = 10000;
 
 	int TERMINATE_DELAY = 100;
+	
+	static final int TRANSPORT_WAIT_TIME = 60000;
 
 	static final String TEMP_SEQUENCE_ID = "uuid:tempID";
 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Mon May 21 09:14:08 2007
@@ -230,4 +230,7 @@
 	public final static String referenceMessageNotSetForSequence = "referenceMessageNotSetForSequence";
 	public final static String moduleNotSet = "moduleNotSet";
 	public final static String cannotSetPolicyBeanServiceNull = "cannotSetPolicyBeanServiceNull";
+	public final static String noPolling="noPolling";
+	public final static String freeingTransport="freeingTransport";
+
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java Mon May 21 09:14:08 2007
@@ -198,7 +198,7 @@
 
 		AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
 				Sandesha2Constants.MessageTypes.ACK,
-				referenceRMMessage.getRMSpecVersion(),
+				rmdBean.getRMVersion(),
 				referenceMsg.getAxisService());
 
 		MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Mon May 21 09:14:08 2007
@@ -21,8 +21,15 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.transport.RequestResponseTransport;
+import org.apache.axis2.transport.RequestResponseTransport.RequestResponseTransportStatus;
+import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
@@ -35,6 +42,8 @@
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
 
@@ -81,7 +90,15 @@
 				
 				// At this point - delete any sequences that have timed out, or been terminated.
 				deleteTerminatedSequences(storageManager);
+
+				// Also clean up any sender beans that are not yet eligible for sending, but
+				// are blocking the transport threads.
+				unblockTransportThreads(storageManager);
 				
+				// Finally, check for messages that can only be serviced by polling, and warn
+				// the user if they are too old
+				checkForOrphanMessages(storageManager);
+
 				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
 				return sleep;
 			}
@@ -339,5 +356,146 @@
 
 		if (log.isDebugEnabled()) 
 			log.debug("Exit: Sender::deleteRMSBeans");
+	}
+
+	/**
+	 * Releases transports that sender beans have been holding for longer than
+	 * TRANSPORT_WAIT_TIME, sending an ack first when the inbound sequence's acksTo
+	 * is missing or anonymous. Exceptions inside the sweep are logged at debug level.
+	 *
+	 * @param manager storage manager used to find, load and update the sender beans
+	 */
+	private void unblockTransportThreads(StorageManager manager)
+	throws SandeshaStorageException
+	{
+		if (log.isDebugEnabled()) log.debug("Enter: Sender::unblockTransportThreads");
+
+		Transaction transaction = null;
+		try {
+			transaction = manager.getTransaction();
+			// This finder will look for beans that have been locking the transport for longer than
+			// the TRANSPORT_WAIT_TIME. The match method for SenderBeans does the time comparison
+			// for us.
+			SenderBean finder = new SenderBean();
+			finder.setSend(false);
+			finder.setTransportAvailable(true);
+			finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
+			
+			List beans = manager.getSenderBeanMgr().find(finder);
+			Iterator beanIter = beans.iterator();
+			while(beanIter.hasNext()) {
+				// The beans we have found are assigned to an internal sequence id, but the create
+				// sequence has not completed yet (and perhaps never will). Server-side, most of the
+				// info that we can usefully print is associated with the inbound sequence that generated
+				// this message.
+				SenderBean bean = (SenderBean) beanIter.next();
+				// Load the message, so that we can free the transport (if there is one there). The
+				// case we are trying to free up is when there is a request-response transport, and
+				// it's still there waiting.
+				MessageContext msgCtx = manager.retrieveMessageContext(bean.getMessageContextRefKey(), context);
+				RequestResponseTransport t = null;
+				MessageContext inMsg = null;
+				OperationContext op = msgCtx.getOperationContext();
+				if (op != null)
+					inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+				if (inMsg != null)
+					t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+				// Only release a transport that exists and is still waiting; skip the bean otherwise.
+				if(t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
+					if(log.isWarnEnabled()) {
+						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.freeingTransport);
+						log.warn(message);
+					}
+					// If the message is a reply, then the request may need to be acked. Rather
+					// than just return a HTTP 202, we should try to send an ack.
+					boolean sendAck = false;
+					RMDBean inbound = null;
+					String inboundSeq = bean.getInboundSequenceId();
+					if(inboundSeq != null) 
+						inbound = SandeshaUtil.getRMDBeanFromSequenceId(manager, inboundSeq);
+					if(inbound != null) {
+						String acksTo = inbound.getAcksToEPR();
+						EndpointReference acksToEPR = new EndpointReference(acksTo);
+						if(acksTo == null || acksToEPR.hasAnonymousAddress())
+							sendAck = true;
+					}
+					if(sendAck) {
+						RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+						RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(
+								rmMsgCtx, inbound, inbound.getSequenceID(), manager, true);
+						AcknowledgementManager.sendAckNow(ackRMMsgCtx);
+						msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+						t.signalResponseReady();
+					} else {
+						msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+						t.acknowledgeMessage(msgCtx);
+					}
+					// Mark the bean so that we know the transport is missing, and reset the send time
+					bean.setTransportAvailable(false);
+					bean.setTimeToSend(System.currentTimeMillis());
+					// Update the bean
+					manager.getSenderBeanMgr().update(bean);
+				}
+			}
+	
+			if(transaction != null && transaction.isActive()) transaction.commit();
+			transaction = null;
+			
+		} catch(Exception e) {
+			// There isn't much we can do here, so log the exception and continue.
+			if(log.isDebugEnabled()) log.debug("Exception", e);
+		} finally {
+			if(transaction != null && transaction.isActive()) transaction.rollback();
+		}
+		
+		if (log.isDebugEnabled()) log.debug("Exit: Sender::unblockTransportThreads");
+	}
+		
+	/**
+	 * Warns about sender beans that are flagged for sending but have no transport, and
+	 * so are waiting for a MakeConnection message to collect them.
+	 *
+	 * @param manager storage manager used to find and update the sender beans
+	 */
+	private void checkForOrphanMessages(StorageManager manager)
+	throws SandeshaStorageException
+	{
+		if(log.isDebugEnabled()) log.debug("Enter: Sender::checkForOrphanMessages");
+
+		Transaction txn = null;
+		try {
+			txn = manager.getTransaction();
+			// Template for beans that are flagged for sending without a transport, with the
+			// send time set TRANSPORT_WAIT_TIME in the past; the wait gives a MakeConnection
+			// a chance to arrive first.
+			long cutoff = System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME;
+			SenderBean template = new SenderBean();
+			template.setSend(true);
+			template.setTransportAvailable(false);
+			template.setTimeToSend(cutoff);
+
+			List orphans = manager.getSenderBeanMgr().find(template);
+			for(Iterator it = orphans.iterator(); it.hasNext(); ) {
+				SenderBean orphan = (SenderBean) it.next();
+				// Tell the user that no MakeConnection has arrived to collect this message
+				if(log.isWarnEnabled()) {
+					log.warn(SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.noPolling, Integer.toString(orphan.getMessageType())));
+				}
+
+				// Reset the send time so the warning is not repeated for another TRANSPORT_WAIT_TIME
+				orphan.setTimeToSend(System.currentTimeMillis());
+				manager.getSenderBeanMgr().update(orphan);
+			}
+			if(txn != null && txn.isActive()) txn.commit();
+			txn = null;
+		} catch(Exception e) {
+			// Best effort only: log the exception and carry on.
+			if(log.isDebugEnabled()) log.debug("Exception", e);
+		} finally {
+			if(txn != null && txn.isActive()) txn.rollback();
+		}
+
+		if(log.isDebugEnabled()) log.debug("Exit: Sender::checkForOrphanMessages");
 	}
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties Mon May 21 09:14:08 2007
@@ -78,6 +78,8 @@
 rmEnforceFailure=The message with MessageID ''{0}'' is not WSRM enabled but the service enforces WSRM.
 referenceMessageNotSetForSequence=ReferenceMessage has not been set for the sequence ''{0}''
 moduleNotSet=Sandesha Module has not been set at the initiation
+noPolling=A message has been waiting for a MakeConnection call. The message will continue to wait, but there may be a problem with the client configuration. Sandesha message type {0}.
+freeingTransport=Freeing transport resources. A message has held the transport for too long, check the log for other failures.
 
 #-------------------------------------
 #



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org