You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2020/06/10 13:15:10 UTC
[uima-async-scaleout] 26/34: UIMA-5501 refactored to use pluggagble
endpoints
This is an automated email from the ASF dual-hosted git repository.
cwiklik pushed a commit to branch uima-as-3
in repository https://gitbox.apache.org/repos/asf/uima-async-scaleout.git
commit eec56f664844869f6072105109f0a2ee0728e767
Author: cwiklik <cwiklik>
AuthorDate: Thu Nov 29 17:29:40 2018 +0000
UIMA-5501 refactored to use pluggagble endpoints
---
.../adapter/jms/client/ActiveMQMessageSender.java | 673 ++++++++++-----------
.../client/BaseUIMAAsynchronousEngine_impl.java | 35 +-
.../apache/uima/as/dispatcher/LocalDispatcher.java | 81 ++-
.../apache/uima/ee/test/TestUimaASNoErrors.java | 9 +-
4 files changed, 439 insertions(+), 359 deletions(-)
diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
index 0ec5a1f..a95be45 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
@@ -51,142 +51,138 @@ import org.apache.uima.util.Level;
import org.apache.uima.util.impl.ProcessTrace_impl;
/**
- * Initializes JMS session and creates JMS MessageProducer to be used for sending messages to a
- * given destination. It extends BaseMessageSender which starts the worker thread and is tasked with
- * sending messages. The application threads share a common 'queue' with the worker thread. The
- * application threads add messages to the pendingMessageList 'queue' and the worker thread consumes
- * them.
+ * Initializes JMS session and creates JMS MessageProducer to be used for
+ * sending messages to a given destination. It extends BaseMessageSender which
+ * starts the worker thread and is tasked with sending messages. The application
+ * threads share a common 'queue' with the worker thread. The application
+ * threads add messages to the pendingMessageList 'queue' and the worker thread
+ * consumes them.
*
*/
public class ActiveMQMessageSender extends BaseMessageSender {
- private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class;
-
- private volatile Connection connection = null;
-
- private Session session = null;
-
- private MessageProducer producer = null;
-
- private String destinationName = null;
-
- private ConcurrentHashMap<Destination, MessageProducer> producerMap =
- new ConcurrentHashMap<Destination, MessageProducer>();
-
- public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
- BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
- super(engine);
- connection = aConnection;
- destinationName = aDestinationName;
- }
-
- public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception {
- if (producerMap.containsKey(destination)) {
- return (MessageProducer) producerMap.get(destination);
- }
- createSession();
- MessageProducer mProducer = session.createProducer(destination);
- mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producerMap.put(destination, mProducer);
- return mProducer;
- }
- /**
- * This is called when a new Connection is created after broker is restarted
- */
- public void setConnection(Connection aConnection) {
- connection = aConnection;
- cleanup();
- try {
- initializeProducer();
- } catch( Exception e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
- "setConnection", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_exception__WARNING", e);
- }
- }
-
- }
- private String getBrokerURL() {
- try {
- return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL();
- } catch (Exception ex) { /* handle silently. */
- }
- return "";
- }
-
- private void createSession() throws Exception {
- String broker = getBrokerURL();
- try {
- if (session == null || engine.producerInitialized == false) {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- } catch (JMSException e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_client_failed_creating_session_INFO",
- new Object[] { destinationName, broker });
- }
- if (connection == null) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_client_connection_not_ready_INFO", new Object[] { broker });
- }
- } else if (((ActiveMQConnection) connection).isClosed()
- || ((ActiveMQConnection) connection).isClosing()) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME)
- .logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_client_connection_closed_INFO",
- new Object[] { destinationName, broker });
- }
- }
- throw e;
- } catch (Exception e) {
- throw e;
- }
- }
- /**
- * Returns the full name of the destination queue
- */
- protected String getDestinationEndpoint() throws Exception {
- return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
- }
-
- /**
- * Creates a jms session object used to instantiate message producer
- */
- protected void initializeProducer() throws Exception {
- createSession();
- producer = getMessageProducer(session.createQueue(destinationName));
- }
-
-
- /**
- * Returns jsm MessageProducer
- */
- public MessageProducer getMessageProducer() {
- if ( engine.running && engine.producerInitialized == false ) {
- try {
- SharedConnection con = engine.lookupConnection(getBrokerURL());
- if ( con != null ) {
- setConnection(con.getConnection());
- initializeProducer();
- engine.producerInitialized = true;
- }
- } catch( Exception e) {
- e.printStackTrace();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
- "getMessageProducer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_exception__WARNING", e);
- }
- }
- }
- return producer;
- }
+ private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class;
+
+ private volatile Connection connection = null;
+
+ private Session session = null;
+
+ private MessageProducer producer = null;
+
+ private String destinationName = null;
+
+ private ConcurrentHashMap<Destination, MessageProducer> producerMap = new ConcurrentHashMap<Destination, MessageProducer>();
+
+ public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
+ BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
+ super(engine);
+ connection = aConnection;
+ destinationName = aDestinationName;
+ }
+
+ public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception {
+ if (producerMap.containsKey(destination)) {
+ return (MessageProducer) producerMap.get(destination);
+ }
+ createSession();
+ MessageProducer mProducer = session.createProducer(destination);
+ mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ producerMap.put(destination, mProducer);
+ return mProducer;
+ }
+
+ /**
+ * This is called when a new Connection is created after broker is restarted
+ */
+ public void setConnection(Connection aConnection) {
+ connection = aConnection;
+ cleanup();
+ try {
+ initializeProducer();
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "setConnection",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ }
+ }
+
+ }
+
+ private String getBrokerURL() {
+ try {
+ return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL();
+ } catch (Exception ex) { /* handle silently. */
+ }
+ return "";
+ }
+
+ private void createSession() throws Exception {
+ String broker = getBrokerURL();
+ try {
+ if (session == null || engine.producerInitialized == false) {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ } catch (JMSException e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_failed_creating_session_INFO",
+ new Object[] { destinationName, broker });
+ }
+ if (connection == null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_not_ready_INFO",
+ new Object[] { broker });
+ }
+ } else if (((ActiveMQConnection) connection).isClosed() || ((ActiveMQConnection) connection).isClosing()) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_closed_INFO",
+ new Object[] { destinationName, broker });
+ }
+ }
+ throw e;
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ /**
+ * Returns the full name of the destination queue
+ */
+ protected String getDestinationEndpoint() throws Exception {
+ return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
+ }
+
+ /**
+ * Creates a jms session object used to instantiate message producer
+ */
+ protected void initializeProducer() throws Exception {
+ createSession();
+ producer = getMessageProducer(session.createQueue(destinationName));
+ }
+
+ /**
+ * Returns jsm MessageProducer
+ */
+ public MessageProducer getMessageProducer() {
+ if (engine.running && engine.producerInitialized == false) {
+ try {
+ SharedConnection con = engine.lookupConnection(getBrokerURL());
+ if (con != null) {
+ setConnection(con.getConnection());
+ initializeProducer();
+ engine.producerInitialized = true;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "getMessageProducer",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ }
+ }
+ }
+ return producer;
+ }
public TextMessage createTextMessage() throws Exception {
synchronized (ActiveMQMessageSender.class) {
@@ -209,14 +205,14 @@ public class ActiveMQMessageSender extends BaseMessageSender {
}
- public BytesMessage createBytesMessage() throws Exception {
+ public BytesMessage createBytesMessage() throws Exception {
synchronized (ActiveMQMessageSender.class) {
- if (session == null) {
- // Force initialization of Producer
- initializeProducer();
- }
- BytesMessage msg = null;
- try {
+ if (session == null) {
+ // Force initialization of Producer
+ initializeProducer();
+ }
+ BytesMessage msg = null;
+ try {
msg = session.createBytesMessage();
} catch (IllegalStateException e) {
// stale Session
@@ -227,203 +223,202 @@ public class ActiveMQMessageSender extends BaseMessageSender {
return msg;
}
- // return session.createBytesMessage();
- }
-
- /**
- * Cleanup any jms resources used by the worker thread
- */
- protected void cleanup() {
- try {
- if (session != null) {
- session.close();
- session = null;
- }
- if (producer != null) {
- producer.close();
- producer = null;
- }
- } catch (Exception e) {
- // Ignore we are shutting down
- } finally {
- producerMap.clear();
- }
- }
- protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine, boolean casProcessRequest ) throws Exception {
- SharedConnection sc =
- engine.lookupConnection(engine.getBrokerURI());
- ClientRequest cacheEntry = null;
- boolean doCallback = false;
- boolean addTimeToLive = true;
- Session jmsSession = null;
-
- // Check the environment for existence of NoTTL tag. If present,
- // the deployer of the service wants to disable message expiration.
- if (System.getProperty("NoTTL") != null) {
- addTimeToLive = false;
- }
- try {
-// long t1 = System.currentTimeMillis();
- jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Request JMS Message from the concrete implementation
- Message message = null;
- // Determine if this a CAS Process Request
-// boolean casProcessRequest = isProcessRequest(pm);
- // Only Process request can be serialized as binary
- if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) {
- message = jmsSession.createBytesMessage();
- } else {
- message = jmsSession.createTextMessage();
- }
- // get the producer initialized from a valid connection
- // producer = getMessageProducer();
-
- Destination d = null;
- String selector = null;
- // UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted queue
- // instead of a temp queue. Regular queues can be recovered in case of
- // a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +.
- // Code in JmsOutputChannel will add the selector if the service is a CM.
- if (pm.getPropertyAsString(AsynchAEMessage.TargetingSelector) != null) {
- selector = (String)pm.getPropertyAsString(AsynchAEMessage.TargetingSelector);
- }
- if ( selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS || pm.getMessageType() == AsynchAEMessage.Stop) ) {
- d = (Destination)pm.getProperty(AsynchAEMessage.Destination);
-
- } else {
- d = jmsSession.createQueue(destinationName);
- }
- MessageProducer mProducer = jmsSession.createProducer(d);
- mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- //System.out.println(">>>>>>> Time to create and initialize JMS Sesssion:"+(System.currentTimeMillis()-t1));
- super.initializeMessage(pm, message);
- String destination = ((ActiveMQDestination) d).getPhysicalName();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINE,
- CLASS_NAME.getName(),
- "run",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_sending_msg_to_endpoint__FINE",
- new Object[] {
- UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
- .getIntProperty(AsynchAEMessage.Command)),
- UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
- .getIntProperty(AsynchAEMessage.MessageType)), destination });
- }
- if (casProcessRequest) {
- cacheEntry = (ClientRequest) engine.getCache().get(
- pm.getPropertyAsString(AsynchAEMessage.CasReference));
- if (cacheEntry != null) {
- // CAS cas = cacheEntry.getCAS();
- // enable logging
- if (System.getProperty("UimaAsCasTracking") != null) {
- message.setStringProperty("UimaAsCasTracking", "enable");
- }
- // Target specific service instance if targeting for the CAS is provided
- // by the client application
- if ( cacheEntry.getTargetServiceId() != null ) {
- // System.out.println("------------Client Sending CAS to Service Instance With Id:"+cacheEntry.getTargetServiceId());;
- message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty, cacheEntry.getTargetServiceId());
- }
- // Use Process Timeout value for the time-to-live property in the
- // outgoing JMS message. When this time is exceeded
- // while the message sits in a queue, the JMS Server will remove it from
- // the queue. What happens with the expired message depends on the
- // configuration. Most JMS Providers create a special dead-letter queue
- // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ
- // are not auto evicted yet and accumulate taking up memory.
- long timeoutValue = cacheEntry.getProcessTimeout();
-
- if (timeoutValue > 0 && addTimeToLive ) {
- // Set high time to live value
- message.setJMSExpiration(10 * timeoutValue);
- }
- if (pm.getMessageType() == AsynchAEMessage.Process) {
- cacheEntry.setCASDepartureTime(System.nanoTime());
- }
- cacheEntry.setCASDepartureTime(System.nanoTime());
-
- doCallback = true;
-
- } else {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.WARNING,
- CLASS_NAME.getName(),
- "run",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_failed_cache_lookup__WARNING",
- new Object[] {
- pm.getPropertyAsString(AsynchAEMessage.CasReference),
- UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
- .getIntProperty(AsynchAEMessage.Command)),
- UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
- .getIntProperty(AsynchAEMessage.MessageType)), destination });
- }
- }
-
- }
- // start timers
- if( casProcessRequest ) {
- CAS cas = cacheEntry.getCAS();
-
-
- // Add the cas to a list of CASes pending reply. Also start the timer if necessary
- engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(), engine.timerPerCAS); // true=timer per cas
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_cas_added_to_pending_FINE", new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()), engine.serviceDelegate.toString()});
- }
-
-
- } else if ( pm.getMessageType() == AsynchAEMessage.GetMeta &&
- engine.serviceDelegate.getGetMetaTimeout() > 0 ) {
- // timer for PING has been started in sendCAS()
- if ( !engine.serviceDelegate.isAwaitingPingReply()) {
- engine.serviceDelegate.startGetMetaRequestTimer();
- }
- } else {
- doCallback = false; // dont call onBeforeMessageSend() callback on CPC
- }
- // Dispatch asynchronous request to Uima AS service
- mProducer.send(message);
-
- if ( doCallback ) {
- UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),cacheEntry.getCAS(),
- cacheEntry.getCasReferenceId());
- // Notify engine before sending a message
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINE,
- CLASS_NAME.getName(),
- "run",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_calling_onBeforeMessageSend__FINE",
- new Object[] {
- pm.getPropertyAsString(AsynchAEMessage.CasReference),
- String.valueOf(cacheEntry.getCAS().hashCode())
- });
- }
- // Note the callback is a misnomer. The callback is made *after* the send now
- // Application receiving this callback can consider the CAS as delivere to a queue
- engine.onBeforeMessageSend(status);
-
-
- }
- } finally {
- if ( jmsSession != null ) {
- try {
- jmsSession.close();
- } catch( Exception eee) {
-
- }
- }
- }
-
-
- }
+ // return session.createBytesMessage();
+ }
+
+ /**
+ * Cleanup any jms resources used by the worker thread
+ */
+ protected void cleanup() {
+ try {
+ if (session != null) {
+ session.close();
+ session = null;
+ }
+ if (producer != null) {
+ producer.close();
+ producer = null;
+ }
+ } catch (Exception e) {
+ // Ignore we are shutting down
+ } finally {
+ producerMap.clear();
+ }
+ }
+
+ protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine,
+ boolean casProcessRequest) throws Exception {
+ SharedConnection sc = engine.lookupConnection(engine.getBrokerURI());
+ ClientRequest cacheEntry = null;
+ boolean doCallback = false;
+ boolean addTimeToLive = true;
+ Session jmsSession = null;
+
+ // Check the environment for existence of NoTTL tag. If present,
+ // the deployer of the service wants to disable message expiration.
+ if (System.getProperty("NoTTL") != null) {
+ addTimeToLive = false;
+ }
+ try {
+ // long t1 = System.currentTimeMillis();
+ jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Request JMS Message from the concrete implementation
+ Message message = null;
+ // Determine if this a CAS Process Request
+ // boolean casProcessRequest = isProcessRequest(pm);
+ // Only Process request can be serialized as binary
+ if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) {
+ message = jmsSession.createBytesMessage();
+ } else {
+ message = jmsSession.createTextMessage();
+ }
+ // get the producer initialized from a valid connection
+ // producer = getMessageProducer();
+
+ Destination d = null;
+ String selector = null;
+ // UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted queue
+ // instead of a temp queue. Regular queues can be recovered in case of
+ // a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +.
+ // Code in JmsOutputChannel will add the selector if the service is a CM.
+ if (pm.getPropertyAsString(AsynchAEMessage.TargetingSelector) != null) {
+ selector = (String) pm.getPropertyAsString(AsynchAEMessage.TargetingSelector);
+ }
+ if (selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS
+ || pm.getMessageType() == AsynchAEMessage.Stop)) {
+ d = (Destination) pm.getProperty(AsynchAEMessage.Destination);
+
+ } else {
+ d = jmsSession.createQueue(destinationName);
+ }
+ MessageProducer mProducer = jmsSession.createProducer(d);
+ mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ // System.out.println(">>>>>>> Time to create and initialize JMS
+ // Sesssion:"+(System.currentTimeMillis()-t1));
+ super.initializeMessage(pm, message);
+ String destination = ((ActiveMQDestination) d).getPhysicalName();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
+ new Object[] {
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+ message.getIntProperty(AsynchAEMessage.Command)),
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+ message.getIntProperty(AsynchAEMessage.MessageType)),
+ destination });
+ }
+ if (casProcessRequest) {
+ cacheEntry = (ClientRequest) engine.getCache()
+ .get(pm.getPropertyAsString(AsynchAEMessage.CasReference));
+ if (cacheEntry != null) {
+ // CAS cas = cacheEntry.getCAS();
+ // enable logging
+ if (System.getProperty("UimaAsCasTracking") != null) {
+ message.setStringProperty("UimaAsCasTracking", "enable");
+ }
+ // Target specific service instance if targeting for the CAS is provided
+ // by the client application
+ if (cacheEntry.getTargetServiceId() != null) {
+ // System.out.println("------------Client Sending CAS to Service Instance With
+ // Id:"+cacheEntry.getTargetServiceId());;
+ message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,
+ cacheEntry.getTargetServiceId());
+ }
+ // Use Process Timeout value for the time-to-live property in the
+ // outgoing JMS message. When this time is exceeded
+ // while the message sits in a queue, the JMS Server will remove it from
+ // the queue. What happens with the expired message depends on the
+ // configuration. Most JMS Providers create a special dead-letter queue
+ // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the
+ // DLQ
+ // are not auto evicted yet and accumulate taking up memory.
+ long timeoutValue = cacheEntry.getProcessTimeout();
+
+ if (timeoutValue > 0 && addTimeToLive) {
+ // Set high time to live value
+ message.setJMSExpiration(10 * timeoutValue);
+ }
+ if (pm.getMessageType() == AsynchAEMessage.Process) {
+ cacheEntry.setCASDepartureTime(System.nanoTime());
+ }
+ cacheEntry.setCASDepartureTime(System.nanoTime());
+
+ doCallback = true;
+
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "run",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_failed_cache_lookup__WARNING",
+ new Object[] { pm.getPropertyAsString(AsynchAEMessage.CasReference),
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+ message.getIntProperty(AsynchAEMessage.Command)),
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+ message.getIntProperty(AsynchAEMessage.MessageType)),
+ destination });
+ }
+ return; // no cache entry, done here
+ }
+
+ }
+ // start timers
+ if (casProcessRequest) {
+ CAS cas = cacheEntry.getCAS();
+
+ // Add the cas to a list of CASes pending reply. Also start the timer if
+ // necessary
+ engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(),
+ engine.timerPerCAS); // true=timer per cas
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendCAS",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cas_added_to_pending_FINE",
+ new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()),
+ engine.serviceDelegate.toString() });
+ }
+
+ } else if (pm.getMessageType() == AsynchAEMessage.GetMeta
+ && engine.serviceDelegate.getGetMetaTimeout() > 0) {
+ // timer for PING has been started in sendCAS()
+ if (!engine.serviceDelegate.isAwaitingPingReply()) {
+ engine.serviceDelegate.startGetMetaRequestTimer();
+ }
+ } else {
+ doCallback = false; // dont call onBeforeMessageSend() callback on CPC
+ }
+ // Dispatch asynchronous request to Uima AS service
+ mProducer.send(message);
+
+ if (doCallback) {
+ UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(), cacheEntry.getCAS(),
+ cacheEntry.getCasReferenceId());
+ // Notify engine before sending a message
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_calling_onBeforeMessageSend__FINE",
+ new Object[] { pm.getPropertyAsString(AsynchAEMessage.CasReference),
+ String.valueOf(cacheEntry.getCAS().hashCode()) });
+ }
+ // Note the callback is a misnomer. The callback is made *after* the send now
+ // Application receiving this callback can consider the CAS as delivere to a
+ // queue
+ engine.onBeforeMessageSend(status);
+
+ }
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "dispatchMessage",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ }
+ } finally {
+ if (jmsSession != null) {
+ try {
+ jmsSession.close();
+ } catch (Exception eee) {
+
+ }
+ }
+ }
+
+ }
}
\ No newline at end of file
diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
index f0623fb..d5ffa93 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
@@ -70,9 +70,14 @@ import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.ControllerCallbackListener;
import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.definition.connectors.Endpoints;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
import org.apache.uima.aae.error.UimaASMetaRequestTimeout;
import org.apache.uima.aae.jmx.JmxManager;
import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.MessageProcessor;
import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.aae.service.AsynchronousUimaASService;
import org.apache.uima.aae.service.UimaASService;
@@ -495,12 +500,28 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC
if ( dispatchThread == null ) {
// make sure we are in the running state. The local consumer depends on it
running = true;
-
+ UimaAsEndpoint clientEndpoint;
+ try {
+
+ MessageProcessor messageProcessor =
+ new ClientMessageProcessor();
+
+ clientEndpoint =
+ Endpoints.newEndpoint(EndpointType.Direct, EndpointType.Direct.getName()+"Client", messageProcessor);
+ // create client consumers and connect them to the service
+ // producers
+ service.connect(clientEndpoint);
+ clientEndpoint.start();
+ } catch( Exception e) {
+ throw new ResourceInitializationException(e);
+ }
+
// start message consumer to handle replies
- startLocalConsumer(anApplicationContext);
+ // startLocalConsumer(anApplicationContext);
// start dispatcher in its own thread. It will fetch messages from a shared 'pendingMessageQueue'
LocalDispatcher dispatcher =
- new LocalDispatcher(this, service, pendingMessageQueue);
+ new LocalDispatcher(this, service, pendingMessageQueue, clientEndpoint);
+
dispatchThread = new Thread(dispatcher,"LocalDispatcher");
dispatchThread.start();
}
@@ -1690,4 +1711,12 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC
}
}
+ private class ClientMessageProcessor implements MessageProcessor {
+
+ @Override
+ public void process(MessageContext message) throws Exception {
+ onMessage((DirectMessage)message.getRawMessage());
+ }
+
+ }
}
diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java b/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
index f3e03a4..a8fcd6a 100644
--- a/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
+++ b/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
@@ -24,7 +24,12 @@ import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.client.UimaASProcessStatus;
import org.apache.uima.aae.client.UimaASProcessStatusImpl;
+import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UimaAsOrigin;
import org.apache.uima.aae.service.UimaASService;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl;
@@ -39,37 +44,89 @@ public class LocalDispatcher implements Runnable {
private BlockingQueue<PendingMessage> messageQueue = null;
private BaseUIMAAsynchronousEngineCommon_impl client;
private UimaASService service;
-
+ private UimaAsEndpoint clientEndpoint;
+
public LocalDispatcher(BaseUIMAAsynchronousEngineCommon_impl client, UimaASService service,
- BlockingQueue<PendingMessage> pendingMessageQueue) {
+ BlockingQueue<PendingMessage> pendingMessageQueue, UimaAsEndpoint clientEndpoint) {
this.service = service;
this.client = client;
this.messageQueue = pendingMessageQueue;
+ this.clientEndpoint = clientEndpoint;
}
-
+
private boolean reject(PendingMessage pm) {
return false;
}
private void dispatch(PendingMessage pm) throws Exception {
boolean doCallback = false;
-
+ StringBuilder serviceUri = new StringBuilder("direct").append(":").append(service.getName());
+
switch (pm.getMessageType()) {
case AsynchAEMessage.GetMeta:
- service.sendGetMetaRequest();
+ serviceUri.append(":").append(ConsumerType.GetMetaRequest.name()).toString();
+ MessageContext getMetaMessage =
+ clientEndpoint.newMessageBuilder().
+ newGetMetaRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct))
+// newGetMetaRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct))
+ .withPayload(AsynchAEMessage.None)
+ .build();
+ clientEndpoint.dispatch(getMetaMessage, serviceUri.toString());
+
System.out.println("LocalDispatcher.dispatch()-dispatched getMeta Request");
break;
case AsynchAEMessage.Process:
doCallback = true;
- service.process((CAS) pm.getProperty(AsynchAEMessage.CAS), pm.getPropertyAsString(AsynchAEMessage.CasReference));
- System.out.println("LocalDispatcher.dispatch()-dispatched Process Request");
+ serviceUri.append(":").append(ConsumerType.ProcessCASRequest.name()).toString();
+
+ MessageContext processMessage =
+ clientEndpoint.newMessageBuilder().
+ newProcessCASRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct))
+// newProcessCASRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct))
+ .withPayload(AsynchAEMessage.CASRefID)
+ .withCasReferenceId(pm.getPropertyAsString(AsynchAEMessage.CasReference))
+ .build();
+
+ service.add2Cache((CAS) pm.getProperty(AsynchAEMessage.CAS), processMessage, pm.getPropertyAsString(AsynchAEMessage.CasReference));
+
+ clientEndpoint.dispatch(processMessage, serviceUri.toString());
+
+ System.out.println("LocalDispatcher.dispatch()-dispatched process Request");
+
+
break;
case AsynchAEMessage.CollectionProcessComplete:
- service.collectionProcessComplete();
+ serviceUri.append(":").append(ConsumerType.CpcRequest.name()).toString();
+
+ MessageContext cpcMessage =
+ clientEndpoint.newMessageBuilder().
+ newCpCRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct))
+// newCpCRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct))
+ .withPayload(AsynchAEMessage.None)
+ .build();
+ clientEndpoint.dispatch(cpcMessage, serviceUri.toString());
+
System.out.println("LocalDispatcher.dispatch()-dispatched CPC Request");
break;
+
+ case AsynchAEMessage.ReleaseCAS:
+ serviceUri.append(":").append(ConsumerType.FreeCASRequest.name()).toString();
+
+ MessageContext freeCASMessage =
+ clientEndpoint.newMessageBuilder().
+ newCpCRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct))
+// newCpCRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct))
+ .withPayload(AsynchAEMessage.CASRefID)
+ .withCasReferenceId(pm.getPropertyAsString(AsynchAEMessage.CasReference))
+ .build();
+ clientEndpoint.dispatch(freeCASMessage, serviceUri.toString());
+
+ System.out.println("LocalDispatcher.dispatch()-dispatched Free CAS Request");
+ break;
+
+
}
if ( doCallback ) {
UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),(CAS)pm.getProperty(AsynchAEMessage.CAS),
@@ -99,9 +156,9 @@ public class LocalDispatcher implements Runnable {
while (client.isRunning()) {
PendingMessage pm = null;
try {
- System.out.println("LocalDispatcher.run()- waiting for new message ... queue hashcode:"+messageQueue.hashCode());
+ System.out.println("Client LocalDispatcher.run()- waiting for new message ... queue hashcode:"+messageQueue.hashCode());
pm = messageQueue.take();
- System.out.println("LocalDispatcher.run()-got new message to dispatch");
+ System.out.println("Client LocalDispatcher.run()-got new message to dispatch");
} catch (InterruptedException e) {
return;
@@ -122,12 +179,8 @@ public class LocalDispatcher implements Runnable {
}
}
try {
- System.out.println(".................... calling LocalDispatch.beforeDispatch()");
client.beforeDispatch(pm);
-
- System.out.println(".................... calling LocalDispatch.dispatch()");
dispatch(pm);
- System.out.println(".................... LocalDispatch.dispatch() returned");
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "run",
diff --git a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
index 1976e2f..2e966a5 100644
--- a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
+++ b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
@@ -26,6 +26,7 @@ import java.io.FileWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
+import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -1229,7 +1230,7 @@ public class TestUimaASNoErrors extends BaseTestSupport {
initialize(uimaAsClient, appCtx);
waitUntilInitialized();
- for (int i = 0; i < 500; i++) {
+ for (int i = 0; i < 2; i++) {
CAS cas = uimaAsClient.getCAS();
cas.setDocumentText("Some Text");
System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
@@ -1441,6 +1442,8 @@ public class TestUimaASNoErrors extends BaseTestSupport {
@Test
public void testDeployAsyncAggregateServiceOverJava() throws Exception {
+// URL url = TestUimaASNoErrors.class.getResource("/Deploy_AsyncAggregate.xml");
+
testDeployAsyncAggregateService(Transport.Java);
}
@@ -1457,7 +1460,7 @@ public class TestUimaASNoErrors extends BaseTestSupport {
addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class);
- runTest(appCtx, uimaAsClient, "tcp://localhost:61616", "TopLevelTaeQueue", 200, PROCESS_LATCH);
+ runTest(appCtx, uimaAsClient, "tcp://localhost:61616", "TopLevelTaeQueue", 2, PROCESS_LATCH);
}
@Test
@@ -1476,7 +1479,7 @@ public class TestUimaASNoErrors extends BaseTestSupport {
System.setProperty("NoOpBroker", "tcp::/localhost:61616");
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
-// deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
+ // deployService(uimaAsClient, relativePath + "/Deploy_NoOpAnnotator.xml");
deployJmsService(uimaAsClient, relativePath + "/Deploy_NoOpAnnotatorUsingPlaceholder.xml");
Map<String, Object> appCtx = defaultContext("TopLevelTaeQueue");