You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:27:05 UTC
svn commit: r810567 [1/3] - in
/incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms:
./ client/ message/ service/
Author: cwiklik
Date: Wed Sep 2 15:27:04 2009
New Revision: 810567
URL: http://svn.apache.org/viewvc?rev=810567&view=rev
Log:
UIMA-1541 Reformatted to conform to UIMA formatting guidelines. No other changes included.
Modified:
incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/JmsConstants.java
incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java
incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/PendingMessage.java
incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/service/Dd2spring.java
Modified: incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/JmsConstants.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/JmsConstants.java?rev=810567&r1=810566&r2=810567&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/JmsConstants.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/JmsConstants.java Wed Sep 2 15:27:04 2009
@@ -19,15 +19,13 @@
package org.apache.uima.adapter.jms;
-public class JmsConstants
-{
- public static final String JMS_LOG_RESOURCE_BUNDLE = "jms_adapter_messages";
- public static final String SessionTimeoutOverride = "SessionTimeoutOverride";
+public class JmsConstants {
+ public static final String JMS_LOG_RESOURCE_BUNDLE = "jms_adapter_messages";
- public static String threadName()
- {
- return Thread.currentThread().getName();
- }
+ public static final String SessionTimeoutOverride = "SessionTimeoutOverride";
+ public static String threadName() {
+ return Thread.currentThread().getName();
+ }
}
Modified: incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=810567&r1=810566&r2=810567&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Wed Sep 2 15:27:04 2009
@@ -45,248 +45,247 @@
import org.apache.uima.util.impl.ProcessTrace_impl;
/**
- * Creates a worker thread for sending messages. This is an abstract
- * implementation that provides a thread with run logic. The concrete
- * implementation of the Worker Thread must extend this class. The application
- * threads share a special in-memory queue with this worker thread. The
- * application threads add jms messages to the pendingMessageList queue and the
- * worker thread consumes them. The worker thread terminates when the uima ee
- * client calls doStop() method.
+ * Creates a worker thread for sending messages. This is an abstract implementation that provides a
+ * thread with run logic. The concrete implementation of the Worker Thread must extend this class.
+ * The application threads share a special in-memory queue with this worker thread. The application
+ * threads add jms messages to the pendingMessageList queue and the worker thread consumes them. The
+ * worker thread terminates when the uima ee client calls the doStop() method.
*/
-public abstract class BaseMessageSender implements Runnable,
- MessageSender {
- private static final Class CLASS_NAME = BaseMessageSender.class;
-
- // A reference to a shared queue where application threads enqueue messages
- // to be sent
- protected BlockingQueue<PendingMessage> messageQueue =
- new LinkedBlockingQueue<PendingMessage>();
- // Global flag controlling lifecycle of this thread. It will be set to true
- // when the
- // uima ee engine calls doStop()
- protected volatile boolean done;
- // A reference to the uima ee client engine
- protected BaseUIMAAsynchronousEngineCommon_impl engine;
- // Global flag to indicate failure of the worker thread
- protected volatile boolean workerThreadFailed;
- // If the worker thread fails, store the reason for the failure
- protected Exception exception;
-
- // These are required methods to be implemented by a concrete implementation
-
- // Returns instance of JMS MessageProducer
- public abstract MessageProducer getMessageProducer();
-
- // Provides implementation-specific implementation logic. It is expected
- // that this
- // message creates an instance of JMS MessageProducer
- protected abstract void initializeProducer() throws Exception;
-
- // Releases resources
- protected abstract void cleanup() throws Exception;
-
- // Returns the name of the destination
- protected abstract String getDestinationEndpoint() throws Exception;
-
- private MessageProducer producer = null;
-
-
- public BaseMessageSender( BaseUIMAAsynchronousEngineCommon_impl anEngine) {
- messageQueue = anEngine.pendingMessageQueue;
- engine = anEngine;
- try {
- // Acquire a shared lock. Release it in the run() method once we initialize
- // the producer.
- engine.producerSemaphore.acquire();
- } catch( InterruptedException e) {}
- }
+public abstract class BaseMessageSender implements Runnable, MessageSender {
+ private static final Class CLASS_NAME = BaseMessageSender.class;
+
+ // A reference to a shared queue where application threads enqueue messages
+ // to be sent
+ protected BlockingQueue<PendingMessage> messageQueue = new LinkedBlockingQueue<PendingMessage>();
+
+ // Global flag controlling lifecycle of this thread. It will be set to true
+ // when the
+ // uima ee engine calls doStop()
+ protected volatile boolean done;
+
+ // A reference to the uima ee client engine
+ protected BaseUIMAAsynchronousEngineCommon_impl engine;
+
+ // Global flag to indicate failure of the worker thread
+ protected volatile boolean workerThreadFailed;
+
+ // If the worker thread fails, store the reason for the failure
+ protected Exception exception;
+
+ // These are required methods to be implemented by a concrete implementation
+
+ // Returns instance of JMS MessageProducer
+ public abstract MessageProducer getMessageProducer();
+
+ // Provides implementation-specific implementation logic. It is expected
+ // that this
+ // method creates an instance of JMS MessageProducer
+ protected abstract void initializeProducer() throws Exception;
+
+ // Releases resources
+ protected abstract void cleanup() throws Exception;
+
+ // Returns the name of the destination
+ protected abstract String getDestinationEndpoint() throws Exception;
+
+ private MessageProducer producer = null;
+
+ public BaseMessageSender(BaseUIMAAsynchronousEngineCommon_impl anEngine) {
+ messageQueue = anEngine.pendingMessageQueue;
+ engine = anEngine;
+ try {
+ // Acquire a shared lock. Release it in the run() method once we initialize
+ // the producer.
+ engine.producerSemaphore.acquire();
+ } catch (InterruptedException e) {
+ }
+ }
/**
* Stops the worker thread
*/
public void doStop() {
done = true;
- // Create an empty message to deliver to the queue that is blocking
- PendingMessage emptyMessage = new PendingMessage(0);
+ // Create an empty message to deliver to the queue that is blocking
+ PendingMessage emptyMessage = new PendingMessage(0);
messageQueue.add(emptyMessage);
}
- /**
- * Return the Exception that caused the failure in this worker thread
- *
- * @return - Exception
- */
- public Exception getReasonForFailure() {
- return exception;
- }
-
- /**
- * The uima ee client should call this method to check if there was a
- * failure. The method returns true if there was a failure or false
- * otherwise. If true, the uima ee client can call getReasonForFailure() to
- * get the reason for failure
- */
- public boolean failed() {
- return workerThreadFailed;
- }
-
- /**
- * Initializes jms message producer and starts the main thread. This thread
- * waits for messages enqueued by application threads. The application
- * thread adds a jms message to the pendingMessageList 'queue' and signals
- * this worker that there is a new message. The worker thread removes the
- * message and sends it out to a given destination. All messages should be
- * fully initialized The worker thread does check the message nor adds
- * anything new to the message. It just sends it out.
- */
- public void run() {
- String destination = null;
-
- // Create and initialize the producer.
- try {
- initializeProducer();
+
+ /**
+ * Return the Exception that caused the failure in this worker thread
+ *
+ * @return - Exception
+ */
+ public Exception getReasonForFailure() {
+ return exception;
+ }
+
+ /**
+ * The uima ee client should call this method to check if there was a failure. The method returns
+ * true if there was a failure or false otherwise. If true, the uima ee client can call
+ * getReasonForFailure() to get the reason for failure
+ */
+ public boolean failed() {
+ return workerThreadFailed;
+ }
+
+ /**
+ * Initializes jms message producer and starts the main thread. This thread waits for messages
+ * enqueued by application threads. The application thread adds a jms message to the
+ * pendingMessageList 'queue' and signals this worker that there is a new message. The worker
+ * thread removes the message and sends it out to a given destination. All messages should be
+ * fully initialized. The worker thread neither checks the message nor adds anything new to the
+ * message. It just sends it out.
+ */
+ public void run() {
+ String destination = null;
+
+ // Create and initialize the producer.
+ try {
+ initializeProducer();
destination = getDestinationEndpoint();
if (destination == null) {
- throw new InvalidDestinationException(
- "Unable to determine the destination");
+ throw new InvalidDestinationException("Unable to determine the destination");
}
- } catch (Exception e) {
- workerThreadFailed = true;
- exception = e;
- e.printStackTrace();
- return;
+ } catch (Exception e) {
+ workerThreadFailed = true;
+ exception = e;
+ e.printStackTrace();
+ return;
- } finally {
+ } finally {
engine.producerSemaphore.release();
- }
+ }
- engine.onProducerInitialized();
+ engine.onProducerInitialized();
- producer = getMessageProducer();
-
- // Wait for messages from application threads. The uima ee client engine
- // will call doStop() which sets the global flag 'done' to true.
- PendingMessage pm = null;
- while (!done) {
+ producer = getMessageProducer();
+
+ // Wait for messages from application threads. The uima ee client engine
+ // will call doStop() which sets the global flag 'done' to true.
+ PendingMessage pm = null;
+ while (!done) {
// Remove the oldest message from the shared 'queue'
-// // Wait for a new message
+ // // Wait for a new message
try {
pm = messageQueue.take();
- } catch ( InterruptedException e) {
+ } catch (InterruptedException e) {
}
if (done) {
break; // done in this loop
}
- try {
- // Request JMS Message from the concrete implementation
- Message message = null;
- // Determine if this a CAS Process Request
- boolean casProcessRequest = isProcessRequest(pm);
- // Only Process request can be serialized as binary
- if ( casProcessRequest && engine.getSerializationStrategy().equals("binary") ) {
+ try {
+ // Request JMS Message from the concrete implementation
+ Message message = null;
+ // Determine if this is a CAS Process Request
+ boolean casProcessRequest = isProcessRequest(pm);
+ // Only Process request can be serialized as binary
+ if (casProcessRequest && engine.getSerializationStrategy().equals("binary")) {
message = createBytesMessage();
- } else {
+ } else {
message = createTextMessage();
- }
-
- initializeMessage( pm, message );
+ }
+
+ initializeMessage(pm, message);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE,
- CLASS_NAME.getName(), "run",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_sending_msg_to_endpoint__FINE",
- new Object[] { UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message.getIntProperty(AsynchAEMessage.Command)), UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,message.getIntProperty(AsynchAEMessage.MessageType)), destination });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "run",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_sending_msg_to_endpoint__FINE",
+ new Object[] {
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
+ .getIntProperty(AsynchAEMessage.Command)),
+ UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
+ .getIntProperty(AsynchAEMessage.MessageType)), destination });
}
- if ( casProcessRequest )
- {
- ClientRequest cacheEntry = (ClientRequest)
- engine.getCache().get(pm.get(AsynchAEMessage.CasReference));
- if ( cacheEntry != null )
- {
- // Use Process Timeout value for the time-to-live property in the
- // outgoing JMS message. When this time is exceeded
- // while the message sits in a queue, the JMS Server will remove it from
- // the queue. What happens with the expired message depends on the
- // configuration. Most JMS Providers create a special dead-letter queue
- // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ
- // are not auto evicted yet and accumulate taking up memory.
- long timeoutValue = cacheEntry.getProcessTimeout();
-
- if ( timeoutValue > 0 )
- {
- // Set high time to live value
- producer.setTimeToLive(10*timeoutValue);
- }
- if ( pm.getMessageType() == AsynchAEMessage.Process )
- {
- cacheEntry.setCASDepartureTime(System.nanoTime());
- }
- UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(), cacheEntry.getCasReferenceId());
- // Notify engine before sending a message
- engine.onBeforeMessageSend(status);
- }
- }
-
- producer.send(message);
- } catch (Exception e) {
- handleException(e, destination);
- }
-
- }
- try {
- cleanup();
- } catch (Exception e) {
- handleException(e, destination);
- }
- }
- private void initializeMessage( PendingMessage aPm, Message anOutgoingMessage)
- throws Exception
- {
- // Populate message properties based on outgoing message type
- switch( aPm.getMessageType())
- {
- case AsynchAEMessage.GetMeta:
- engine.setMetaRequestMessage(anOutgoingMessage);
- break;
-
- case AsynchAEMessage.Process:
- String casReferenceId =
- (String)aPm.get(AsynchAEMessage.CasReference);
- if ( engine.getSerializationStrategy().equals("xmi")) {
- String serializedCAS =
- (String) aPm.get(AsynchAEMessage.CAS);
- engine.setCASMessage(casReferenceId, serializedCAS, anOutgoingMessage);
- } else {
- byte[] serializedCAS =
- (byte[]) aPm.get(AsynchAEMessage.CAS);
- engine.setCASMessage(casReferenceId, serializedCAS, anOutgoingMessage);
-
- }
- // Message Expiration for Process is added in the main run() loop
- // right before the message is dispatched to the AS Service
- break;
-
- case AsynchAEMessage.CollectionProcessComplete:
- engine.setCPCMessage(anOutgoingMessage);
- break;
- }
- }
- private boolean isProcessRequest( PendingMessage pm ) {
+ if (casProcessRequest) {
+ ClientRequest cacheEntry = (ClientRequest) engine.getCache().get(
+ pm.get(AsynchAEMessage.CasReference));
+ if (cacheEntry != null) {
+ // Use Process Timeout value for the time-to-live property in the
+ // outgoing JMS message. When this time is exceeded
+ // while the message sits in a queue, the JMS Server will remove it from
+ // the queue. What happens with the expired message depends on the
+ // configuration. Most JMS Providers create a special dead-letter queue
+ // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ
+ // are not auto evicted yet and accumulate taking up memory.
+ long timeoutValue = cacheEntry.getProcessTimeout();
+
+ if (timeoutValue > 0) {
+ // Set high time to live value
+ producer.setTimeToLive(10 * timeoutValue);
+ }
+ if (pm.getMessageType() == AsynchAEMessage.Process) {
+ cacheEntry.setCASDepartureTime(System.nanoTime());
+ }
+ UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),
+ cacheEntry.getCasReferenceId());
+ // Notify engine before sending a message
+ engine.onBeforeMessageSend(status);
+ }
+ }
+
+ producer.send(message);
+ } catch (Exception e) {
+ handleException(e, destination);
+ }
+
+ }
+ try {
+ cleanup();
+ } catch (Exception e) {
+ handleException(e, destination);
+ }
+ }
+
+ private void initializeMessage(PendingMessage aPm, Message anOutgoingMessage) throws Exception {
+ // Populate message properties based on outgoing message type
+ switch (aPm.getMessageType()) {
+ case AsynchAEMessage.GetMeta:
+ engine.setMetaRequestMessage(anOutgoingMessage);
+ break;
+
+ case AsynchAEMessage.Process:
+ String casReferenceId = (String) aPm.get(AsynchAEMessage.CasReference);
+ if (engine.getSerializationStrategy().equals("xmi")) {
+ String serializedCAS = (String) aPm.get(AsynchAEMessage.CAS);
+ engine.setCASMessage(casReferenceId, serializedCAS, anOutgoingMessage);
+ } else {
+ byte[] serializedCAS = (byte[]) aPm.get(AsynchAEMessage.CAS);
+ engine.setCASMessage(casReferenceId, serializedCAS, anOutgoingMessage);
+
+ }
+ // Message Expiration for Process is added in the main run() loop
+ // right before the message is dispatched to the AS Service
+ break;
+
+ case AsynchAEMessage.CollectionProcessComplete:
+ engine.setCPCMessage(anOutgoingMessage);
+ break;
+ }
+ }
+
+ private boolean isProcessRequest(PendingMessage pm) {
return pm.getMessageType() == AsynchAEMessage.Process;
- }
- private void handleException(Exception e, String aDestination) {
- workerThreadFailed = true;
- exception = e;
- e.printStackTrace();
- // Notify the engine that there was an exception.
- engine.onException(e, aDestination);
-
- }
- /**
- * @override
- */
- public MessageProducer getMessageProducer(Destination destination) throws Exception {
- return null;
- }
+ }
+
+ private void handleException(Exception e, String aDestination) {
+ workerThreadFailed = true;
+ exception = e;
+ e.printStackTrace();
+ // Notify the engine that there was an exception.
+ engine.onException(e, aDestination);
+
+ }
+
+ /**
+ * @override
+ */
+ public MessageProducer getMessageProducer(Destination destination) throws Exception {
+ return null;
+ }
}