You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2017/07/13 19:31:22 UTC
svn commit: r1801867 - in /uima/uima-as/trunk:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/
Author: cwiklik
Date: Thu Jul 13 19:31:22 2017
New Revision: 1801867
URL: http://svn.apache.org/viewvc?rev=1801867&view=rev
Log:
UIMA-5477 improved error handling in the UIMA-AS client to recover from lost broker connection
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=1801867&r1=1801866&r2=1801867&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Thu Jul 13 19:31:22 2017
@@ -26,6 +26,7 @@ import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -34,9 +35,18 @@ import org.apache.activemq.ActiveMQConne
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.aae.client.UimaASProcessStatusImpl;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.UimaMessageValidator;
import org.apache.uima.adapter.jms.JmsConstants;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
+import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.SerialFormat;
import org.apache.uima.util.Level;
+import org.apache.uima.util.impl.ProcessTrace_impl;
/**
* Initializes JMS session and creates JMS MessageProducer to be used for sending messages to a
@@ -57,7 +67,8 @@ public class ActiveMQMessageSender exten
private String destinationName = null;
- private ConcurrentHashMap<Destination, MessageProducer> producerMap = new ConcurrentHashMap<Destination, MessageProducer>();
+ private ConcurrentHashMap<Destination, MessageProducer> producerMap =
+ new ConcurrentHashMap<Destination, MessageProducer>();
public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
@@ -135,6 +146,12 @@ public class ActiveMQMessageSender exten
throw e;
}
}
+ /**
+ * Returns the physical name of the ActiveMQ destination that the current producer is bound to
+ */
+ protected String getDestinationEndpoint() throws Exception {
+ return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
+ }
/**
* Creates a jms session object used to instantiate message producer
@@ -144,12 +161,6 @@ public class ActiveMQMessageSender exten
producer = getMessageProducer(session.createQueue(destinationName));
}
- /**
- * Returns the full name of the destination queue
- */
- protected String getDestinationEndpoint() throws Exception {
- return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
- }
/**
* Returns jsm MessageProducer
@@ -236,4 +247,161 @@ public class ActiveMQMessageSender exten
producerMap.clear();
}
}
+ protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine, boolean casProcessRequest ) throws Exception { // sends pm to destinationName over a JMS session created and closed within this call
+ SharedConnection sc =
+ engine.lookupConnection(engine.getBrokerURI());
+ ClientRequest cacheEntry = null;
+ boolean doCallback = false;
+ boolean addTimeToLive = true;
+ Session jmsSession = null;
+
+ // Check the system properties for the NoTTL tag. If it is defined (any value),
+ // no expiration is set on outgoing process requests.
+ if (System.getProperty("NoTTL") != null) {
+ addTimeToLive = false;
+ }
+ try {
+ long t1 = System.currentTimeMillis(); // NOTE(review): t1 is only referenced by the commented-out timing println below
+ jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create the JMS Message on the session opened above
+ Message message = null;
+ // casProcessRequest is supplied by the caller; the local check below is disabled
+// boolean casProcessRequest = isProcessRequest(pm);
+ // Only a Process request can be serialized as binary: any serial format other than XMI uses a BytesMessage
+ if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) {
+ message = jmsSession.createBytesMessage();
+ } else {
+ message = jmsSession.createTextMessage();
+ }
+ // Create a producer on this call's session instead of using the getMessageProducer() lookup
+ // producer = getMessageProducer();
+ Destination d = jmsSession.createQueue(destinationName);
+ MessageProducer mProducer = jmsSession.createProducer(d);
+ mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ //System.out.println(">>>>>>> Time to create and initialize JMS Sesssion:"+(System.currentTimeMillis()-t1));
+ super.initializeMessage(pm, message);
+ String destination = ((ActiveMQDestination) d).getPhysicalName();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "run",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_sending_msg_to_endpoint__FINE",
+ new Object[] {
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
+ .getIntProperty(AsynchAEMessage.Command)),
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
+ .getIntProperty(AsynchAEMessage.MessageType)), destination });
+ }
+ if (casProcessRequest) {
+ cacheEntry = (ClientRequest) engine.getCache().get(
+ pm.get(AsynchAEMessage.CasReference));
+ if (cacheEntry != null) {
+ // CAS cas = cacheEntry.getCAS();
+ // tag the message for CAS tracking when the UimaAsCasTracking system property is defined
+ if (System.getProperty("UimaAsCasTracking") != null) {
+ message.setStringProperty("UimaAsCasTracking", "enable");
+ }
+
+ // Use Process Timeout value for the time-to-live property in the
+ // outgoing JMS message. When this time is exceeded
+ // while the message sits in a queue, the JMS Server will remove it from
+ // the queue. What happens with the expired message depends on the
+ // configuration. Most JMS Providers create a special dead-letter queue
+ // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ
+ // are not auto evicted yet and accumulate taking up memory.
+ long timeoutValue = cacheEntry.getProcessTimeout();
+
+ if (timeoutValue > 0 && addTimeToLive ) {
+ // Set high time to live value (10x the process timeout)
+ message.setJMSExpiration(10 * timeoutValue); // NOTE(review): JMS providers normally overwrite JMSExpiration on send; MessageProducer.setTimeToLive may be what is intended - confirm
+ }
+ if (pm.getMessageType() == AsynchAEMessage.Process) {
+ cacheEntry.setCASDepartureTime(System.nanoTime());
+ }
+ cacheEntry.setCASDepartureTime(System.nanoTime()); // NOTE(review): unconditional call makes the guarded call just above redundant
+
+ doCallback = true;
+
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.WARNING,
+ CLASS_NAME.getName(),
+ "run",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_failed_cache_lookup__WARNING",
+ new Object[] {
+ pm.get(AsynchAEMessage.CasReference),
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
+ .getIntProperty(AsynchAEMessage.Command)),
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
+ .getIntProperty(AsynchAEMessage.MessageType)), destination });
+ }
+ }
+
+ }
+ // start timers
+ if( casProcessRequest ) {
+ CAS cas = cacheEntry.getCAS(); // NOTE(review): cacheEntry is still null here when the cache lookup above failed - this would throw a NullPointerException
+
+
+ // Add the cas to a list of CASes pending reply. Also start the timer if necessary
+ engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(), engine.timerPerCAS); // true=timer per cas
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_cas_added_to_pending_FINE", new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()), engine.serviceDelegate.toString()});
+ }
+
+
+ } else if ( pm.getMessageType() == AsynchAEMessage.GetMeta &&
+ engine.serviceDelegate.getGetMetaTimeout() > 0 ) {
+ // timer for PING has been started in sendCAS()
+ if ( !engine.serviceDelegate.isAwaitingPingReply()) {
+ engine.serviceDelegate.startGetMetaRequestTimer();
+ }
+ } else {
+ doCallback = false; // don't call onBeforeMessageSend() callback on CPC
+ }
+ // Dispatch asynchronous request to Uima AS service
+ mProducer.send(message);
+
+ if ( doCallback ) {
+ UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),cacheEntry.getCAS(),
+ cacheEntry.getCasReferenceId());
+ // Log, then notify the engine; the message has already been sent at this point
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "run",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_calling_onBeforeMessageSend__FINE",
+ new Object[] {
+ pm.get(AsynchAEMessage.CasReference),
+ String.valueOf(cacheEntry.getCAS().hashCode())
+ });
+ }
+ // Note the callback name is a misnomer. The callback is made *after* the send now.
+ // An application receiving this callback can consider the CAS as delivered to a queue
+ engine.onBeforeMessageSend(status);
+
+
+ }
+ } finally {
+ if ( jmsSession != null ) {
+ try {
+ jmsSession.close();
+ } catch( Exception eee) {
+ // failures while closing the session are ignored
+ }
+ }
+ }
+
+
+ }
}
\ No newline at end of file
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=1801867&r1=1801866&r2=1801867&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Thu Jul 13 19:31:22 2017
@@ -86,7 +86,9 @@ public abstract class BaseMessageSender
// Releases resources
protected abstract void cleanup() throws Exception;
-
+ // Dispatches the pending message; casProcessRequest tells whether it is a CAS process request
+ protected abstract void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine, boolean casProcessRequest ) throws Exception;
+
// Returns the name of the destination
protected abstract String getDestinationEndpoint() throws Exception;
@@ -306,7 +308,7 @@ public abstract class BaseMessageSender
}
// get the producer initialized from a valid connection
- producer = getMessageProducer();
+// producer = getMessageProducer();
// Check if the request should be rejected. It would be the case if the connection was invalid and
// subsequently recovered. If it was invalid, we went through error handling and the request is stale.
if ( !rejectRequest && engine.running) {
@@ -318,6 +320,24 @@ public abstract class BaseMessageSender
"UIMAJMS_client_dispatching_getmeta_ping__INFO", new Object[] { });
}
}
+
+ // NEW CODE 07/12/2017: retry the dispatch until it succeeds or the engine stops running
+ while( engine.running ) {
+ try {
+ // blocks until the connection is re-established with a broker
+ engine.recoverSharedConnectionIfClosed();
+ SharedConnection sc =
+ engine.lookupConnection(engine.getBrokerURI());
+ dispatchMessage(pm,engine,isProcessRequest(pm));
+
+ break;
+ } catch( Exception exx) {
+ // NOTE(review): the exception is swallowed without logging and the loop retries right away
+ }
+ }
+
+
+ /*
try {
// Request JMS Message from the concrete implementation
Message message = null;
@@ -329,7 +349,9 @@ public abstract class BaseMessageSender
} else {
message = createTextMessage();
}
-
+ // get the producer initialized from a valid connection
+ producer = getMessageProducer();
+
initializeMessage(pm, message);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
@@ -448,7 +470,7 @@ public abstract class BaseMessageSender
}
reject(pm,e);
}
-
+*/
}
}
try {
@@ -462,7 +484,7 @@ public abstract class BaseMessageSender
}
}
- private void initializeMessage(PendingMessage aPm, Message anOutgoingMessage) throws Exception {
+ protected void initializeMessage(PendingMessage aPm, Message anOutgoingMessage) throws Exception {
// Populate message properties based on outgoing message type
switch (aPm.getMessageType()) {
case AsynchAEMessage.GetMeta:
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1801867&r1=1801866&r2=1801867&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Thu Jul 13 19:31:22 2017
@@ -3264,7 +3264,7 @@ public abstract class BaseUIMAAsynchrono
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_retry__INFO",
new Object[] { brokerURL });
}
- if ( e instanceof JMSException && e.getMessage().endsWith("Connection refused") ) {
+ if ( e instanceof JMSException && e.getMessage().indexOf("Connection refused") > 0) {
log = false;
System.out.println("Uima AS Client:"+e.getMessage()+" Retrying every 5 seconds until successfull");