You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/02/02 21:00:40 UTC

svn commit: r740092 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Author: cwiklik
Date: Mon Feb  2 20:00:39 2009
New Revision: 740092

URL: http://svn.apache.org/viewvc?rev=740092&view=rev
Log:
UIMA-1286 Closes idle connections to broker if not used within a  defined interval

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=740092&r1=740091&r2=740092&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Mon Feb  2 20:00:39 2009
@@ -22,6 +22,7 @@
 import java.io.ByteArrayOutputStream;
 import java.net.ConnectException;
 import java.net.InetAddress;
+import java.nio.channels.AsynchronousCloseException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -1518,6 +1519,39 @@
 		}
 		return false;
 	}
+	
+	private void dispatch( Message aMessage,  Endpoint anEndpoint, CacheEntry entry, boolean isRequest, JmsEndpointConnection_impl endpointConnection, long msgSize ) throws Exception {
+	  //  Add stats
+    populateStats(aMessage, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
+    if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                  "dispatch", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
+                  new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
+    }
+	  //  By default start a timer associated with a connection to the endpoint. Once a connection is established with an
+	  //  endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
+	  //  the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
+	  //  managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
+	  //  by anEndpoint.getDestination != null, we dont start the timer.
+	  boolean startConnectionTimer = isRequest ? false : true;  // connection time is for replies
+	  // ----------------------------------------------------
+	  //  Send Request Messsage to the Endpoint
+	  // ----------------------------------------------------
+	  //  Add the CAS to the delegate's list of CASes pending reply. Do the add before
+	  //  the send to eliminate a race condition where the reply is received (on different
+	  //  thread) *before* the CAS is added to the list.
+	  if ( isRequest ) {
+	    anEndpoint.setWaitingForResponse(true);
+	    // Add CAS to the list of CASes pending reply
+	    addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
+	  } else {
+	    addIdleTime(aMessage);
+	  }
+	  if ( endpointConnection.send(aMessage, msgSize, startConnectionTimer) == false ) {
+	    removeCasFromOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
+	  }
+
+	}
 	private void sendCasToRemoteEndpoint( boolean isRequest, String aSerializedCAS, String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint, boolean startTimer, long sequence) 
 	throws AsynchAEException, ServiceShutdownException
 	{
@@ -1587,48 +1621,7 @@
 					}
 				}
 			}
-
-			//	Add stats
-			populateStats(tm, anEndpoint, aCasReferenceId, AsynchAEMessage.Process, isRequest);
-			if ( startTimer)
-			{
-				//	Start a timer for this request. The amount of time to wait
-				//	for response is provided in configuration for the endpoint
-				anEndpoint.startProcessRequestTimer(aCasReferenceId);
-			}
-      if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                    "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
-                    new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
-      }
-			//	By default start a timer associated with a connection to the endpoint. Once a connection is established with an
-			//	endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
-			//	the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
-			//	managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
-			//	by anEndpoint.getDestination != null, we dont start the timer.
-			boolean startConnectionTimer = true;
-			
-			if ( anEndpoint.getDestination() != null || !isRequest )
-			{
-				startConnectionTimer = false;
-			}
-			// ----------------------------------------------------
-			//	Send Request Messsage to the Endpoint
-			// ----------------------------------------------------
-      //  Add the CAS to the delegate's list of CASes pending reply. Do the add before
-      //  the send to eliminate a race condition where the reply is received (on different
-      //  thread) *before* the CAS is added to the list.
-      if ( isRequest ) {
-        // Add CAS to the list of CASes pending reply
-        addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
-      } else {
-        addIdleTime(tm);
-      }
-      if ( endpointConnection.send(tm, msgSize, startConnectionTimer) == false ) {
-        System.out.println("Send Failed");
-        removeCasFromOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
-      }
-        
+      dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
 		}
 		catch( JMSException e)
 		{
@@ -1707,7 +1700,6 @@
         }
         if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
         {
-//          CacheEntry cacheEntry = getCacheEntry(aCasReferenceId);
           if ( entry != null )
           {
             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
@@ -1716,47 +1708,7 @@
           }
         }
       }
