You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:16:24 UTC
svn commit: r810547 [3/5] -
/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=810547&r1=810546&r2=810547&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Wed Sep 2 15:16:23 2009
@@ -76,238 +76,224 @@
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.util.Level;
-public class JmsOutputChannel implements OutputChannel
-{
-
- private static final Class CLASS_NAME = JmsOutputChannel.class;
- private static final long INACTIVITY_TIMEOUT = 1800000; // 30 minutes in term of millis
- private CountDownLatch controllerLatch = new CountDownLatch(1);
-
- private ActiveMQConnectionFactory connectionFactory;
- // Name of the external queue this service uses to receive messages
- private String serviceInputEndpoint;
- // Name of the internal queue this services uses to receive messages from delegates
- private String controllerInputEndpoint;
- // Name of the queue used by Cas Multiplier to receive requests to free CASes
- private String secondaryInputEndpoint;
- // The service controller
- private AnalysisEngineController analysisEngineController;
- // Cache containing connections to destinations this service interacts with
- // Each entry in this cache has an inactivity timer that times amount of time
- // elapsed since the last time a message was sent to the destination.
- private ConcurrentHashMap connectionMap = new ConcurrentHashMap();
-
- private String serverURI;
-
- private String serviceProtocolList ="";
-
- private volatile boolean aborting = false;
-
- private Destination freeCASTempQueue;
+public class JmsOutputChannel implements OutputChannel {
+
+ private static final Class CLASS_NAME = JmsOutputChannel.class;
+
+ private static final long INACTIVITY_TIMEOUT = 1800000; // 30 minutes in term of millis
+
+ private CountDownLatch controllerLatch = new CountDownLatch(1);
+
+ private ActiveMQConnectionFactory connectionFactory;
+
+ // Name of the external queue this service uses to receive messages
+ private String serviceInputEndpoint;
+
+ // Name of the internal queue this service uses to receive messages from delegates
+ private String controllerInputEndpoint;
+
+ // Name of the queue used by Cas Multiplier to receive requests to free CASes
+ private String secondaryInputEndpoint;
+
+ // The service controller
+ private AnalysisEngineController analysisEngineController;
+
+ // Cache containing connections to destinations this service interacts with
+ // Each entry in this cache has an inactivity timer that times amount of time
+ // elapsed since the last time a message was sent to the destination.
+ private ConcurrentHashMap connectionMap = new ConcurrentHashMap();
+
+ private String serverURI;
+
+ private String serviceProtocolList = "";
+
+ private volatile boolean aborting = false;
+
+ private Destination freeCASTempQueue;
private String hostIP = null;
+
private UimaSerializer uimaSerializer = new UimaSerializer();
- // By default every message will have expiration time added
+
+ // By default every message will have expiration time added
private volatile boolean addTimeToLive = true;
- public JmsOutputChannel()
- {
- try
- {
+ public JmsOutputChannel() {
+ try {
hostIP = InetAddress.getLocalHost().getHostAddress();
+ } catch (Exception e) { /* silently deal with this error */
}
- catch ( Exception e) { /* silently deal with this error */ }
- // Check the environment for existence of NoTTL tag. If present,
- // the deployer of the service wants to avoid message expiration.
- if ( System.getProperty("NoTTL") != null) {
+ // Check the environment for existence of NoTTL tag. If present,
+ // the deployer of the service wants to avoid message expiration.
+ if (System.getProperty("NoTTL") != null) {
addTimeToLive = false;
}
}
- /**
- * Sets the ActiveMQ Broker URI
- */
- public void setServerURI( String aServerURI )
- {
- serverURI = aServerURI;
- }
- protected void setFreeCasQueue( Destination destination)
- {
- freeCASTempQueue = destination;
- }
- public String getServerURI()
- {
- return System.getProperty("BrokerURI");
- }
- public String getName()
- {
- return "";
- }
- /**
- *
- * @param connectionFactory
- */
- public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory)
- {
- this.connectionFactory = connectionFactory;
- }
-
- public void setServiceInputEndpoint(String anEnpoint)
- {
- serviceInputEndpoint = anEnpoint;
- }
-
- public void setSecondaryInputQueue( String anEndpoint )
- {
- secondaryInputEndpoint = anEndpoint;
- }
-
- public ActiveMQConnectionFactory getConnectionFactory()
- {
- return this.connectionFactory;
- }
-
-
- public void initialize() throws AsynchAEException
- {
- if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController )
- {
+
+ /**
+ * Sets the ActiveMQ Broker URI
+ */
+ public void setServerURI(String aServerURI) {
+ serverURI = aServerURI;
+ }
+
+ protected void setFreeCasQueue(Destination destination) {
+ freeCASTempQueue = destination;
+ }
+
+ public String getServerURI() {
+ return System.getProperty("BrokerURI");
+ }
+
+ public String getName() {
+ return "";
+ }
+
+ /**
+ *
+ * @param connectionFactory
+ */
+ public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ public void setServiceInputEndpoint(String anEnpoint) {
+ serviceInputEndpoint = anEnpoint;
+ }
+
+ public void setSecondaryInputQueue(String anEndpoint) {
+ secondaryInputEndpoint = anEndpoint;
+ }
+
+ public ActiveMQConnectionFactory getConnectionFactory() {
+ return this.connectionFactory;
+ }
+
+ public void initialize() throws AsynchAEException {
+ if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connector_list__FINE",
- new Object[] { System.getProperty("ActiveMQConnectors") });
- }
- // Aggregate controller set this System property at startup in
- // org.apache.uima.adapter.jms.service.UIMA_Service.startInternalBroker()
- serviceProtocolList = System.getProperty("ActiveMQConnectors");
- }
-
- try
- {
- String uri = System.getProperty("BrokerURI");
- setServerURI(uri);
- }
- catch( Exception e)
- {
- e.printStackTrace();
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "initialize",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connector_list__FINE",
+ new Object[] { System.getProperty("ActiveMQConnectors") });
+ }
+ // Aggregate controller set this System property at startup in
+ // org.apache.uima.adapter.jms.service.UIMA_Service.startInternalBroker()
+ serviceProtocolList = System.getProperty("ActiveMQConnectors");
+ }
+
+ try {
+ String uri = System.getProperty("BrokerURI");
+ setServerURI(uri);
+ } catch (Exception e) {
+ e.printStackTrace();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
- new Object[] { JmsConstants.threadName(), e });
+ "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
+ new Object[] { JmsConstants.threadName(), e });
}
- }
-
- }
- /**
- * Serializes CAS using indicated Serializer.
- *
- * @param aCAS - CAS instance to serialize
- * @param aSerializerKey - a key identifying which serializer to use
- * @return - String - serialized CAS as String
- * @throws Exception
- */
- public String serializeCAS(boolean isReply, CAS aCAS, String aCasReferenceId, String aSerializerKey) throws Exception
- {
-
- long start = getAnalysisEngineController().getCpuTime();
-
- String serializedCas = null;
-
- if ( isReply || "xmi".equalsIgnoreCase(aSerializerKey ) )
- {
- CacheEntry cacheEntry =
- getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
-
- XmiSerializationSharedData serSharedData;
+ }
+
+ }
+
+ /**
+ * Serializes CAS using indicated Serializer.
+ *
+ * @param aCAS
+ * - CAS instance to serialize
+ * @param aSerializerKey
+ * - a key identifying which serializer to use
+ * @return - String - serialized CAS as String
+ * @throws Exception
+ */
+ public String serializeCAS(boolean isReply, CAS aCAS, String aCasReferenceId,
+ String aSerializerKey) throws Exception {
+
+ long start = getAnalysisEngineController().getCpuTime();
+
+ String serializedCas = null;
+
+ if (isReply || "xmi".equalsIgnoreCase(aSerializerKey)) {
+ CacheEntry cacheEntry = getAnalysisEngineController().getInProcessCache()
+ .getCacheEntryForCAS(aCasReferenceId);
+
+ XmiSerializationSharedData serSharedData;
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "serializeCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_serialize_cas__FINE",
- new Object[] {aCasReferenceId});
- }
- if ( isReply )
- {
- serSharedData = cacheEntry.getDeserSharedData();
- if (cacheEntry.acceptsDeltaCas()) {
- serializedCas = uimaSerializer.serializeCasToXmi(aCAS, serSharedData, cacheEntry.getMarker());
- cacheEntry.setSentDeltaCas(true);
- } else {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "serializeCAS",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_serialize_cas__FINE",
+ new Object[] { aCasReferenceId });
+ }
+ if (isReply) {
+ serSharedData = cacheEntry.getDeserSharedData();
+ if (cacheEntry.acceptsDeltaCas()) {
+ serializedCas = uimaSerializer.serializeCasToXmi(aCAS, serSharedData, cacheEntry
+ .getMarker());
+ cacheEntry.setSentDeltaCas(true);
+ } else {
serializedCas = uimaSerializer.serializeCasToXmi(aCAS, serSharedData);
- cacheEntry.setSentDeltaCas(false);
- }
- }
- else
- {
- serSharedData = cacheEntry.getDeserSharedData();
- if (serSharedData == null) {
- serSharedData = new XmiSerializationSharedData();
- cacheEntry.setXmiSerializationData(serSharedData);
- }
+ cacheEntry.setSentDeltaCas(false);
+ }
+ } else {
+ serSharedData = cacheEntry.getDeserSharedData();
+ if (serSharedData == null) {
+ serSharedData = new XmiSerializationSharedData();
+ cacheEntry.setXmiSerializationData(serSharedData);
+ }
serializedCas = uimaSerializer.serializeCasToXmi(aCAS, serSharedData);
- int maxOutgoingXmiId = serSharedData.getMaxXmiId();
- // Save High Water Mark in case a merge is needed
- getAnalysisEngineController().
- getInProcessCache().
- getCacheEntryForCAS(aCasReferenceId).
- setHighWaterMark(maxOutgoingXmiId);
- }
-
- }
- else if ( "xcas".equalsIgnoreCase(aSerializerKey))
- {
- // Default is XCAS
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try
- {
+ int maxOutgoingXmiId = serSharedData.getMaxXmiId();
+ // Save High Water Mark in case a merge is needed
+ getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId)
+ .setHighWaterMark(maxOutgoingXmiId);
+ }
+
+ } else if ("xcas".equalsIgnoreCase(aSerializerKey)) {
+ // Default is XCAS
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
uimaSerializer.serializeToXCAS(bos, aCAS, null, null, null);
- serializedCas = bos.toString();
- }
- catch ( Exception e)
- {
- throw e;
- }
- finally
- {
- bos.close();
- }
- }
-
- LongNumericStatistic statistic;
- if ( (statistic = getAnalysisEngineController().getMonitor().getLongNumericStatistic("",Monitor.TotalSerializeTime)) != null )
- {
- statistic.increment(getAnalysisEngineController().getCpuTime() - start);
- }
-
- return serializedCas;
- }
- /**
- * This method verifies that the destination (queue) exists. It opens
- * a connection the a broker, creates a session and a message producer.
- * Finally, using the message producer, sends an empty message to
- * a queue. This API support enables checking for existence of the
- * reply (temp) queue before any processing of a cas is done. This is
- * an optimization to prevent expensive processing if the client
- * destination is no longer available.
- */
- public void bindWithClientEndpoint( Endpoint anEndpoint ) throws Exception
- {
- // check if the reply endpoint is a temp destination
- if ( anEndpoint.getDestination() != null )
- {
- // create message producer if one doesnt exist for this destination
- JmsEndpointConnection_impl endpointConnection =
- getEndpointConnection(anEndpoint);
- // Create empty message
- TextMessage tm = endpointConnection.produceTextMessage("");
- // test sending a message to reply endpoint. This tests existence of
- // a temp queue. If the client has been shutdown, this will fail
- // with an exception.
- endpointConnection.send(tm, 0, false);
- }
- }
- private long getInactivityTimeout( String destination, String brokerURL ) {
+ serializedCas = bos.toString();
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ bos.close();
+ }
+ }
+
+ LongNumericStatistic statistic;
+ if ((statistic = getAnalysisEngineController().getMonitor().getLongNumericStatistic("",
+ Monitor.TotalSerializeTime)) != null) {
+ statistic.increment(getAnalysisEngineController().getCpuTime() - start);
+ }
+
+ return serializedCas;
+ }
+
+ /**
+ * This method verifies that the destination (queue) exists. It opens a connection to a broker,
+ * creates a session and a message producer. Finally, using the message producer, it sends an empty
+ * message to a queue. This API support enables checking for existence of the reply (temp) queue
+ * before any processing of a CAS is done. This is an optimization to prevent expensive processing
+ * if the client destination is no longer available.
+ */
+ public void bindWithClientEndpoint(Endpoint anEndpoint) throws Exception {
+ // check if the reply endpoint is a temp destination
+ if (anEndpoint.getDestination() != null) {
+ // create message producer if one doesn't exist for this destination
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+ // Create empty message
+ TextMessage tm = endpointConnection.produceTextMessage("");
+ // test sending a message to reply endpoint. This tests existence of
+ // a temp queue. If the client has been shutdown, this will fail
+ // with an exception.
+ endpointConnection.send(tm, 0, false);
+ }
+ }
+
+ private long getInactivityTimeout(String destination, String brokerURL) {
if (System.getProperty(JmsConstants.SessionTimeoutOverride) != null) {
try {
- long overrideTimeoutValue = Long.parseLong(System.getProperty(JmsConstants.SessionTimeoutOverride));
+ long overrideTimeoutValue = Long.parseLong(System
+ .getProperty(JmsConstants.SessionTimeoutOverride));
// endpointConnection.setInactivityTimeout(overrideTimeoutValue); // If the connection is
// not used within this interval it will be removed
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -318,7 +304,7 @@
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_override_connection_timeout__FINE",
new Object[] { analysisEngineController, overrideTimeoutValue, destination,
- brokerURL });
+ brokerURL });
}
return overrideTimeoutValue;
} catch (NumberFormatException e) {
@@ -327,38 +313,42 @@
// endpointConnection.setInactivityTimeout(INACTIVITY_TIMEOUT); // If the connection is not
// used within this interval it will be removed
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINE,
- CLASS_NAME.getName(),
- "getEndpointConnection",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_connection_timeout__FINE",
- new Object[] { analysisEngineController, INACTIVITY_TIMEOUT, destination,
- brokerURL });
- }
- }
- return (int)INACTIVITY_TIMEOUT; // default
- }
- /**
- * Returns {@link JmsEndpointConnection_impl} instance bound to a destination defined in the {@link Endpoint}
- * The endpoint identifies the destination that should receive the message. This method refrences a cache
- * that stores active connections. Active connections are those that are fully bound and being used for
- * communication. The key to locate the entry in the connection cache is the queue name + broker URI. This
- * uniquely identifies the destination. If an entry does not exist in the cache, this routine will create
- * a new connection, initialize it, and cache it for future use. The cache is purely for optimization, to
- * prevent openinig a connection for every message which is a costly operation. Instead the connection is
- * open, cached and reused. The {@link JmsEndpointConnection_impl} instance is stored in the cache, and
- * uses a timer to make sure stale connection are removed. If a connection is not used in a given time
- * interval, the connection is considered stale and is dropped from the cache.
- *
- * @param anEndpoint - endpoint configuration containing connection information to a destination
- * @return -
- * @throws AsynchAEException
- */
- private synchronized JmsEndpointConnection_impl getEndpointConnection( Endpoint anEndpoint )
- throws AsynchAEException, ServiceShutdownException, ConnectException {
-
- try {
+ UIMAFramework.getLogger(CLASS_NAME)
+ .logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "getEndpointConnection",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_connection_timeout__FINE",
+ new Object[] { analysisEngineController, INACTIVITY_TIMEOUT, destination,
+ brokerURL });
+ }
+ }
+ return (int) INACTIVITY_TIMEOUT; // default
+ }
+
+ /**
+ * Returns {@link JmsEndpointConnection_impl} instance bound to a destination defined in the
+ * {@link Endpoint}. The endpoint identifies the destination that should receive the message. This
+ * method references a cache that stores active connections. Active connections are those that are
+ * fully bound and being used for communication. The key to locate the entry in the connection
+ * cache is the queue name + broker URI. This uniquely identifies the destination. If an entry
+ * does not exist in the cache, this routine will create a new connection, initialize it, and
+ * cache it for future use. The cache is purely for optimization, to prevent opening a connection
+ * for every message which is a costly operation. Instead the connection is opened, cached and
+ * reused. The {@link JmsEndpointConnection_impl} instance is stored in the cache, and uses a
+ * timer to make sure stale connections are removed. If a connection is not used in a given time
+ * interval, the connection is considered stale and is dropped from the cache.
+ *
+ * @param anEndpoint
+ * - endpoint configuration containing connection information to a destination
+ * @return -
+ * @throws AsynchAEException
+ */
+ private synchronized JmsEndpointConnection_impl getEndpointConnection(Endpoint anEndpoint)
+ throws AsynchAEException, ServiceShutdownException, ConnectException {
+
+ try {
controllerLatch.await();
} catch (InterruptedException e) {
}
@@ -369,10 +359,12 @@
BrokerConnectionEntry brokerConnectionEntry = null;
if (connectionMap.containsKey(anEndpoint.getServerURI())) {
brokerConnectionEntry = (BrokerConnectionEntry) connectionMap.get(anEndpoint.getServerURI());
- // Findbugs thinks that the above may return null, perhaps due to a race condition. Add
- // the null check just in case
- if ( brokerConnectionEntry == null ) {
- throw new AsynchAEException("Controller:"+getAnalysisEngineController().getComponentName()+" Unable to Lookup Broker Connection For URL:"+anEndpoint.getServerURI());
+ // Findbugs thinks that the above may return null, perhaps due to a race condition. Add
+ // the null check just in case
+ if (brokerConnectionEntry == null) {
+ throw new AsynchAEException("Controller:"
+ + getAnalysisEngineController().getComponentName()
+ + " Unable to Lookup Broker Connection For URL:" + anEndpoint.getServerURI());
}
} else {
brokerConnectionEntry = new BrokerConnectionEntry();
@@ -381,7 +373,7 @@
connectionTimer.setAnalysisEngineController(getAnalysisEngineController());
brokerConnectionEntry.setConnectionTimer(connectionTimer);
}
-
+
// create a key to lookup the endpointConnection object
String key = anEndpoint.getEndpoint() + anEndpoint.getServerURI();
String destination = anEndpoint.getEndpoint();
@@ -413,9 +405,11 @@
new Object[] { getAnalysisEngineController().getComponentName(), destination,
anEndpoint.getServerURI() });
}
- endpointConnection = new JmsEndpointConnection_impl(brokerConnectionEntry, anEndpoint, getAnalysisEngineController());
+ endpointConnection = new JmsEndpointConnection_impl(brokerConnectionEntry, anEndpoint,
+ getAnalysisEngineController());
brokerConnectionEntry.addEndpointConnection(key, endpointConnection);
- long replyQueueInactivityTimeout = getInactivityTimeout( destination, anEndpoint.getServerURI() );
+ long replyQueueInactivityTimeout = getInactivityTimeout(destination, anEndpoint
+ .getServerURI());
brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(replyQueueInactivityTimeout);
// Connection is not in the cache, create a new connection, initialize it and cache it
@@ -425,14 +419,14 @@
"UIMAJMS_open_new_connection_to_endpoint__FINE",
new Object[] { destination, anEndpoint.getServerURI() });
}
-
+
/**
* Open connection to a broker, create JMS session and MessageProducer
*/
endpointConnection.open();
- brokerConnectionEntry.getConnectionTimer()
- .setConnectionCreationTimestamp(endpointConnection.connectionCreationTimestamp);
-
+ brokerConnectionEntry.getConnectionTimer().setConnectionCreationTimestamp(
+ endpointConnection.connectionCreationTimestamp);
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
@@ -472,1724 +466,1693 @@
}
}
return endpointConnection;
- }
+ }
+
+ /**
+ * Sends request message to a delegate.
+ *
+ * @param aCommand
+ * - the type of request [Process|GetMeta]
+ * @param anEndpoint
+ * - the destination where the delegate receives messages
+ *
+ * @throws AsynchAEException
+ */
+ public void sendRequest(int aCommand, String aCasReferenceId, Endpoint anEndpoint)
+ throws AsynchAEException {
+ try {
- /**
- * Sends request message to a delegate.
- *
- * @param aCommand - the type of request [Process|GetMeta]
- * @param anEndpoint - the destination where the delegate receives messages
- *
- * @throws AsynchAEException
- */
- public void sendRequest(int aCommand, String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException
- {
- try
- {
-
- JmsEndpointConnection_impl endpointConnection =
- getEndpointConnection(anEndpoint);
-
- TextMessage tm = endpointConnection.produceTextMessage("");
- tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
- tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
- populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
- if ( aCommand == AsynchAEMessage.ReleaseCAS || aCommand == AsynchAEMessage.Stop)
- {
-
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendRequest",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_release_cas_req__FINE", new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint(),aCasReferenceId });
- }
- }
- // Only used to send a Stop or ReleaseCas request so probably no need to start a connection timer ?
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+
+ TextMessage tm = endpointConnection.produceTextMessage("");
+ tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+ tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+ populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
+ if (aCommand == AsynchAEMessage.ReleaseCAS || aCommand == AsynchAEMessage.Stop) {
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_release_cas_req__FINE",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint(),
+ aCasReferenceId });
+ }
+ }
+ // Only used to send a Stop or ReleaseCas request so probably no need to start a connection
+ // timer ?
endpointConnection.send(tm, 0, true);
- }
- catch( JMSException e)
- {
- // Unable to establish connection to the endpoint. Logit and continue
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
- new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
- }
- }
- catch( ServiceShutdownException e)
- {
- e.printStackTrace();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_shutdown__INFO",
- new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
- }
- }
- catch ( Exception e)
- {
- throw new AsynchAEException(e);
- }
- }
- /**
- * Sends request message to a delegate.
- *
- * @param aCommand - the type of request [Process|GetMeta]
- * @param anEndpoint - the destination where the delegate receives messages
- *
- * @throws AsynchAEException
- */
- public void sendRequest(int aCommand, Endpoint anEndpoint)
- {
- Delegate delegate = null;
- try
- {
- JmsEndpointConnection_impl endpointConnection =
- getEndpointConnection(anEndpoint);
-
- TextMessage tm = endpointConnection.produceTextMessage("");
- tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
- tm.setText(""); // Need this to prevent the Broker from throwing an exception when sending a message to C++ service
-
- populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
-
- // For remotes add a special property to the message. This property
- // will be echoed back by the service. This property enables matching
- // the reply with the right endpoint object managed by the aggregate.
- if ( anEndpoint.isRemote() )
- {
- tm.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getServerURI());
- }
- boolean startTimer = false;
- // Start timer for endpoints that are remote and are managed by a different broker
- // than this service. If an endpoint contains a destination object, the outgoing
- // request will contain a JMSReplyTo object which will point to a temp queue
- if ( anEndpoint.isRemote() && anEndpoint.getDestination() == null)
- {
- startTimer = true;
- }
- if ( aCommand == AsynchAEMessage.CollectionProcessComplete )
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendRequest",
- UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_send_cpc_req__FINE", new Object[] { anEndpoint.getEndpoint() });
- }
- }
- else if ( aCommand == AsynchAEMessage.ReleaseCAS )
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_releasecas_request__endpoint__FINEST",
- new Object[] {getAnalysisEngineController().getName(), endpointConnection.getEndpoint()});
- }
- }
- else if ( aCommand == AsynchAEMessage.GetMeta )
- {
- if ( anEndpoint.getDestination() != null )
- {
- String replyQueueName = ((ActiveMQDestination)anEndpoint.getDestination()).getPhysicalName().replaceAll(":","_");
- if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController )
- {
- String delegateKey =
- ((AggregateAnalysisEngineController)getAnalysisEngineController()).lookUpDelegateKey(anEndpoint.getEndpoint());
- ServiceInfo serviceInfo =((AggregateAnalysisEngineController)getAnalysisEngineController()).getDelegateServiceInfo(delegateKey);
- if (serviceInfo != null )
- {
- serviceInfo.setReplyQueueName(replyQueueName);
- serviceInfo.setServiceKey(delegateKey);
- }
- delegate = lookupDelegate(delegateKey);
- if ( delegate.getGetMetaTimeout() > 0 ) {
- delegate.startGetMetaRequestTimer();
- }
- }
- }
- else if ( !anEndpoint.isRemote())
- {
- ServiceInfo serviceInfo =((AggregateAnalysisEngineController)getAnalysisEngineController()).getServiceInfo();
- if (serviceInfo != null )
- {
- serviceInfo.setReplyQueueName(controllerInputEndpoint);
- }
- }
- }
- else
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_metadata_request__endpoint__FINEST",
- new Object[] { endpointConnection.getEndpoint(), endpointConnection.getServerUri() });
- }
- }
- if ( endpointConnection.send(tm, 0, startTimer) != true )
- {
+ } catch (JMSException e) {
+ // Unable to establish connection to the endpoint. Log it and continue
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
+ }
+ } catch (ServiceShutdownException e) {
+ e.printStackTrace();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_shutdown__INFO",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
+ }
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+ }
+
+ /**
+ * Sends request message to a delegate.
+ *
+ * @param aCommand
+ * - the type of request [Process|GetMeta]
+ * @param anEndpoint
+ * - the destination where the delegate receives messages
+ *
+ * @throws AsynchAEException
+ */
+ public void sendRequest(int aCommand, Endpoint anEndpoint) {
+ Delegate delegate = null;
+ try {
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+
+ TextMessage tm = endpointConnection.produceTextMessage("");
+ tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+ tm.setText(""); // Need this to prevent the Broker from throwing an exception when sending a
+ // message to C++ service
+
+ populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
+
+ // For remotes add a special property to the message. This property
+ // will be echoed back by the service. This property enables matching
+ // the reply with the right endpoint object managed by the aggregate.
+ if (anEndpoint.isRemote()) {
+ tm.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getServerURI());
+ }
+ boolean startTimer = false;
+ // Start timer for endpoints that are remote and are managed by a different broker
+ // than this service. If an endpoint contains a destination object, the outgoing
+ // request will contain a JMSReplyTo object which will point to a temp queue
+ if (anEndpoint.isRemote() && anEndpoint.getDestination() == null) {
+ startTimer = true;
+ }
+ if (aCommand == AsynchAEMessage.CollectionProcessComplete) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_send_cpc_req__FINE", new Object[] { anEndpoint.getEndpoint() });
+ }
+ } else if (aCommand == AsynchAEMessage.ReleaseCAS) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_releasecas_request__endpoint__FINEST",
+ new Object[] { getAnalysisEngineController().getName(),
+ endpointConnection.getEndpoint() });
+ }
+ } else if (aCommand == AsynchAEMessage.GetMeta) {
+ if (anEndpoint.getDestination() != null) {
+ String replyQueueName = ((ActiveMQDestination) anEndpoint.getDestination())
+ .getPhysicalName().replaceAll(":", "_");
+ if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
+ String delegateKey = ((AggregateAnalysisEngineController) getAnalysisEngineController())
+ .lookUpDelegateKey(anEndpoint.getEndpoint());
+ ServiceInfo serviceInfo = ((AggregateAnalysisEngineController) getAnalysisEngineController())
+ .getDelegateServiceInfo(delegateKey);
+ if (serviceInfo != null) {
+ serviceInfo.setReplyQueueName(replyQueueName);
+ serviceInfo.setServiceKey(delegateKey);
+ }
+ delegate = lookupDelegate(delegateKey);
+ if (delegate.getGetMetaTimeout() > 0) {
+ delegate.startGetMetaRequestTimer();
+ }
+ }
+ } else if (!anEndpoint.isRemote()) {
+ ServiceInfo serviceInfo = ((AggregateAnalysisEngineController) getAnalysisEngineController())
+ .getServiceInfo();
+ if (serviceInfo != null) {
+ serviceInfo.setReplyQueueName(controllerInputEndpoint);
+ }
+ }
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_metadata_request__endpoint__FINEST",
+ new Object[] { endpointConnection.getEndpoint(),
+ endpointConnection.getServerUri() });
+ }
+ }
+ if (endpointConnection.send(tm, 0, startTimer) != true) {
throw new ServiceNotFoundException();
}
-
- }
- catch ( Exception e)
- {
- if ( delegate != null && aCommand == AsynchAEMessage.GetMeta )
- {
- delegate.cancelDelegateTimer();
- }
- // Handle the error
- ErrorContext errorContext = new ErrorContext();
- errorContext.add(AsynchAEMessage.Command, aCommand);
- errorContext.add(AsynchAEMessage.Endpoint, anEndpoint);
- getAnalysisEngineController().getErrorHandlerChain().handle(e, errorContext, getAnalysisEngineController());
- }
- }
-
- public void sendRequest( String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException
- {
-
+
+ } catch (Exception e) {
+ if (delegate != null && aCommand == AsynchAEMessage.GetMeta) {
+ delegate.cancelDelegateTimer();
+ }
+ // Handle the error
+ ErrorContext errorContext = new ErrorContext();
+ errorContext.add(AsynchAEMessage.Command, aCommand);
+ errorContext.add(AsynchAEMessage.Endpoint, anEndpoint);
+ getAnalysisEngineController().getErrorHandlerChain().handle(e, errorContext,
+ getAnalysisEngineController());
+ }
+ }
+
+ public void sendRequest(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
- new Object[] { anEndpoint.getEndpoint() });
- }
- try
- {
- if (anEndpoint.isRemote())
- {
- if ( anEndpoint.getSerializer().equals("xmi")) {
- String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled());
- if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_serialized_cas__FINEST",
- new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS });
- }
- // Send process request to remote delegate and start timeout timer
- sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
- } else {
- byte[] serializedCAS = getBinaryCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled());
- if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_binary_cas__FINEST",
- new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS });
- }
-
-
- // Send process request to remote delegate and start timeout timer
- sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
-
- }
-
- }
- else
- {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
+ new Object[] { anEndpoint.getEndpoint() });
+ }
+ try {
+ if (anEndpoint.isRemote()) {
+ if (anEndpoint.getSerializer().equals("xmi")) {
+ String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId, anEndpoint,
+ anEndpoint.isRetryEnabled());
+ if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_sending_serialized_cas__FINEST",
+ new Object[] { getAnalysisEngineController().getComponentName(),
+ anEndpoint.getEndpoint(), aCasReferenceId, serializedCAS });
+ }
+ // Send process request to remote delegate and start timeout timer
+ sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
+ } else {
+ byte[] serializedCAS = getBinaryCasAndReleaseIt(false, aCasReferenceId, anEndpoint,
+ anEndpoint.isRetryEnabled());
+ if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_sending_binary_cas__FINEST",
+ new Object[] { getAnalysisEngineController().getComponentName(),
+ anEndpoint.getEndpoint(), aCasReferenceId, serializedCAS });
+ }
+
+ // Send process request to remote delegate and start timeout timer
+ sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
+
+ }
+
+ } else {
// Not supported
- }
- }
- catch( ServiceShutdownException e)
- {
- e.printStackTrace();
- }
-
- catch ( AsynchAEException e)
- {
- throw e;
- }
- catch ( Exception e)
- {
- throw new AsynchAEException(e);
- }
- }
- /**
- * Sends request message to process CAS to the given destinations. This method enables
- * processing of the same CAS in multiple Analysis Engines at the same time.
- *
- * @param aCasReferenceId
- * @param anEndpoint
- * @throws AsynchAEException
- */
- public void sendRequest( String aCasReferenceId, Endpoint[] endpoints) throws AsynchAEException
- {
- Endpoint currentEndpoint = null;
- try
- {
- boolean cacheSerializedCas = endpointRetryEnabled(endpoints);
+ }
+ } catch (ServiceShutdownException e) {
+ e.printStackTrace();
+ }
+
+ catch (AsynchAEException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+ }
+
+ /**
+ * Sends request message to process CAS to the given destinations. This method enables processing
+ * of the same CAS in multiple Analysis Engines at the same time.
+ *
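+   * @param aCasReferenceId - reference id of the CAS to serialize and send
+   * @param endpoints - destinations; the serialized CAS is sent to each remote one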
+ * @throws AsynchAEException
+ */
+ public void sendRequest(String aCasReferenceId, Endpoint[] endpoints) throws AsynchAEException {
+ Endpoint currentEndpoint = null;
+ try {
+ boolean cacheSerializedCas = endpointRetryEnabled(endpoints);
// The default serialization strategy for parallel step is xmi.
       // Binary serialization doesn't support merge.
endpoints[0].setSerializer("xmi");
-
- // Serialize CAS using serializer defined in the first endpoint. All endpoints in the parallel Flow
- // must use the same format (either XCAS or XMI)
- String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId, endpoints[0], cacheSerializedCas);
- // Using provided endpoints, create JMS message for each destination and sent the serialized CAS to it.
- for( int i=0; i < endpoints.length; i++)
- {
- // For remote delegates, optionally cache serialized CAS in case a retry on timeout is required.
- if (endpoints[i].isRemote())
- {
-
- currentEndpoint = endpoints[i];
- if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_serialized_cas__FINEST",
- new Object[] { getAnalysisEngineController().getComponentName(), endpoints[i].getEndpoint(),aCasReferenceId,serializedCAS });
- }
-
-
- // The default serialization strategy for parallel step is xmi.
- // Binary serialization doesnt support merge.
- endpoints[i].setSerializer("xmi");
-
- sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, endpoints[i], true, 0);
- }
- else
- {
- // Currently this use case is not supported. Parallel processing of CAS is only supported with remote Delegates
- }
- }
- }
- catch ( Exception e)
- {
- // Handle the error
- ErrorContext errorContext = new ErrorContext();
- errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
- errorContext.add(AsynchAEMessage.Endpoint, currentEndpoint);
- errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
- getAnalysisEngineController().getErrorHandlerChain().handle(e, errorContext, getAnalysisEngineController());
- }
- }
-
- public void sendReply( CAS aCas, String anInputCasReferenceId, String aNewCasReferenceId, Endpoint anEndpoint, long sequence ) throws AsynchAEException
- {
- try
- {
- anEndpoint.setReplyEndpoint(true);
- if ( anEndpoint.isRemote() )
- {
- // Serializes CAS and releases it back to CAS Pool
- String serializedCAS = getSerializedCas(true, aNewCasReferenceId, anEndpoint, anEndpoint.isRetryEnabled());
- sendCasToRemoteEndpoint(false, serializedCAS, anInputCasReferenceId, aNewCasReferenceId, anEndpoint, false, sequence);
- }
- else
- {
+
+      // Serialize CAS using serializer defined in the first endpoint.
+      // All endpoints in the parallel Flow must use the same format
+      // (either XCAS or XMI).
+ String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId, endpoints[0],
+ cacheSerializedCas);
+      // Using provided endpoints, create JMS message for each destination and send the serialized
+ // CAS to it.
+ for (int i = 0; i < endpoints.length; i++) {
+ // For remote delegates, optionally cache serialized CAS in case a retry on timeout is
+ // required.
+ if (endpoints[i].isRemote()) {
+
+ currentEndpoint = endpoints[i];
+ if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_sending_serialized_cas__FINEST",
+ new Object[] { getAnalysisEngineController().getComponentName(),
+ endpoints[i].getEndpoint(), aCasReferenceId, serializedCAS });
+ }
+
+ // The default serialization strategy for parallel step is xmi.
+          // Binary serialization doesn't support merge.
+ endpoints[i].setSerializer("xmi");
+
+ sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, endpoints[i], true, 0);
+ } else {
+ // Currently this use case is not supported. Parallel processing of CAS is only supported
+ // with remote Delegates
+ }
+ }
+ } catch (Exception e) {
+ // Handle the error
+ ErrorContext errorContext = new ErrorContext();
+ errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+ errorContext.add(AsynchAEMessage.Endpoint, currentEndpoint);
+ errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
+ getAnalysisEngineController().getErrorHandlerChain().handle(e, errorContext,
+ getAnalysisEngineController());
+ }
+ }
+
+ public void sendReply(CAS aCas, String anInputCasReferenceId, String aNewCasReferenceId,
+ Endpoint anEndpoint, long sequence) throws AsynchAEException {
+ try {
+ anEndpoint.setReplyEndpoint(true);
+ if (anEndpoint.isRemote()) {
+ // Serializes CAS and releases it back to CAS Pool
+ String serializedCAS = getSerializedCas(true, aNewCasReferenceId, anEndpoint, anEndpoint
+ .isRetryEnabled());
+ sendCasToRemoteEndpoint(false, serializedCAS, anInputCasReferenceId, aNewCasReferenceId,
+ anEndpoint, false, sequence);
+ } else {
// Not supported
- }
- }
- catch( ServiceShutdownException e)
- {
- e.printStackTrace();
- }
- catch (AsynchAEException e)
- {
- throw e;
- }
-
- catch (Exception e)
- {
- throw new AsynchAEException(e);
- }
-
- }
-
- public void sendReply( CacheEntry entry, Endpoint anEndpoint ) throws AsynchAEException
- {
- try
- {
- anEndpoint.setReplyEndpoint(true);
- if ( anEndpoint.isRemote() )
- {
- if ( anEndpoint.getSerializer().equals("xmi")) {
- // Serializes CAS and releases it back to CAS Pool
- String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
- sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false);
- } else {
- byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
- if ( binaryCas == null ) {
- return;
- }
- sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
- }
-
- }
- else
- {
+ }
+ } catch (ServiceShutdownException e) {
+ e.printStackTrace();
+ } catch (AsynchAEException e) {
+ throw e;
+ }
+
+ catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
+ public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
+ try {
+ anEndpoint.setReplyEndpoint(true);
+ if (anEndpoint.isRemote()) {
+ if (anEndpoint.getSerializer().equals("xmi")) {
+ // Serializes CAS and releases it back to CAS Pool
+ String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint,
+ anEndpoint.isRetryEnabled());
+ sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false);
+ } else {
+ byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint
+ .isRetryEnabled());
+ if (binaryCas == null) {
+ return;
+ }
+ sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
+ }
+
+ } else {
// Not supported
- }
- }
- catch( ServiceShutdownException e)
- {
- e.printStackTrace();
- }
- catch (AsynchAEException e)
- {
- throw e;
- }
-
- catch (Exception e)
- {
- throw new AsynchAEException(e);
- }
-
- }
- public void sendReply( int aCommand, Endpoint anEndpoint, String aCasReferenceId ) throws AsynchAEException {
- try
- {
- if ( aborting )
- {
+ }
+ } catch (ServiceShutdownException e) {
+ e.printStackTrace();
+ } catch (AsynchAEException e) {
+ throw e;
+ }
+
+ catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
+ public void sendReply(int aCommand, Endpoint anEndpoint, String aCasReferenceId)
+ throws AsynchAEException {
+ try {
+ if (aborting) {
return;
}
anEndpoint.setReplyEndpoint(true);
- JmsEndpointConnection_impl endpointConnection =
- getEndpointConnection(anEndpoint);
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
TextMessage tm = endpointConnection.produceTextMessage("");
- tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+ tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
populateHeaderWithResponseContext(tm, anEndpoint, aCommand);
tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
- // If this service is a Cas Multiplier add to the message a FreeCasQueue.
- // The client may need send Stop request to that queue.
- if ( aCommand == AsynchAEMessage.ServiceInfo &&
- getAnalysisEngineController().isCasMultiplier() &&
- freeCASTempQueue != null ) {
- // Attach a temp queue to the outgoing message. This a queue where
- // Free CAS notifications need to be sent from the client
- tm.setJMSReplyTo(freeCASTempQueue);
+ // If this service is a Cas Multiplier add to the message a FreeCasQueue.
+      // The client may need to send a Stop request to that queue.
+ if (aCommand == AsynchAEMessage.ServiceInfo
+ && getAnalysisEngineController().isCasMultiplier() && freeCASTempQueue != null) {
+        // Attach a temp queue to the outgoing message. This is a queue where
+ // Free CAS notifications need to be sent from the client
+ tm.setJMSReplyTo(freeCASTempQueue);
}
- endpointConnection.send(tm, 0, false);
+ endpointConnection.send(tm, 0, false);
addIdleTime(tm);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cpc_reply_sent__FINE",
- new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint()});
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_cpc_reply_sent__FINE",
+ new Object[] { getAnalysisEngineController().getComponentName(),
+ anEndpoint.getEndpoint() });
}
+ } catch (JMSException e) {
+      // Unable to establish connection to the endpoint. Log it and continue
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
+ new Object[] { e });
+ }
+ }
+
+ catch (ServiceShutdownException e) {
+ e.printStackTrace();
+ }
+
+ catch (AsynchAEException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
}
- catch( JMSException e)
- {
- // Unable to establish connection to the endpoint. Logit and continue
+
+ }
+
+ public void sendReply(int aCommand, Endpoint anEndpoint) throws AsynchAEException {
+ anEndpoint.setReplyEndpoint(true);
+ try {
+ if (aborting) {
+ return;
+ }
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+
+ TextMessage tm = endpointConnection.produceTextMessage("");
+ tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+ populateHeaderWithResponseContext(tm, anEndpoint, aCommand);
+
+ endpointConnection.send(tm, 0, false);
+ addIdleTime(tm);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_cpc_reply_sent__FINE",
+ new Object[] { getAnalysisEngineController().getComponentName(),
+ anEndpoint.getEndpoint() });
+ }
+ } catch (JMSException e) {
+      // Unable to establish connection to the endpoint. Log it and continue
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
- new Object[] { e});
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
+ new Object[] { e });
}
}
- catch( ServiceShutdownException e)
- {
+ catch (ServiceShutdownException e) {
+ e.printStackTrace();
+ }
+
+ catch (AsynchAEException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+ }
+
+  /**
+   * Sends JMS Reply Message to a given endpoint. For a remote endpoint the reply message contains
+   * the CAS identified by the given reference id, serialized as XMI or binary. Nothing is sent if
+   * the cache entry for that CAS cannot be looked up.
+   * 
+   * @param aCasReferenceId
+   *          - a unique CAS reference id
+   * @param anEndpoint
+   *          - an endpoint to receive the reply message
+   * 
+   * @throws AsynchAEException
+   *           - if serializing or sending the CAS fails
+   */
+ public void sendReply(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
+ anEndpoint.setReplyEndpoint(true);
+ try {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_replyto_endpoint__FINE",
+ new Object[] { anEndpoint.getEndpoint(), aCasReferenceId });
+ }
+ if (anEndpoint.isRemote()) {
+ CacheEntry entry = null;
+ try {
+ entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
+ aCasReferenceId);
+
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_cas_not_found__INFO",
+ new Object[] { getAnalysisEngineController().getComponentName(),
+ anEndpoint.getEndpoint(), aCasReferenceId });
+ }
+ return;
+ }
+ if (anEndpoint.getSerializer().equals("xmi")) {
+ // Serializes CAS and releases it back to CAS Pool
+ String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false);
+ sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0);
+ } else {
+ byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint
+ .isRetryEnabled());
+ if (binaryCas == null) {
+ return;
+ }
+ sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
+ }
+ } else {
+ // Not supported
+ }
+ } catch (ServiceShutdownException e) {
e.printStackTrace();
}
- catch (AsynchAEException e)
- {
- throw e;
+ catch (AsynchAEException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+ }
+
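+  /**
+   * Sends JMS Reply Message to a given endpoint. The reply message contains given Throwable (with
+   * full stack); a Throwable that is not a UimaEEServiceException is wrapped in one first.
+   * 
+   * @param t - Throwable to include in the reply message
+   * @param aCasReferenceId - a unique CAS reference id; may be null
+   * @param aParentCasReferenceId - reference id of the input (parent) CAS; may be null
+   * @param anEndpoint - an endpoint to receive the reply message
+   * @param aCommand - command passed on to populateHeaderWithResponseContext for the reply
+   * 
+   * @throws AsynchAEException
+   *           - on unexpected errors; JMS, shutdown and AsynchAE failures are not rethrown
+   */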
+ public void sendReply(Throwable t, String aCasReferenceId, String aParentCasReferenceId,
+ Endpoint anEndpoint, int aCommand) throws AsynchAEException {
+ anEndpoint.setReplyEndpoint(true);
+ try {
+ Throwable wrapper = null;
+ if (!(t instanceof UimaEEServiceException)) {
+ // Strip off AsyncAEException and replace with UimaEEServiceException
+ if (t instanceof AsynchAEException && t.getCause() != null) {
+ wrapper = new UimaEEServiceException(t.getCause());
+ } else {
+ wrapper = new UimaEEServiceException(t);
+ }
+ }
+ if (aborting) {
+ return;
+ }
+ anEndpoint.setReplyEndpoint(true);
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+ // Create Message that will contain serialized Exception with stack
+ ObjectMessage om = endpointConnection.produceObjectMessage();
+ if (wrapper == null) {
+ om.setObject(t);
+ } else {
+ om.setObject(wrapper);
+ }
+ // Add common header properties
+ populateHeaderWithResponseContext(om, anEndpoint, aCommand); // AsynchAEMessage.Process);
+
+ om.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception);
+ if (aCasReferenceId != null) {
+ om.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+ if (aParentCasReferenceId != null) {
+ om.setStringProperty(AsynchAEMessage.InputCasReference, aParentCasReferenceId);
+ }
+ }
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_exception__FINE",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
+ }
+ // Dispatch Message to destination
+ endpointConnection.send(om, 0, false);
+ addIdleTime(om);
+ } catch (JMSException e) {
+      // Unable to establish connection to the endpoint. Log it and continue
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
+ }
+ } catch (ServiceShutdownException e) {
+ e.printStackTrace();
+ } catch (AsynchAEException e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
+ }
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+ }
+
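+  /**
+   * Sends a GetMeta reply; metadata is serialized into the message only if serialize is true.
+   * @param aProcessingResourceMetadata - metadata to serialize as XML into the reply
+   * @param anEndpoint - endpoint to receive the reply message
+   * @throws AsynchAEException
+   */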
+ public void sendReply(ProcessingResourceMetaData aProcessingResourceMetadata,
+ Endpoint anEndpoint, boolean serialize) throws AsynchAEException {
+ if (aborting) {
+ return;
+ }
+ long msgSize = 0;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ anEndpoint.setReplyEndpoint(true);
+ // Initialize JMS connection to given endpoint
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_produce_txt_msg__FINE",
+ new Object[] {});
+ }
+ TextMessage tm = endpointConnection.produceTextMessage("");
+
+      // Collocated Aggregate components don't send metadata, just an empty reply
+ // Such aggregate has merged its typesystem already since it shares
+ // CasManager with its parent
+ if (serialize) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_serializing_meta__FINE", new Object[] {});
+ }
+ // Serialize metadata
+ aProcessingResourceMetadata.toXML(bos);
+ tm.setText(bos.toString());
+ msgSize = bos.toString().length();
+ }
+
+ tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Metadata);
+ // This service supports Binary Serialization
+ tm.setIntProperty(AsynchAEMessage.Serialization, AsynchAEMessage.BinarySerialization);
+
+ populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.GetMeta);
+ if (freeCASTempQueue != null) {
+        // Attach a temp queue to the outgoing message. This is a queue where
+ // Free CAS notifications need to be sent from the client
+ tm.setJMSReplyTo(freeCASTempQueue);
+ }
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_metadata_reply__endpoint__FINEST",
+ new Object[] { serviceInputEndpoint, anEndpoint.getEndpoint() });
+ }
+ endpointConnection.send(tm, msgSize, false);
+ } catch (JMSException e) {
+ e.printStackTrace();
+ // Unable to establish connection to the endpoint. Log it and continue
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
+ }
+ }
+
+ catch (ServiceShutdownException e) {
+ e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AsynchAEException(e);
+ } finally {
+ try {
+ bos.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ private byte[] getBinaryCas(boolean isReply, String aCasReferenceId, Endpoint anEndpoint,
+ boolean cacheSerializedCas) throws Exception {
+ CAS cas = null;
+ try {
+ byte[] serializedCAS = null;
+      // Using Cas reference Id retrieve CAS from the shared cache
+ cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
+ ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
+ CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
+ aCasReferenceId);
+ long t1 = getAnalysisEngineController().getCpuTime();
+ // Serialize CAS for remote Delegates
+ String serializer = anEndpoint.getSerializer();
+ if (cas == null || entry == null) {
+ return null;
+ }
+ if (serializer.equals("binary")) {
+ if (entry.acceptsDeltaCas() && isReply) {
+ serializedCAS = uimaSerializer.serializeCasToBinary(cas, entry.getMarker());
+ entry.setSentDeltaCas(true);
+ } else {
+ serializedCAS = uimaSerializer.serializeCasToBinary(cas);
+ entry.setSentDeltaCas(false);
+ }
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "getBinaryCas",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_invalid_serializer__WARNING",
+ new Object[] { getAnalysisEngineController().getName(), serializer,
+ anEndpoint.getEndpoint() });
+ }
+ throw new UimaEEServiceException("Invalid Serializer:" + serializer + " For Endpoint:"
+ + anEndpoint.getEndpoint());
+ }
+ long timeToSerializeCas = getAnalysisEngineController().getCpuTime() - t1;
+
+ getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
+
+ entry.incrementTimeToSerializeCAS(timeToSerializeCas);
+ casStats.incrementCasSerializationTime(timeToSerializeCas);
+ getAnalysisEngineController().getServicePerformance().incrementCasSerializationTime(
+ timeToSerializeCas);
+ return serializedCAS;
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
+ private String getSerializedCas(boolean isReply, String aCasReferenceId, Endpoint anEndpoint,
+ boolean cacheSerializedCas) throws Exception {
+ CAS cas = null;
+ try {
+ String serializedCAS = null;
+      // Using Cas reference Id retrieve CAS from the shared cache
+ cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
+ ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
+ if (cas == null) {
+ serializedCAS = getAnalysisEngineController().getInProcessCache().getSerializedCAS(
+ aCasReferenceId);
+ } else {
+ CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
+ aCasReferenceId);
+ long t1 = getAnalysisEngineController().getCpuTime();
+ // Serialize CAS for remote Delegates
+ String serializer = anEndpoint.getSerializer();
+ if (serializer == null || serializer.trim().length() == 0) {
+ serializer = "xmi";
+ }
+ serializedCAS = serializeCAS(isReply, cas, aCasReferenceId, serializer);
+ long timeToSerializeCas = getAnalysisEngineController().getCpuTime() - t1;
+ getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
+
+ entry.incrementTimeToSerializeCAS(timeToSerializeCas);
+ casStats.incrementCasSerializationTime(timeToSerializeCas);
+ getAnalysisEngineController().getServicePerformance().incrementCasSerializationTime(
+ timeToSerializeCas);
+ if (cacheSerializedCas) {
+ getAnalysisEngineController().getInProcessCache().saveSerializedCAS(aCasReferenceId,
+ serializedCAS);
+ }
+ }
+ return serializedCAS;
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+ }
+
+ /**
+  * Obtains the binary form of a CAS through {@code getBinaryCas} and, regardless of whether
+  * that call succeeded, drops the CAS when this channel's controller is a
+  * {@code PrimitiveAnalysisEngineController} and the target endpoint is remote.
+  *
+  * @param isReply forwarded unchanged to {@code getBinaryCas}
+  * @param aCasReferenceId id of the CAS to serialize and (conditionally) drop
+  * @param anEndpoint endpoint the CAS is destined for; its remote flag gates the drop
+  * @param cacheSerializedCas forwarded unchanged to {@code getBinaryCas}
+  * @return whatever {@code getBinaryCas} returned
+  * @throws Exception an {@code AsynchAEException} wrapping any exception raised while
+  *           fetching the binary CAS
+  */
+ private byte[] getBinaryCasAndReleaseIt(boolean isReply, String aCasReferenceId,
+     Endpoint anEndpoint, boolean cacheSerializedCas) throws Exception {
+   byte[] binaryCas;
+   try {
+     binaryCas = getBinaryCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas);
+   } catch (Exception cause) {
+     throw new AsynchAEException(cause);
+   } finally {
+     // The controller type is tested first so the endpoint is only consulted for primitives.
+     boolean primitiveController =
+         getAnalysisEngineController() instanceof PrimitiveAnalysisEngineController;
+     if (primitiveController && anEndpoint.isRemote()) {
+       getAnalysisEngineController().dropCAS(aCasReferenceId, true);
+     }
+   }
+   return binaryCas;
+ }
+
+ /**
+  * Obtains the serialized (String) form of a CAS through {@code getSerializedCas} and,
+  * regardless of whether that call succeeded, drops the CAS when this channel's controller is
+  * a {@code PrimitiveAnalysisEngineController} and the target endpoint is remote.
+  *
+  * @param isReply forwarded unchanged to {@code getSerializedCas}
+  * @param aCasReferenceId id of the CAS to serialize and (conditionally) drop
+  * @param anEndpoint endpoint the CAS is destined for; its remote flag gates the drop
+  * @param cacheSerializedCas forwarded unchanged to {@code getSerializedCas}
+  * @return whatever {@code getSerializedCas} returned
+  * @throws Exception an {@code AsynchAEException} wrapping any exception raised while
+  *           serializing the CAS
+  */
+ private String getSerializedCasAndReleaseIt(boolean isReply, String aCasReferenceId,
+     Endpoint anEndpoint, boolean cacheSerializedCas) throws Exception {
+   String serializedCas;
+   try {
+     serializedCas = getSerializedCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas);
+   } catch (Exception cause) {
+     throw new AsynchAEException(cause);
+   } finally {
+     // The controller type is tested first so the endpoint is only consulted for primitives.
+     boolean primitiveController =
+         getAnalysisEngineController() instanceof PrimitiveAnalysisEngineController;
+     if (primitiveController && anEndpoint.isRemote()) {
+       getAnalysisEngineController().dropCAS(aCasReferenceId, true);
+     }
+   }
+   return serializedCas;
+ }
+
+ /**
+  * Tells whether at least one of the supplied endpoints has retry enabled.
+  *
+  * @param endpoints endpoints to inspect
+  * @return {@code true} as soon as an endpoint reporting {@code isRetryEnabled()} is found,
+  *         {@code false} when none does (including an empty array)
+  */
+ private boolean endpointRetryEnabled(Endpoint[] endpoints) {
+   for (Endpoint candidate : endpoints) {
+     if (candidate.isRetryEnabled()) {
+       return true;
+     }
+   }
+   return false;
+ }
+
+ /**
+  * Attaches timing information to an outgoing message.
+  * <p>
+  * A "SENT-TIME" property is stamped when the endpoint is final. For the Process command a
+  * request only records its departure time with the controller, while a reply copies the
+  * per-CAS serialization, deserialization and analysis times, the time spent waiting for a
+  * CAS and the idle time into message properties, then logs the time in service at FINEST.
+  * Any other command adds nothing further.
+  *
+  * @param aTextMessage message whose properties are populated
+  * @param anEndpoint destination endpoint
+  * @param aCasReferenceId id of the CAS the message relates to
+  * @param anAdminCommand command being sent; only {@code AsynchAEMessage.Process} adds stats
+  * @param isRequest {@code true} for a request, {@code false} for a reply
+  * @throws Exception if setting a property or querying the controller fails
+  */
+ private void populateStats(Message aTextMessage, Endpoint anEndpoint, String aCasReferenceId,
+     int anAdminCommand, boolean isRequest) throws Exception {
+   if (anEndpoint.isFinal()) {
+     aTextMessage.setLongProperty("SENT-TIME", System.nanoTime());
+   }
+
+   // Only Process messages carry statistics.
+   if (anAdminCommand != AsynchAEMessage.Process) {
+     return;
+   }
+
+   if (isRequest) {
+     // Outgoing request: remember when it left so the round trip can be measured later.
+     long sentAt = System.nanoTime();
+     getAnalysisEngineController().saveTime(sentAt, aCasReferenceId, anEndpoint.getEndpoint());
+     return;
+   }
+
+   // Outgoing reply: ship the per-CAS statistics back to the requester.
+   ServicePerformance perCasStats =
+       getAnalysisEngineController().getCasStatistics(aCasReferenceId);
+
+   aTextMessage.setLongProperty(AsynchAEMessage.TimeToSerializeCAS,
+       perCasStats.getRawCasSerializationTime());
+   aTextMessage.setLongProperty(AsynchAEMessage.TimeToDeserializeCAS,
+       perCasStats.getRawCasDeserializationTime());
+   aTextMessage.setLongProperty(AsynchAEMessage.TimeInProcessCAS,
+       perCasStats.getRawAnalysisTime());
+   aTextMessage.setLongProperty(AsynchAEMessage.TimeWaitingForCAS,
+       getAnalysisEngineController().getServicePerformance().getTimeWaitingForCAS());
+
+   long idleTime =
+       getAnalysisEngineController().getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
+   aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, idleTime);
+
+   String controllerName = getAnalysisEngineController().getName();
+   long arrivedAt = getAnalysisEngineController().getTime(aCasReferenceId, controllerName);
+   long timeInService = getAnalysisEngineController().getCpuTime() - arrivedAt;
+   if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+     double scaledTimeInService = (double) timeInService / (double) 1000000;
+     UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+         "populateStats", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+         "UIMAJMS_timein_service__FINEST",
+         new Object[] { serviceInputEndpoint, scaledTimeInService });
+   }
+ }
+
+ /**
+  * Looks up the timeout configured on an endpoint for the given command.
+  *
+  * @param anEndpoint endpoint holding the configured timeouts
+  * @param aCommand command code; {@code AsynchAEMessage.GetMeta} and
+  *          {@code AsynchAEMessage.Process} are recognized
+  * @return the endpoint's metadata-request or process-request timeout, or 0 for any other
+  *         command
+  */
+ private long getCommandTimeoutValue(Endpoint anEndpoint, int aCommand) {
+   if (aCommand == AsynchAEMessage.GetMeta) {
+     return anEndpoint.getMetadataRequestTimeout();
+   }
+   if (aCommand == AsynchAEMessage.Process) {
+     return anEndpoint.getProcessRequestTimeout();
+   }
+   // No timeout is defined for any other command.
+   return 0;
+ }
+
+ /**
+ * Adds Request specific properties to the JMS Header.
+ * <p>
+ * Always sets the message type (Request), the command and the AcceptsDeltaCas flag. When a
+ * positive timeout is configured for the command and time-to-live is enabled, an expiration
+ * value derived from that timeout is set on the message. When the controller is an aggregate,
+ * the MessageFrom, ServerURI and (if the endpoint has a destination) JMSReplyTo values are
+ * filled in according to whether the endpoint is remote or collocated.
+ *
+ * @param aMessage
+ *          message whose header/properties are populated
+ * @param anEndpoint
+ *          endpoint the request is sent to
+ * @param aCommand
+ *          command code placed in the Command property and used to select the timeout
+ * @throws Exception
+ *           AsynchAEException when neither a replyTo endpoint name nor a destination is
+ *           available on the HTTP/replyTo path; any exception raised while setting
+ *           message properties
+ */
+ private void populateHeaderWithRequestContext(Message aMessage, Endpoint anEndpoint, int aCommand)
+ throws Exception {
+ aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+ aMessage.setIntProperty(AsynchAEMessage.Command, aCommand);
+ // TODO override default based on system property
+ aMessage.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true);
+ long timeout = getCommandTimeoutValue(anEndpoint, aCommand);
+ // If the timeout is defined in the Deployment Descriptor and
+ // the service is configured to use time to live (TTL), add
+ // JMS message expiration time. The TTL is by default always
+ // added to the message. To override this add "-DNoTTL" to the
+ // command line.
+ if (timeout > 0 && addTimeToLive) {
+ // NOTE(review): the lookup result is dereferenced without a null check - confirm that
+ // lookupDelegate() cannot return null for the endpoints that reach this point.
+ Delegate delegate = lookupDelegate(anEndpoint.getDelegateKey());
+ long ttl = timeout;
+ // How many CASes are in the list of CASes pending reply for this delegate
+ int currentOutstandingCasListSize = delegate.getCasPendingReplyListSize();
+ if (currentOutstandingCasListSize > 0) {
+ // increase the time-to-live
+ ttl *= currentOutstandingCasListSize;
+ }
+ // NOTE(review): per the JMS API, setJMSExpiration() takes an absolute expiration time and
+ // is normally overwritten by the provider on send; passing a relative TTL here may not
+ // have the intended effect - confirm against the producer's time-to-live handling.
+ aMessage.setJMSExpiration(ttl);
+ }
+ if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
+ // Default sender is the aggregate's own input queue; may be overridden below.
+ aMessage.setStringProperty(AsynchAEMessage.MessageFrom, controllerInputEndpoint);
+ if (anEndpoint.isRemote()) {
+ String protocol = serviceProtocolList;
+ // HTTP-based endpoint, or an explicit (non-blank) replyTo endpoint name was configured
+ if (anEndpoint.getServerURI().trim().toLowerCase().startsWith("http")
+ || (anEndpoint.getReplyToEndpoint() != null && anEndpoint.getReplyToEndpoint()
+ .trim().length() > 0)) {
+ protocol = anEndpoint.getServerURI().trim();
+ // protocol = extractURLWithProtocol(serviceProtocolList, "http");
+
+ // get the replyto endpoint name
+ String replyTo = anEndpoint.getReplyToEndpoint();
+ // Without a replyTo name and without a destination there is nowhere to reply to
+ if (replyTo == null && anEndpoint.getDestination() == null) {
+ throw new AsynchAEException(
+ "replyTo endpoint name not specified for HTTP-based endpoint:"
+ + anEndpoint.getEndpoint());
+ }
+ if (replyTo == null) {
+ replyTo = "";
+ }
+ // Replaces the MessageFrom value set above with the replyTo endpoint name
+ aMessage.setStringProperty(AsynchAEMessage.MessageFrom, replyTo);
+
+ }
+
+ Object destination;
+ // When the endpoint carries a destination object, use it as the JMS reply-to and
+ // advertise the endpoint's own server URI; otherwise advertise the protocol chosen above.
+ if ((destination = anEndpoint.getDestination()) != null) {
+ aMessage.setJMSReplyTo((Destination) destination);
+ aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
+ } else {
+ aMessage.setStringProperty(UIMAMessage.ServerURI, protocol);
+ }
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "populateHeaderWithRequestContext",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_sending_new_msg_to_remote_FINE",
+ new Object[] { getAnalysisEngineController().getComponentName(),
+ anEndpoint.getServerURI(), anEndpoint.getEndpoint() });
+ }
+ } else // collocated
+ {
+ aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
+ }
+ }
+ }
+
+ /**
+  * Adds Response specific properties to the JMS Header.
+  * <p>
+  * The message type (Response), the command and the sender (this service's input endpoint)
+  * are always set. For a collocated endpoint only the endpoint's own server URI is added;
+  * for a remote endpoint this service's server URI is added, plus the host IP and the
+  * endpoint server when those values are available.
+  *
+  * @param aMessage message whose header/properties are populated
+  * @param anEndpoint endpoint the response is sent to
+  * @param aCommand command code placed in the Command property
+  * @throws Exception if setting a message property fails
+  */
+ private void populateHeaderWithResponseContext(Message aMessage, Endpoint anEndpoint, int aCommand)
+     throws Exception {
+   aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Response);
+   aMessage.setIntProperty(AsynchAEMessage.Command, aCommand);
+   aMessage.setStringProperty(AsynchAEMessage.MessageFrom, serviceInputEndpoint);
+
+   // Collocated endpoint: nothing but the endpoint's server URI is needed.
+   if (!anEndpoint.isRemote()) {
+     aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
+     return;
+   }
+
+   // Remote endpoint: identify this service to the receiver.
+   aMessage.setStringProperty(UIMAMessage.ServerURI, getServerURI());
+   if (hostIP != null) {
+     aMessage.setStringProperty(AsynchAEMessage.ServerIP, hostIP);
+   }
+   String endpointServer = anEndpoint.getEndpointServer();
+   if (endpointServer != null) {
+     aMessage.setStringProperty(AsynchAEMessage.EndpointServer, endpointServer);
+   }
+ }
+
+ /**
+  * Returns the controller this output channel is attached to.
+  *
+  * @return the current value of the {@code analysisEngineController} field
+  */
+ public AnalysisEngineController getAnalysisEngineController() {
+   return this.analysisEngineController;
+ }
+
+ /**
+  * Attaches a controller to this output channel and counts down {@code controllerLatch}.
+  *
+  * @param analysisEngineController controller to associate with this channel
+  */
+ public void setController(AnalysisEngineController analysisEngineController) {
+   this.analysisEngineController = analysisEngineController;
+   // Signal the latch now that the controller reference is in place.
+   this.controllerLatch.countDown();
+ }
+
+ /**
+  * Returns the name of the controller input endpoint configured on this channel.
+  *
+  * @return the current value of the {@code controllerInputEndpoint} field
+  */
+ public String getControllerInputEndpoint() {
+   return this.controllerInputEndpoint;
+ }
+
+ /**
+ * Stores the name of the controller input endpoint. The value is used as the MessageFrom
+ * property of requests sent by an aggregate controller.
+ *
+ * @param controllerInputEndpoint
+ *          endpoint name to store
+ */
+ public void setControllerInputEndpoint(String controllerInputEndpoint) {
+ this.controllerInputEndpoint = controllerInputEndpoint;
+ }
+
+ /**
+ * Sends a prepared Process message over the given endpoint connection.
+ * <p>
+ * Statistics are added to the message first. For a request the endpoint is marked as waiting
+ * for a response and the CAS is added to the delegate's pending-reply list before the send;
+ * for a reply the idle time is added to the message. If the send reports failure, a request
+ * is handed to a newly started RecoveryThread, while a failed reply is only logged at INFO.
+ *
+ * @param aMessage
+ *          message to send
+ * @param anEndpoint
+ *          endpoint the message is addressed to
+ * @param entry
+ *          cache entry of the CAS being sent; supplies the CAS reference id
+ * @param isRequest
+ *          true when sending a request, false when sending a reply
+ * @param endpointConnection
+ *          connection used to perform the send
+ * @param msgSize
+ *          message size passed through to the connection's send method
+ * @throws Exception
+ *           if populating the message or sending raises an exception
+ */
+ private void dispatch(Message aMessage, Endpoint anEndpoint, CacheEntry entry, boolean isRequest,
+ JmsEndpointConnection_impl endpointConnection, long msgSize) throws Exception {
+ // Add stats
+ populateStats(aMessage, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process,
+ isRequest);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "dispatch",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_sending_new_msg_to_remote_FINE",
+ new Object[] { getAnalysisEngineController().getName(),
+ endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
+ }
+ // By default start a timer associated with a connection to the endpoint. Once a connection is
+ // established with an
+ // endpoint it is cached and reused for subsequent messaging. If the connection is not used
+ // within a given interval
+ // the timer silently expires and closes the connection. This mechanism is similar to what Web
+ // Server does when
+ // managing sessions. In case when we want the remote delegate to respond to a temporary queue,
+ // which is implied
+ // by anEndpoint.getDestination != null, we don't start the timer.
+ // NOTE(review): the flag below is derived from isRequest only; anEndpoint.getDestination()
+ // is not consulted here.
+ boolean startConnectionTimer = isRequest ? false : true; // connection time is for replies
+ // ----------------------------------------------------
+ // Send Request Message to the Endpoint
+ // ----------------------------------------------------
+ // Add the CAS to the delegate's list of CASes pending reply. Do the add before
+ // the send to eliminate a race condition where the reply is received (on different
+ // thread) *before* the CAS is added to the list.
+ if (isRequest) {
+ anEndpoint.setWaitingForResponse(true);
+ // Add CAS to the list of CASes pending reply
+ addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
+ } else {
+ addIdleTime(aMessage);
}
- catch ( Exception e)
- {
- throw new AsynchAEException(e);
+ // If the send fails it returns false.
+ if (endpointConnection.send(aMessage, msgSize, startConnectionTimer) == false) {
+ // Failure on sending a request requires cleanup that includes stopping a listener
+ // on the delegate that we were unable to send a message to. The delegate state is
+ // set to FAILED. If there are retries or more CASes to send to this delegate the
+ // connection will be retried.
+ if (isRequest) {
+ // Spin recovery thread to handle send error. After the recovery thread
+ // is started the current (process) thread goes back to a thread pool in
+ // ThreadPoolExecutor. The recovery thread can then stop the listener and the
+ // ThreadPoolExecutor since all threads are back in the pool. Any retries will
+ // be done in the recovery thread.
+ RecoveryThread recoveryThread = new RecoveryThread(this, anEndpoint, entry, isRequest,
+ getAnalysisEngineController());
+ // The recovery thread is created in the parent of the current thread's group.
+ Thread t = new Thread(Thread.currentThread().getThreadGroup().getParent(), recoveryThread);
+ t.start();
+ } else {
+ // A failed reply is not retried here; it is only reported.
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "dispatch",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_send_reply_failed__INFO",
+ new Object[] { getAnalysisEngineController().getComponentName(),
+ endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
+ }
+ }
}
-
}
- public void sendReply( int aCommand, Endpoint anEndpoint ) throws AsynchAEException
- {
- anEndpoint.setReplyEndpoint(true);
- try
- {
- if ( aborting )
- {
- return;
- }
- JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
-
- TextMessage tm = endpointConnection.produceTextMessage("");
- tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
- populateHeaderWithResponseContext(tm, anEndpoint, aCommand);
-
- endpointConnection.send(tm, 0, false);
- addIdleTime(tm);
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cpc_reply_sent__FINE",
- new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint()});
- }
- }
- catch( JMSException e)
- {
- // Unable to establish connection to the endpoint. Logit and continue
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
- new Object[] { e});
- }
- }
-
- catch( ServiceShutdownException e)
- {
- e.printStackTrace();
- }
-
- catch (AsynchAEException e)
- {
- throw e;
- }
- catch ( Exception e)
- {
- throw new AsynchAEException(e);
- }
- }
-
-
-
- /**
- * Sends JMS Reply Message to a given endpoint. The reply message contains given Throwable (with full stack)
- *
- * @param t - Throwable to include in the reply message
- * @param anEndpoint - an endpoint to receive the reply message
- * @param aCasReferenceId - a unique CAS reference id
- *
- * @throws AsynchAEException
- */
- public void sendReply( String aCasReferenceId, Endpoint anEndpoint ) throws AsynchAEException
- {
- anEndpoint.setReplyEndpoint(true);
- try
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_replyto_endpoint__FINE",
- new Object[] { anEndpoint.getEndpoint(), aCasReferenceId });
- }
- if ( anEndpoint.isRemote() )
- {
- CacheEntry entry = null;
- try {
- entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
-
- } catch ( Exception e ) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cas_not_found__INFO",
- new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(), aCasReferenceId });
- }
- return;
- }
- if ( anEndpoint.getSerializer().equals("xmi")) {
- // Serializes CAS and releases it back to CAS Pool
- String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false);
- sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0);
- } else {
- byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
- if ( binaryCas == null ) {
- return;
- }
- sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
+ private void sendCasToRemoteEndpoint(boolean isRequest, String aSerializedCAS,
+ String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint,
+ boolean startTimer, long sequence) throws AsynchAEException, ServiceShutdownException {
+ long msgSize = 0;
+ try {
+ if (aborting) {
+ return;
+ }
+ CacheEntry entry = this.getCacheEntry(aCasReferenceId);
+ if (entry == null) {
+ throw new AsynchAEException("Controller:"
+ + getAnalysisEngineController().getComponentName()
+ + " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
+ + " CAS:" + aCasReferenceId + " Not In The Cache");
+ }
+
+ // Get the connection object for a given endpoint
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+ if (endpointConnection == null) {
+ throw new AsynchAEException("Controller:"
+ + getAnalysisEngineController().getComponentName()
[... 1867 lines stripped ...]