You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/11/10 22:06:50 UTC
svn commit: r834666 -
/incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
Author: cwiklik
Date: Tue Nov 10 21:06:50 2009
New Revision: 834666
URL: http://svn.apache.org/viewvc?rev=834666&view=rev
Log:
UIMA-1643 Modified to support recovery from a lost connection to a broker. Before sending a request, the code checks the state of the connection and, if the connection has been lost, enters a loop that retries the connection every 5 ms.
Modified:
incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
Modified: incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=834666&r1=834665&r2=834666&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Tue Nov 10 21:06:50 2009
@@ -24,6 +24,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
+import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
@@ -36,13 +37,19 @@
import org.apache.uima.aae.client.UimaASProcessStatus;
import org.apache.uima.aae.client.UimaASProcessStatusImpl;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.UimaMessageValidator;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.cas.CAS;
+import org.apache.uima.jms.error.handler.BrokerConnectionException;
import org.apache.uima.util.Level;
+import org.apache.uima.util.ProcessTrace;
import org.apache.uima.util.impl.ProcessTrace_impl;
/**
@@ -89,7 +96,10 @@
// Returns the name of the destination
protected abstract String getDestinationEndpoint() throws Exception;
+ public abstract void setConnection(Connection connection);
+
private MessageProducer producer = null;
+
public BaseMessageSender(BaseUIMAAsynchronousEngineCommon_impl anEngine) {
messageQueue = anEngine.pendingMessageQueue;
@@ -129,7 +139,77 @@
public boolean failed() {
return workerThreadFailed;
}
-
+ /**
+ * Convenience overload of reject(PendingMessage, Exception). It always builds a new
+ * BrokerConnectionException whose message names the broker returned by
+ * engine.sharedConnection.getBroker(), and passes it along with the request.
+ *
+ * @param pm the pending request being examined
+ * @return the value returned by reject(PendingMessage, Exception): true if the request was rejected
+ */
+ private boolean reject(PendingMessage pm ) {
+ return reject(pm, new BrokerConnectionException("Unable To Deliver Message To Destination. Connection To Broker "+engine.sharedConnection.getBroker()+" Has Been Lost"));
+ }
+
+ /**
+ * Decides whether a request must be rejected, i.e. not sent to the destination, because the
+ * connection to the broker is not valid, and reports the failure to the client when it is.
+ * <p>
+ * When engine.sharedConnection.isConnectionValid() returns true, nothing is done and false is returned.
+ * Otherwise a Process request has its cache entry flagged via setProcessException(), the service
+ * delegate is put into Delegate.TIMEOUT_STATE and the exception is passed to engine.handleException().
+ * Any other request is passed to engine.handleNonProcessException() unless the delegate is awaiting
+ * a ping reply; in that case a GetMeta request is let through (not rejected).
+ * Exceptions raised while doing the above are caught and logged at WARNING level, not propagated.
+ *
+ * @param pm the pending request being examined
+ * @param e the exception handed to the engine's exception handlers
+ * @return true if the request was rejected and must not be sent, false otherwise
+ */
+ private boolean reject( PendingMessage pm, Exception e ) {
+ boolean rejectRequest = false;
+ // If the connection to the broker has been lost, notify the client and reject the
+ // request, unless it is a GetMeta request sent while a ping reply is awaited.
+ if ( !engine.sharedConnection.isConnectionValid() ) {
+ String messageKind = "";
+ if (pm.getMessageType() == AsynchAEMessage.GetMeta ) {
+ messageKind = "GetMeta";
+ } else if (pm.getMessageType() == AsynchAEMessage.Process ) {
+ messageKind = "Process";
+ } else if (pm.getMessageType() == AsynchAEMessage.CollectionProcessComplete ) {
+ messageKind = "CollectionProcessComplete";
+ }
+ rejectRequest = true;
+ try {
+ // Is this a Process request?
+ if (pm.getMessageType() == AsynchAEMessage.Process) {
+ // fetch the cache entry for this CAS. NOTE(review): the lookup may return null; the resulting NPE would be swallowed by the catch below - confirm
+ ClientRequest cacheEntry = (ClientRequest) engine.getCache().get(
+ pm.get(AsynchAEMessage.CasReference));
+ // Process requests are rejected until the connection to the
+ // broker is recovered
+ cacheEntry.setProcessException();
+ // if the request was made via the synchronous API, don't notify listeners;
+ // instead the code will throw an exception to the client
+ boolean notifyListener = (cacheEntry.isSynchronousInvocation() == false);
+ // log the rejected request
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+ "reject", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_rejected_process_request_broker_down__INFO", new Object[] { messageKind });
+ }
+ // Set the state of the remote delegate to TIMEOUT_STATE; presumably this governs how subsequent requests are handled - confirm
+ engine.serviceDelegate.setState(Delegate.TIMEOUT_STATE);
+ // handle the exception but don't rethrow it (presumably what the trailing 'false' selects - confirm)
+ engine.handleException(e, cacheEntry.getCasReferenceId(), null, cacheEntry, notifyListener, false);
+ } else {
+ // Don't handle a GetMeta Ping. Let it flow through; the Ping will be sent once the connection is recovered. NOTE(review): a non-GetMeta request seen while a ping reply is awaited stays rejected but no handler is called
+ if ( !engine.serviceDelegate.isAwaitingPingReply()) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+ "reject", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_rejected_nonprocess_request_broker_down__INFO", new Object[] { messageKind });
+ }
+ engine.handleNonProcessException(e);
+ } else if ( pm.getMessageType() == AsynchAEMessage.GetMeta ){
+ rejectRequest = false; // Don't reject GetMeta Ping
+ }
+ }
+ } catch( Exception ex ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "reject", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", new Object[] { ex });
+ }
+ }
+ }
+ return rejectRequest;
+ }
/**
* Initializes jms message producer and starts the main thread. This thread waits for messages
* enqueued by application threads. The application thread adds a jms message to the
@@ -171,11 +251,11 @@
engine.onProducerInitialized();
- producer = getMessageProducer();
// Wait for messages from application threads. The uima ee client engine
// will call doStop() which sets the global flag 'done' to true.
PendingMessage pm = null;
+ ClientRequest cacheEntry = null;
while (!done) {
// Remove the oldest message from the shared 'queue'
// // Wait for a new message
@@ -186,66 +266,104 @@
if (done) {
break; // done in this loop
}
-
- try {
- // Request JMS Message from the concrete implementation
- Message message = null;
- // Determine if this a CAS Process Request
- boolean casProcessRequest = isProcessRequest(pm);
- // Only Process request can be serialized as binary
- if (casProcessRequest && engine.getSerializationStrategy().equals("binary")) {
- message = createBytesMessage();
- } else {
- message = createTextMessage();
- }
-
- initializeMessage(pm, message);
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINE,
- CLASS_NAME.getName(),
- "run",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_sending_msg_to_endpoint__FINE",
- new Object[] {
- UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
- .getIntProperty(AsynchAEMessage.Command)),
- UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
- .getIntProperty(AsynchAEMessage.MessageType)), destination });
- }
- if (casProcessRequest) {
- ClientRequest cacheEntry = (ClientRequest) engine.getCache().get(
- pm.get(AsynchAEMessage.CasReference));
- if (cacheEntry != null) {
- // Use Process Timeout value for the time-to-live property in the
- // outgoing JMS message. When this time is exceeded
- // while the message sits in a queue, the JMS Server will remove it from
- // the queue. What happens with the expired message depends on the
- // configuration. Most JMS Providers create a special dead-letter queue
- // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ
- // are not auto evicted yet and accumulate taking up memory.
- long timeoutValue = cacheEntry.getProcessTimeout();
-
- if (timeoutValue > 0 && addTimeToLive ) {
- // Set high time to live value
- //producer.setTimeToLive(10 * timeoutValue);
- message.setJMSExpiration(10 * timeoutValue);
- }
- if (pm.getMessageType() == AsynchAEMessage.Process) {
- cacheEntry.setCASDepartureTime(System.nanoTime());
+ // Check if the request should be rejected. If the connection to the broker is invalid and the request
+ // is not a GetMeta Ping, the request is rejected here and discarded after the connection is re-established. The reject() method creates
+ // an exception and notifies the client of the fact that the request could not continue. The ping is a
+ // special case that we don't reject even though the broker connection has been lost. It is allowed to
+ // fall through and will be sent as soon as the connection is recovered.
+ boolean rejectRequest = reject(pm);
+ // blocks until the connection is re-established with a broker
+ engine.recoverSharedConnectionIfClosed();
+ // get the producer initialized from a valid connection
+ producer = getMessageProducer();
+ // Check if the request should be rejected. It would be the case if the connection was invalid and
+ // subsequently recovered. If it was invalid, we went through error handling and the request is stale.
+ if ( !rejectRequest && engine.running) {
+ if ( engine.serviceDelegate.isAwaitingPingReply() &&
+ pm.getMessageType() == AsynchAEMessage.GetMeta ){
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+ "run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_dispatching_getmeta_ping__INFO", new Object[] { });
}
- UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),
- cacheEntry.getCasReferenceId());
- // Notify engine before sending a message
- engine.onBeforeMessageSend(status);
- }
- }
+ }
+ try {
+ // Request JMS Message from the concrete implementation
+ Message message = null;
+ // Determine if this a CAS Process Request
+ boolean casProcessRequest = isProcessRequest(pm);
+ // Only Process request can be serialized as binary
+ if (casProcessRequest && engine.getSerializationStrategy().equals("binary")) {
+ message = createBytesMessage();
+ } else {
+ message = createTextMessage();
+ }
+
+ initializeMessage(pm, message);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "run",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_sending_msg_to_endpoint__FINE",
+ new Object[] {
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
+ .getIntProperty(AsynchAEMessage.Command)),
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
+ .getIntProperty(AsynchAEMessage.MessageType)), destination });
+ }
+ if (casProcessRequest) {
+ cacheEntry = (ClientRequest) engine.getCache().get(
+ pm.get(AsynchAEMessage.CasReference));
+ if (cacheEntry != null) {
+ // Use Process Timeout value for the time-to-live property in the
+ // outgoing JMS message. When this time is exceeded
+ // while the message sits in a queue, the JMS Server will remove it from
+ // the queue. What happens with the expired message depends on the
+ // configuration. Most JMS Providers create a special dead-letter queue
+ // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ
+ // are not auto evicted yet and accumulate taking up memory.
+ long timeoutValue = cacheEntry.getProcessTimeout();
+
+ if (timeoutValue > 0 && addTimeToLive ) {
+ // Set high time to live value
+ message.setJMSExpiration(10 * timeoutValue);
+ }
+ if (pm.getMessageType() == AsynchAEMessage.Process) {
+ cacheEntry.setCASDepartureTime(System.nanoTime());
+ }
+ cacheEntry.setCASDepartureTime(System.nanoTime());
+ UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),
+ cacheEntry.getCasReferenceId());
+ // Notify engine before sending a message
+ engine.onBeforeMessageSend(status);
+ }
+ }
+ // start timers
+ if( casProcessRequest ) {
+ // Add the cas to a list of CASes pending reply. Also start the timer if necessary
+ engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId());
+ } else if ( pm.getMessageType() == AsynchAEMessage.GetMeta &&
+ engine.serviceDelegate.getGetMetaTimeout() > 0 ) {
+ // timer for PING has been started in sendCAS()
+ if ( !engine.serviceDelegate.isAwaitingPingReply()) {
+ engine.serviceDelegate.startGetMetaRequestTimer();
+ }
+ }
+ // Dispatch asynchronous request to Uima AS service
+ producer.send(message);
+
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", new Object[] { e });
+ }
+ reject(pm,e);
+ }
- producer.send(message);
- } catch (Exception e) {
- handleException(e, destination);
}
-
}
try {
cleanup();
@@ -293,6 +411,7 @@
"handleException", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
+ engine.recoverSharedConnectionIfClosed();
// Notify the engine that there was an exception.
engine.onException(e, aDestination);