You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/02/02 21:00:40 UTC
svn commit: r740092 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
Author: cwiklik
Date: Mon Feb 2 20:00:39 2009
New Revision: 740092
URL: http://svn.apache.org/viewvc?rev=740092&view=rev
Log:
UIMA-1286 Closes idle connections to broker if not used within a defined interval
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=740092&r1=740091&r2=740092&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Mon Feb 2 20:00:39 2009
@@ -22,6 +22,7 @@
import java.io.ByteArrayOutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
+import java.nio.channels.AsynchronousCloseException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -1518,6 +1519,39 @@
}
return false;
}
+
+ // Common send path for a CAS message: adds stats, logs at FINE, records request/reply bookkeeping, then sends.
+ private void dispatch( Message aMessage, Endpoint anEndpoint, CacheEntry entry, boolean isRequest, JmsEndpointConnection_impl endpointConnection, long msgSize ) throws Exception {
+ // Add stats to the outgoing message
+ populateStats(aMessage, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
+ if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "dispatch", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
+ new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
+ }
+ // Decide whether send() is asked to start the timer associated with the connection to the endpoint. Per the
+ // author's description: once a connection is established with an endpoint it is cached and reused for subsequent
+ // messaging, and if it is not used within a given interval the timer silently expires and closes the connection
+ // (similar to how a web server manages sessions). Here the flag is true only for replies. NOTE(review): earlier
+ // wording tied this to anEndpoint.getDestination() != null; this method doesn't check the destination - confirm.
+ boolean startConnectionTimer = isRequest ? false : true; // connection timer is requested for replies only
+ // Send the message (request or reply) to the endpoint.
+ // For a request, add the CAS to the delegate's list of CASes pending reply. Do the add before
+ // the send to eliminate a race condition where the reply is received (on different
+ // thread) *before* the CAS is added to the list.
+ if ( isRequest ) {
+ anEndpoint.setWaitingForResponse(true);
+ // Add CAS to the list of CASes pending reply
+ addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
+ } else {
+ addIdleTime(aMessage);
+ }
+ // send() returned false: undo the pending-reply add (isRequest is passed through; presumably a no-op for replies - verify)
+ if ( endpointConnection.send(aMessage, msgSize, startConnectionTimer) == false ) {
+ removeCasFromOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
+ }
+
+ }
private void sendCasToRemoteEndpoint( boolean isRequest, String aSerializedCAS, String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint, boolean startTimer, long sequence)
throws AsynchAEException, ServiceShutdownException
{
@@ -1587,48 +1621,7 @@
}
}
}
-
- // Add stats
- populateStats(tm, anEndpoint, aCasReferenceId, AsynchAEMessage.Process, isRequest);
- if ( startTimer)
- {
- // Start a timer for this request. The amount of time to wait
- // for response is provided in configuration for the endpoint
- anEndpoint.startProcessRequestTimer(aCasReferenceId);
- }
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
- new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
- }
- // By default start a timer associated with a connection to the endpoint. Once a connection is established with an
- // endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
- // the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
- // managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
- // by anEndpoint.getDestination != null, we dont start the timer.
- boolean startConnectionTimer = true;
-
- if ( anEndpoint.getDestination() != null || !isRequest )
- {
- startConnectionTimer = false;
- }
- // ----------------------------------------------------
- // Send Request Messsage to the Endpoint
- // ----------------------------------------------------
- // Add the CAS to the delegate's list of CASes pending reply. Do the add before
- // the send to eliminate a race condition where the reply is received (on different
- // thread) *before* the CAS is added to the list.
- if ( isRequest ) {
- // Add CAS to the list of CASes pending reply
- addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
- } else {
- addIdleTime(tm);
- }
- if ( endpointConnection.send(tm, msgSize, startConnectionTimer) == false ) {
- System.out.println("Send Failed");
- removeCasFromOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
- }
-
+ dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
}
catch( JMSException e)
{
@@ -1707,7 +1700,6 @@
}
if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
{
-// CacheEntry cacheEntry = getCacheEntry(aCasReferenceId);
if ( entry != null )
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
@@ -1716,47 +1708,7 @@
}
}
}
-
- // Add stats
- populateStats(tm, anEndpoint, aCasReferenceId, AsynchAEMessage.Process, isRequest);
- if ( startTimer)
- {
- // Start a timer for this request. The amount of time to wait
- // for response is provided in configuration for the endpoint
- anEndpoint.startProcessRequestTimer(aCasReferenceId);
- }
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
- new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
- }
- // By default start a timer associated with a connection to the endpoint. Once a connection is established with an
- // endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
- // the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
- // managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
- // by anEndpoint.getDestination != null, we dont start the timer.
- boolean startConnectionTimer = true;
-
- if ( anEndpoint.getDestination() != null || !isRequest )
- {
- startConnectionTimer = false;
- }
- // ----------------------------------------------------
- // Send Request Messsage to the Endpoint
- // ----------------------------------------------------
- // Add the CAS to the delegate's list of CASes pending reply. Do the add before
- // the send to eliminate a race condition where the reply is received (on different
- // thread) *before* the CAS is added to the list.
- if ( isRequest ) {
- // Add CAS to the list of CASes pending reply
- addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
- } else {
- addIdleTime(tm);
- }
- if ( endpointConnection.send(tm, msgSize, startConnectionTimer) == false ) {
- System.out.println("Send Failed");
- removeCasFromOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
- }
+ dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
}
catch( JMSException e)
{
@@ -1855,47 +1807,8 @@
}
}
- // Add stats
- populateStats(tm, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
- if ( startTimer)
- {
- // Start a timer for this request. The amount of time to wait
- // for response is provided in configuration for the endpoint
- anEndpoint.startProcessRequestTimer(entry.getCasReferenceId());
- }
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
- new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
- }
- // By default start a timer associated with a connection to the endpoint. Once a connection is established with an
- // endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
- // the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
- // managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
- // by anEndpoint.getDestination != null, we dont start the timer.
- boolean startConnectionTimer = true;
-
- if ( anEndpoint.getDestination() != null || !isRequest )
- {
- startConnectionTimer = false;
- }
-
- // ----------------------------------------------------
- // Send Request Messsage to the Endpoint
- // ----------------------------------------------------
- // Add the CAS to the delegate's list of CASes pending reply. Do the add before
- // the send to eliminate a race condition where the reply is received (on different
- // thread) *before* the CAS is added to the list.
- if ( isRequest ) {
- // Add CAS to the list of CASes pending reply
- addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
- } else {
- addIdleTime(tm);
- }
- if ( endpointConnection.send(tm, msgSize, startConnectionTimer) == false ) {
- System.out.println("Send Failed");
- removeCasFromOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
- }
+ dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
+
}
catch( JMSException e)
{
@@ -1956,7 +1869,6 @@
else
{
populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
-// tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
}
if ( casStateEntry == null ) {
return;
@@ -1990,50 +1902,8 @@
new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
}
}
-
- // Add stats
- populateStats(tm, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
- if ( startTimer)
- {
- // Start a timer for this request. The amount of time to wait
- // for response is provided in configuration for the endpoint
- anEndpoint.startProcessRequestTimer(entry.getCasReferenceId());
- }
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
- new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
- }
- // By default start a timer associated with a connection to the endpoint. Once a connection is established with an
- // endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
- // the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
- // managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
- // by anEndpoint.getDestination != null, we dont start the timer.
- boolean startConnectionTimer = true;
-
- if ( anEndpoint.getDestination() != null || !isRequest )
- {
- startConnectionTimer = false;
- }
+ dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
- // ----------------------------------------------------
- // Send Request Messsage to the Endpoint
- // ----------------------------------------------------
- // Add the CAS to the delegate's list of CASes pending reply. Do the add before
- // the send to eliminate a race condition where the reply is received (on different
- // thread) *before* the CAS is added to the list.
- if ( isRequest ) {
- // Add CAS to the list of CASes pending reply
- addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
- } else {
- addIdleTime(tm);
- }
- // Send the message to the delegate. If the send fails, remove the CAS id
- // from the delegate's list of CASes pending reply.
- if ( endpointConnection.send(tm, msgSize, startConnectionTimer) == false ) {
- System.out.println("Send Failed");
- removeCasFromOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
- }
}
catch( JMSException e)
{
@@ -2044,7 +1914,6 @@
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
}
}
-
catch( ServiceShutdownException e)
{
throw e;