-
-      //  Add stats
-      populateStats(tm, anEndpoint, aCasReferenceId, AsynchAEMessage.Process, isRequest);
-      if ( startTimer)
-      {
-        //  Start a timer for this request. The amount of time to wait
-        //  for response is provided in configuration for the endpoint
-        anEndpoint.startProcessRequestTimer(aCasReferenceId);
-      }
-      if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                    "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
-                    new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
-      }
-      //  By default start a timer associated with a connection to the endpoint. Once a connection is established with an
-      //  endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
-      //  the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
-      //  managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
-      //  by anEndpoint.getDestination != null, we dont start the timer.
-      boolean startConnectionTimer = true;
-      
-      if ( anEndpoint.getDestination() != null || !isRequest )
-      {
-        startConnectionTimer = false;
-      }
-      // ----------------------------------------------------
-      //  Send Request Messsage to the Endpoint
-      // ----------------------------------------------------
-      //  Add the CAS to the delegate's list of CASes pending reply. Do the add before
-      //  the send to eliminate a race condition where the reply is received (on different
-      //  thread) *before* the CAS is added to the list.
-      if ( isRequest ) {
-        // Add CAS to the list of CASes pending reply
-        addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
-      } else {
-        addIdleTime(tm);
-      }
-      if ( endpointConnection.send(tm, msgSize, startConnectionTimer) == false ) {
-        System.out.println("Send Failed");
-        removeCasFromOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
-      }
+      dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
     }
     catch( JMSException e)
     {
@@ -1855,47 +1807,8 @@
 				}
 			}
 
-			//	Add stats
-			populateStats(tm, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
-			if ( startTimer)
-			{
-				//	Start a timer for this request. The amount of time to wait
-				//	for response is provided in configuration for the endpoint
-				anEndpoint.startProcessRequestTimer(entry.getCasReferenceId());
-			}
-      if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                    "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
-                    new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
-      }
-			//	By default start a timer associated with a connection to the endpoint. Once a connection is established with an
-			//	endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
-			//	the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
-			//	managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
-			//	by anEndpoint.getDestination != null, we dont start the timer.
-			boolean startConnectionTimer = true;
-			
-			if ( anEndpoint.getDestination() != null || !isRequest )
-			{
-				startConnectionTimer = false;
-			}
-			
-			// ----------------------------------------------------
-			//	Send Request Messsage to the Endpoint
-			// ----------------------------------------------------
-      //  Add the CAS to the delegate's list of CASes pending reply. Do the add before
-      //  the send to eliminate a race condition where the reply is received (on different
-      //  thread) *before* the CAS is added to the list.
-			if ( isRequest ) {
-	      // Add CAS to the list of CASes pending reply
-	      addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
-			} else {
-        addIdleTime(tm);
-			}
-      if ( endpointConnection.send(tm, msgSize, startConnectionTimer) == false ) {
-        System.out.println("Send Failed");
-        removeCasFromOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
-      }
+      dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
+
 		}
 		catch( JMSException e)
 		{
@@ -1956,7 +1869,6 @@
       else
       {
         populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
-//        tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
       }
       if ( casStateEntry == null ) {
         return;
@@ -1990,50 +1902,8 @@
                         new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
         }
       }
-
-      //  Add stats
-      populateStats(tm, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
-      if ( startTimer)
-      {
-        //  Start a timer for this request. The amount of time to wait
-        //  for response is provided in configuration for the endpoint
-        anEndpoint.startProcessRequestTimer(entry.getCasReferenceId());
-      }
-      if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                    "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
-                    new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
-      }
-      //  By default start a timer associated with a connection to the endpoint. Once a connection is established with an
-      //  endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
-      //  the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
-      //  managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
-      //  by anEndpoint.getDestination != null, we dont start the timer.
-      boolean startConnectionTimer = true;
-      
-      if ( anEndpoint.getDestination() != null || !isRequest )
-      {
-        startConnectionTimer = false;
-      }
+      dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
       
-      // ----------------------------------------------------
-      //  Send Request Messsage to the Endpoint
-      // ----------------------------------------------------
-      //  Add the CAS to the delegate's list of CASes pending reply. Do the add before
-      //  the send to eliminate a race condition where the reply is received (on different
-      //  thread) *before* the CAS is added to the list.
-      if ( isRequest ) {
-        // Add CAS to the list of CASes pending reply
-        addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
-      } else {
-        addIdleTime(tm);
-      }
-      //  Send the message to the delegate. If the send fails, remove the CAS id
-      //  from the delegate's list of CASes pending reply.
-      if ( endpointConnection.send(tm, msgSize, startConnectionTimer) == false ) {
-        System.out.println("Send Failed");
-        removeCasFromOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
-      }
     }
     catch( JMSException e)
     {
@@ -2044,7 +1914,6 @@
                     new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
       }
     }
-
     catch( ServiceShutdownException e)
     {
       throw e;