You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:16:55 UTC
svn commit: r810548 - in
/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client:
ActiveMQMessageSender.java BaseUIMAAsynchronousEngine_impl.java
InvalidContainerException.java
Author: cwiklik
Date: Wed Sep 2 15:16:54 2009
New Revision: 810548
URL: http://svn.apache.org/viewvc?rev=810548&view=rev
Log:
UIMA-1541 Reformatted to conform to UIMA formatting guidelines. No other changes included.
Modified:
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/InvalidContainerException.java
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=810548&r1=810547&r2=810548&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Wed Sep 2 15:16:54 2009
@@ -36,126 +36,141 @@
import org.apache.uima.util.Level;
/**
- * Initializes JMS session and creates JMS MessageProducer to be used for
- * sending messages to a given destination. It extends BaseMessageSender which
- * starts the worker thread and is tasked with sending messages. The application
- * threads share a common 'queue' with the worker thread. The application
- * threads add messages to the pendingMessageList 'queue' and the worker thread
- * consumes them.
+ * Initializes JMS session and creates JMS MessageProducer to be used for sending messages to a
+ * given destination. It extends BaseMessageSender which starts the worker thread and is tasked with
+ * sending messages. The application threads share a common 'queue' with the worker thread. The
+ * application threads add messages to the pendingMessageList 'queue' and the worker thread consumes
+ * them.
*
*/
public class ActiveMQMessageSender extends BaseMessageSender {
private static final Class CLASS_NAME = ActiveMQMessageSender.class;
- private Connection connection = null;
- private Session session = null;
- private MessageProducer producer = null;
- private String destinationName = null;
- private ConcurrentHashMap<Destination, MessageProducer> producerMap =
- new ConcurrentHashMap<Destination,MessageProducer>();
-
- public ActiveMQMessageSender(Connection aConnection,
- String aDestinationName,
- BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
- super(engine);
- connection = aConnection;
- destinationName = aDestinationName;
- }
- public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception {
- if ( producerMap.containsKey(destination))
- {
- return (MessageProducer) producerMap.get(destination);
- }
- createSession();
- MessageProducer mProducer = session.createProducer(destination);
- mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producerMap.put(destination, mProducer);
- return mProducer;
- }
- private String getBrokerURL() {
+ private Connection connection = null;
+
+ private Session session = null;
+
+ private MessageProducer producer = null;
+
+ private String destinationName = null;
+
+ private ConcurrentHashMap<Destination, MessageProducer> producerMap = new ConcurrentHashMap<Destination, MessageProducer>();
+
+ public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
+ BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
+ super(engine);
+ connection = aConnection;
+ destinationName = aDestinationName;
+ }
+
+ public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception {
+ if (producerMap.containsKey(destination)) {
+ return (MessageProducer) producerMap.get(destination);
+ }
+ createSession();
+ MessageProducer mProducer = session.createProducer(destination);
+ mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ producerMap.put(destination, mProducer);
+ return mProducer;
+ }
+
+ private String getBrokerURL() {
try {
- return ((ActiveMQConnection)connection).getBrokerInfo().getBrokerURL();
- } catch( Exception ex) { /* handle silently. */}
+ return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL();
+ } catch (Exception ex) { /* handle silently. */
+ }
return "";
- }
- private void createSession() throws Exception {
- String broker = getBrokerURL();
- try {
- if ( session == null ) {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- } catch( JMSException e) {
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_failed_creating_session_INFO", new Object[] {destinationName, broker});
- }
- if ( connection == null ) {
- System.out.println("UIMA AS Client Shared Connection Is Not Initialized");
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_not_ready_INFO", new Object[] {broker});
- }
- } else if ( ((ActiveMQConnection)connection).isClosed() || ((ActiveMQConnection)connection).isClosing()) {
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_closed_INFO", new Object[] {destinationName, broker});
- }
- }
- throw e;
- } catch( Exception e) {
- throw e;
- }
- }
- /**
- * Creates a jms session object used to instantiate message producer
- */
- protected void initializeProducer() throws Exception {
- createSession();
- producer = getMessageProducer(session.createQueue(destinationName));
- }
-
- /**
- * Returns the full name of the destination queue
- */
- protected String getDestinationEndpoint() throws Exception {
- return ((ActiveMQDestination) producer.getDestination())
- .getPhysicalName();
- }
-
- /**
- * Returns jsm MessageProducer
- */
- public MessageProducer getMessageProducer() {
- return producer;
- }
- public TextMessage createTextMessage() throws Exception
- {
- if ( session == null )
- {
- throw new JMSException("Unable To Create JMS TextMessage. Reason: JMS Session Not Initialized");
- }
- return session.createTextMessage();
- }
- public BytesMessage createBytesMessage() throws Exception
- {
- if ( session == null )
- {
- throw new JMSException("Unable To Create JMS BytesMessage. Reason: JMS Session Not Initialized");
+ }
+
+ private void createSession() throws Exception {
+ String broker = getBrokerURL();
+ try {
+ if (session == null) {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
+ } catch (JMSException e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_failed_creating_session_INFO",
+ new Object[] { destinationName, broker });
+ }
+ if (connection == null) {
+ System.out.println("UIMA AS Client Shared Connection Is Not Initialized");
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_connection_not_ready_INFO", new Object[] { broker });
+ }
+ } else if (((ActiveMQConnection) connection).isClosed()
+ || ((ActiveMQConnection) connection).isClosing()) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME)
+ .logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_connection_closed_INFO",
+ new Object[] { destinationName, broker });
+ }
+ }
+ throw e;
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ /**
+ * Creates a jms session object used to instantiate message producer
+ */
+ protected void initializeProducer() throws Exception {
+ createSession();
+ producer = getMessageProducer(session.createQueue(destinationName));
+ }
+
+ /**
+ * Returns the full name of the destination queue
+ */
+ protected String getDestinationEndpoint() throws Exception {
+ return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
+ }
+
+ /**
+ * Returns jsm MessageProducer
+ */
+ public MessageProducer getMessageProducer() {
+ return producer;
+ }
+
+ public TextMessage createTextMessage() throws Exception {
+ if (session == null) {
+ throw new JMSException(
+ "Unable To Create JMS TextMessage. Reason: JMS Session Not Initialized");
+ }
+ return session.createTextMessage();
+ }
+
+ public BytesMessage createBytesMessage() throws Exception {
+ if (session == null) {
+ throw new JMSException(
+ "Unable To Create JMS BytesMessage. Reason: JMS Session Not Initialized");
+ }
return session.createBytesMessage();
}
- /**
- * Cleanup any jms resources used by the worker thread
- */
- protected void cleanup() throws Exception {
- try {
- if (session != null ) {
- session.close();
- }
- if (producer != null) {
- producer.close();
- }
- producerMap.clear();
- } catch( Exception e) {
- System.out.println("JMS Exception While Closing Session - Ignoring");
- // Ignore we are shutting down
- }
- }
+
+ /**
+ * Cleanup any jms resources used by the worker thread
+ */
+ protected void cleanup() throws Exception {
+ try {
+ if (session != null) {
+ session.close();
+ }
+ if (producer != null) {
+ producer.close();
+ }
+ producerMap.clear();
+ } catch (Exception e) {
+ System.out.println("JMS Exception While Closing Session - Ignoring");
+ // Ignore we are shutting down
+ }
+ }
}
\ No newline at end of file
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=810548&r1=810547&r2=810548&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Wed Sep 2 15:16:54 2009
@@ -18,6 +18,7 @@
*/
package org.apache.uima.adapter.jms.client;
+
import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
@@ -83,145 +84,144 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
-public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineCommon_impl implements UimaAsynchronousEngine, MessageListener, ControllerCallbackListener
-{
- private static final Class CLASS_NAME = BaseUIMAAsynchronousEngine_impl.class;
- private MessageSender sender = null;
- private MessageProducer producer;
- private String brokerURI = null;
- private Session session = null;
- private Session consumerSession = null;
-
- private volatile boolean serviceInitializationException;
- private volatile boolean serviceInitializationCompleted;
-
- private Semaphore serviceSemaphore = new Semaphore(1);
-
- private Queue consumerDestination = null;
- private Session producerSession = null;
- private JmxManager jmxManager = null;
- private String applicationName = "UimaASClient";
- private static SharedConnection sharedConnection = null;
- private static Semaphore sharedConnectionSemaphore =
- new Semaphore(1);
- private Object stopMux = new Object();
- private static final UimaAsVersion uimaAsVersion =
- new UimaAsVersion();
-
- public BaseUIMAAsynchronousEngine_impl() {
- UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO, "UIMA-AS version " + UIMAFramework.getVersionString());
- }
-
-
- protected TextMessage createTextMessage() throws ResourceInitializationException
- {
- return new ActiveMQTextMessage();
- }
- protected BytesMessage createBytesMessage() throws ResourceInitializationException
- {
- return new ActiveMQBytesMessage();
+public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineCommon_impl
+ implements UimaAsynchronousEngine, MessageListener, ControllerCallbackListener {
+ private static final Class CLASS_NAME = BaseUIMAAsynchronousEngine_impl.class;
+
+ private MessageSender sender = null;
+
+ private MessageProducer producer;
+
+ private String brokerURI = null;
+
+ private Session session = null;
+
+ private Session consumerSession = null;
+
+ private volatile boolean serviceInitializationException;
+
+ private volatile boolean serviceInitializationCompleted;
+
+ private Semaphore serviceSemaphore = new Semaphore(1);
+
+ private Queue consumerDestination = null;
+
+ private Session producerSession = null;
+
+ private JmxManager jmxManager = null;
+
+ private String applicationName = "UimaASClient";
+
+ private static SharedConnection sharedConnection = null;
+
+ private static Semaphore sharedConnectionSemaphore = new Semaphore(1);
+
+ private Object stopMux = new Object();
+
+ private static final UimaAsVersion uimaAsVersion = new UimaAsVersion();
+
+ public BaseUIMAAsynchronousEngine_impl() {
+ UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
+ "UIMA-AS version " + UIMAFramework.getVersionString());
}
- /**
- * Called at the end of collectionProcessingComplete - WAS closes receiving
- * thread here
- */
- protected void cleanup() throws Exception
- {
- }
+ protected TextMessage createTextMessage() throws ResourceInitializationException {
+ return new ActiveMQTextMessage();
+ }
- /**
- * Return a name of the queue to which the JMS Producer is connected to.
- */
- public String getEndPointName() throws ResourceProcessException
- {
- try{
- return ((ActiveMQDestination)sender.getMessageProducer().getDestination()).getPhysicalName();
- //return (((ActiveMQDestination) producer.getDestination()).getPhysicalName());
- }
- catch (Exception e)
- {
- throw new ResourceProcessException(e);
- }
- }
- protected void setMetaRequestMessage(Message msg) throws Exception
- {
- msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
-
- msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
- msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
- msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.GetMeta);
- msg.setJMSReplyTo(consumerDestination);
- if ( msg instanceof TextMessage ) {
- ((ActiveMQTextMessage) msg).setText("");
- }
- }
- /**
- * Initialize JMS Message with properties relevant to Process CAS request.
- */
- protected void setCASMessage(String aCasReferenceId, CAS aCAS, Message msg) throws ResourceProcessException
- {
- try{
- setCommonProperties(aCasReferenceId, msg, "xmi");
- ((TextMessage)msg).setText(serializeCAS(aCAS));
- }
- catch (Exception e)
- {
- throw new ResourceProcessException(e);
- }
- }
-
- protected void setCASMessage(String aCasReferenceId, String aSerializedCAS, Message msg) throws ResourceProcessException
- {
- try{
- setCommonProperties(aCasReferenceId, msg, "xmi");
- ((TextMessage)msg).setText(aSerializedCAS);
- }
- catch (Exception e)
- {
- throw new ResourceProcessException(e);
- }
- }
- protected void setCASMessage(String aCasReferenceId, byte[] aSerializedCAS, Message msg) throws ResourceProcessException
- {
- try{
- setCommonProperties(aCasReferenceId, msg, "binary");
- ((BytesMessage)msg).writeBytes(aSerializedCAS);
+ protected BytesMessage createBytesMessage() throws ResourceInitializationException {
+ return new ActiveMQBytesMessage();
+ }
+
+ /**
+ * Called at the end of collectionProcessingComplete - WAS closes receiving thread here
+ */
+ protected void cleanup() throws Exception {
+ }
+
+ /**
+ * Return a name of the queue to which the JMS Producer is connected to.
+ */
+ public String getEndPointName() throws ResourceProcessException {
+ try {
+ return ((ActiveMQDestination) sender.getMessageProducer().getDestination()).getPhysicalName();
+ // return (((ActiveMQDestination) producer.getDestination()).getPhysicalName());
+ } catch (Exception e) {
+ throw new ResourceProcessException(e);
+ }
+ }
+
+ protected void setMetaRequestMessage(Message msg) throws Exception {
+ msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
+
+ msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+ msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+ msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.GetMeta);
+ msg.setJMSReplyTo(consumerDestination);
+ if (msg instanceof TextMessage) {
+ ((ActiveMQTextMessage) msg).setText("");
+ }
+ }
+
+ /**
+ * Initialize JMS Message with properties relevant to Process CAS request.
+ */
+ protected void setCASMessage(String aCasReferenceId, CAS aCAS, Message msg)
+ throws ResourceProcessException {
+ try {
+ setCommonProperties(aCasReferenceId, msg, "xmi");
+ ((TextMessage) msg).setText(serializeCAS(aCAS));
+ } catch (Exception e) {
+ throw new ResourceProcessException(e);
+ }
+ }
+
+ protected void setCASMessage(String aCasReferenceId, String aSerializedCAS, Message msg)
+ throws ResourceProcessException {
+ try {
+ setCommonProperties(aCasReferenceId, msg, "xmi");
+ ((TextMessage) msg).setText(aSerializedCAS);
+ } catch (Exception e) {
+ throw new ResourceProcessException(e);
}
- catch (Exception e)
- {
+ }
+
+ protected void setCASMessage(String aCasReferenceId, byte[] aSerializedCAS, Message msg)
+ throws ResourceProcessException {
+ try {
+ setCommonProperties(aCasReferenceId, msg, "binary");
+ ((BytesMessage) msg).writeBytes(aSerializedCAS);
+ } catch (Exception e) {
throw new ResourceProcessException(e);
}
}
- protected void setCommonProperties( String aCasReferenceId, Message msg, String aSerializationStrategy) throws ResourceProcessException
- {
- try{
- msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
-
- msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
- msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
- msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Process);
- msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
-
- if ( aSerializationStrategy.equals("binary")) {
- msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
- } else if ( aSerializationStrategy.equals("xmi")) {
+ protected void setCommonProperties(String aCasReferenceId, Message msg,
+ String aSerializationStrategy) throws ResourceProcessException {
+ try {
+ msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
+
+ msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+ msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+ msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Process);
+ msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+
+ if (aSerializationStrategy.equals("binary")) {
+ msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
+ } else if (aSerializationStrategy.equals("xmi")) {
msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
- }
+ }
- msg.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true);
- msg.setJMSReplyTo(consumerDestination);
-
- }
- catch (Exception e)
- {
- throw new ResourceProcessException(e);
- }
-
+ msg.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true);
+ msg.setJMSReplyTo(consumerDestination);
+
+ } catch (Exception e) {
+ throw new ResourceProcessException(e);
}
- public void stop()
- {
+
+ }
+
+ public void stop() {
if (!running) {
return;
}
@@ -277,716 +277,722 @@
e.printStackTrace();
}
}
- }
+ }
+
+ public void setCPCMessage(Message msg) throws Exception {
+ msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
+ msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+ msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+ msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
+ msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+ msg.setBooleanProperty(AsynchAEMessage.RemoveEndpoint, true);
+ msg.setJMSReplyTo(consumerDestination);
+ if (msg instanceof TextMessage) {
+ ((TextMessage) msg).setText("");
+ }
+ }
+
+ protected void setupConnection(String aBrokerURI) throws Exception {
+ try {
+ // Acquire global static semaphore
+ sharedConnectionSemaphore.acquire();
+ if (sharedConnection == null) {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(aBrokerURI);
+ Connection connection = factory.createConnection();
+ // This only effects Consumer
+ addPrefetch((ActiveMQConnection) connection);
+ connection.start();
+ sharedConnection = new SharedConnection(connection);
+ System.out.println("UIMA AS Client Created Shared Connection To Broker:" + aBrokerURI);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "setupConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_connection_setup_INFO", new Object[] { aBrokerURI });
+ }
+ }
- public void setCPCMessage(Message msg) throws Exception
- {
- msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
- msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
- msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
- msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
- msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
- msg.setBooleanProperty(AsynchAEMessage.RemoveEndpoint, true);
- msg.setJMSReplyTo(consumerDestination);
- if ( msg instanceof TextMessage ) {
- ((TextMessage)msg).setText("");
- }
- }
- protected void setupConnection( String aBrokerURI ) throws Exception
- {
- try {
- // Acquire global static semaphore
- sharedConnectionSemaphore.acquire();
- if (sharedConnection == null )
- {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(aBrokerURI);
- Connection connection = factory.createConnection();
- // This only effects Consumer
- addPrefetch((ActiveMQConnection)connection);
- connection.start();
- sharedConnection = new SharedConnection(connection);
- System.out.println("UIMA AS Client Created Shared Connection To Broker:"+aBrokerURI);
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "setupConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_setup_INFO", new Object[] { aBrokerURI });
- }
- }
-
- } catch( Exception e) {
- throw e;
- } finally {
- // Release global static semaphore
- sharedConnectionSemaphore.release();
- }
- }
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ // Release global static semaphore
+ sharedConnectionSemaphore.release();
+ }
+ }
- private void addPrefetch(ActiveMQConnection aConnection ) {
+ private void addPrefetch(ActiveMQConnection aConnection) {
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(5);
- ((ActiveMQConnection)aConnection).setPrefetchPolicy(prefetchPolicy);
- }
- private void validateConnection(String aBrokerURI) throws Exception
- {
- // checks if a sharedConnection exists and if not creates a new one
- setupConnection(aBrokerURI);
- }
- protected Session getSession(String aBrokerURI) throws Exception
- {
- validateConnection(aBrokerURI);
- return getSession(sharedConnection.getConnection());
- }
- protected Session getSession(Connection aConnection) throws Exception
- {
+ ((ActiveMQConnection) aConnection).setPrefetchPolicy(prefetchPolicy);
+ }
+
+ private void validateConnection(String aBrokerURI) throws Exception {
+ // checks if a sharedConnection exists and if not creates a new one
+ setupConnection(aBrokerURI);
+ }
+
+ protected Session getSession(String aBrokerURI) throws Exception {
+ validateConnection(aBrokerURI);
+ return getSession(sharedConnection.getConnection());
+ }
+
+ protected Session getSession(Connection aConnection) throws Exception {
session = aConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session;
}
- protected MessageProducer lookupProducerForEndpoint( Endpoint anEndpoint ) throws Exception
- {
- if ( sharedConnection == null || producerSession == null )
- {
- throw new ResourceInitializationException();
- }
- Destination dest = producerSession.createQueue(anEndpoint.getEndpoint());
- return producerSession.createProducer(dest);
- }
+ protected MessageProducer lookupProducerForEndpoint(Endpoint anEndpoint) throws Exception {
+ if (sharedConnection == null || producerSession == null) {
+ throw new ResourceInitializationException();
+ }
+ Destination dest = producerSession.createQueue(anEndpoint.getEndpoint());
+ return producerSession.createProducer(dest);
+ }
+
public void initializeProducer(String aBrokerURI, String aQueueName) throws Exception {
- // Check if a sharedConnection exists. If not it creates one
+ // Check if a sharedConnection exists. If not it creates one
setupConnection(aBrokerURI);
initializeProducer(aBrokerURI, aQueueName, sharedConnection.getConnection());
}
- public void initializeProducer(String aBrokerURI, String aQueueName, Connection aConnection) throws Exception
- {
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initializeProducer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_jms_producer_INFO", new Object[] { aBrokerURI, aQueueName });
- }
- brokerURI = aBrokerURI;
- // Create a worker thread for sending messages. Jms sessions are single threaded
- // and it is illegal (per JMS spec) to use the same sesssion from multiple threads.
- // The worker thread solves this problem. As it is the only thread that owns the
- // session and uses it to create message producer.
- // The worker thread blocks waiting for messages from application threads. The
- // application threads add messages to the shared "queue" (in-memory queue not
- // jms queue) and the worker thread consumes them. The worker thread is not
- // serialializing CASes. This work is done in application threads.
-
- // create a Message Dispatcher object. In its constructor it acquires a shared
- // semaphore producerSemaphore and holds it until the producer is created an
- // and initialized. Once this happens or there is an error, the semaphore is
- // released.
- sender =
- new ActiveMQMessageSender( aConnection, aQueueName, this);
- producerInitialized = false;
- Thread t = new Thread( (BaseMessageSender) sender);
- // Start the worker thread. The jms session and message producer are created. Once
- // the message producer is created, the worker thread notifies this thread by
- // calling onProducerInitialized() where the global flag 'producerInitialized' is
- // set to true. After the notification, the worker thread notifies this instance
- // that the producer is fully initialized and finally begins to wait for messages
- // in pendingMessageList. Upon arrival, each message is removed from
- // pendingMessageList and it is sent to a destination.
-
- t.start();
-
- try {
- // Block waiting for the Sender to complete initializing the Producer.
- // The sender will release the lock once it instantiates and initializes
- // the Producer object or if there is an error
- producerSemaphore.acquire();
- } catch ( InterruptedException ex ) {
-
- } finally {
- producerSemaphore.release();
- }
- // Check if the worker thread failed to initialize.
- if ( sender.failed())
- {
- // Worker thread failed to initialize. Log the reason and stop the uima ee client
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "initializeProducer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_worker_thread_failed_to_initialize__WARNING", new Object[] { sender.getReasonForFailure() });
- }
- stop();
- return;
- }
- }
- /**
- * Create a JMS Consumer on a temporary queue. Service replies will be handled by
- * this consumer.
- *
- * @param aBrokerURI
- * @throws Exception
- */
+ public void initializeProducer(String aBrokerURI, String aQueueName, Connection aConnection)
+ throws Exception {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "initializeProducer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_init_jms_producer_INFO", new Object[] { aBrokerURI, aQueueName });
+ }
+ brokerURI = aBrokerURI;
+ // Create a worker thread for sending messages. Jms sessions are single threaded
+ // and it is illegal (per JMS spec) to use the same sesssion from multiple threads.
+ // The worker thread solves this problem. As it is the only thread that owns the
+ // session and uses it to create message producer.
+ // The worker thread blocks waiting for messages from application threads. The
+ // application threads add messages to the shared "queue" (in-memory queue not
+ // jms queue) and the worker thread consumes them. The worker thread is not
+ // serialializing CASes. This work is done in application threads.
+
+ // create a Message Dispatcher object. In its constructor it acquires a shared
+ // semaphore producerSemaphore and holds it until the producer is created an
+ // and initialized. Once this happens or there is an error, the semaphore is
+ // released.
+ sender = new ActiveMQMessageSender(aConnection, aQueueName, this);
+ producerInitialized = false;
+ Thread t = new Thread((BaseMessageSender) sender);
+ // Start the worker thread. The jms session and message producer are created. Once
+ // the message producer is created, the worker thread notifies this thread by
+ // calling onProducerInitialized() where the global flag 'producerInitialized' is
+ // set to true. After the notification, the worker thread notifies this instance
+ // that the producer is fully initialized and finally begins to wait for messages
+ // in pendingMessageList. Upon arrival, each message is removed from
+ // pendingMessageList and it is sent to a destination.
+
+ t.start();
+
+ try {
+ // Block waiting for the Sender to complete initializing the Producer.
+ // The sender will release the lock once it instantiates and initializes
+ // the Producer object or if there is an error
+ producerSemaphore.acquire();
+ } catch (InterruptedException ex) {
+
+ } finally {
+ producerSemaphore.release();
+ }
+ // Check if the worker thread failed to initialize.
+ if (sender.failed()) {
+ // Worker thread failed to initialize. Log the reason and stop the uima ee client
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "initializeProducer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_worker_thread_failed_to_initialize__WARNING",
+ new Object[] { sender.getReasonForFailure() });
+ }
+ stop();
+ return;
+ }
+ }
+
+ /**
+ * Create a JMS Consumer on a temporary queue. Service replies will be handled by this consumer.
+ *
+ * @param aBrokerURI
+ * @throws Exception
+ */
public void initializeConsumer(String aBrokerURI) throws Exception {
setupConnection(aBrokerURI);
initializeConsumer(aBrokerURI, sharedConnection.getConnection());
}
- public void initializeConsumer(String aBrokerURI, Connection connection) throws Exception
- {
- consumerSession = getSession(connection);
- consumerDestination = consumerSession.createTemporaryQueue();
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initializeConsumer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_jms_consumer_INFO", new Object[] { aBrokerURI, consumerDestination.getQueueName() });
- }
- consumer = consumerSession.createConsumer(consumerDestination);
- consumer.setMessageListener(this);
- System.out.println(">>>> Client Activated Temp Reply Queue:"+consumerDestination.getQueueName());
- }
- /**
- * Initialize the uima ee client. Takes initialization parameters from the
- * <code>anApplicationContext</code> map.
- */
- public synchronized void initialize(Map anApplicationContext) throws ResourceInitializationException
- {
- // Check UIMA AS version againg the UIMA Core version. If not the same throw Exception
- if ( !uimaAsVersion.getVersionString().equals(UIMAFramework.getVersionString())) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "BaseAnalysisEngineController", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_incompatible_version_WARNING",
- new Object[] { "UIM AS Client", uimaAsVersion.getVersionString(), UIMAFramework.getVersionString() });
- throw new ResourceInitializationException(new AsynchAEException("Version of UIMA-AS is Incompatible with a Version of UIMA Core. UIMA-AS Version:"+uimaAsVersion.getVersionString()+" Core UIMA Version:"+UIMAFramework.getVersionString()));
- }
-
- if ( running )
- {
- throw new ResourceInitializationException(new UIMA_IllegalStateException());
- }
- reset();
- Properties performanceTuningSettings = null;
-
- if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ServerUri))
- {
- throw new ResourceInitializationException();
- }
- if (!anApplicationContext.containsKey(UimaAsynchronousEngine.Endpoint))
- {
- throw new ResourceInitializationException();
- }
- ResourceManager rm = null;
- if (anApplicationContext.containsKey(Resource.PARAM_RESOURCE_MANAGER))
- {
- rm = (ResourceManager) anApplicationContext.get(Resource.PARAM_RESOURCE_MANAGER);
- }
- else
- {
- rm = UIMAFramework.newDefaultResourceManager();
- }
- if (anApplicationContext.containsKey(UIMAFramework.CAS_INITIAL_HEAP_SIZE))
- {
- String cas_initial_heap_size = (String) anApplicationContext.get(UIMAFramework.CAS_INITIAL_HEAP_SIZE);
- performanceTuningSettings = new Properties();
- performanceTuningSettings.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, cas_initial_heap_size);
- }
- asynchManager = new AsynchAECasManager_impl(rm);
-
- brokerURI = (String) anApplicationContext.get(UimaAsynchronousEngine.ServerUri);
- String endpoint = (String) anApplicationContext.get(UimaAsynchronousEngine.Endpoint);
- clientSideJmxStats.setEndpointName(endpoint);
- int casPoolSize = 1;
-
- if (anApplicationContext.containsKey(UimaAsynchronousEngine.CasPoolSize))
- {
- casPoolSize = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CasPoolSize)).intValue();
- clientSideJmxStats.setCasPoolSize(casPoolSize);
- }
-
- if (anApplicationContext.containsKey(UimaAsynchronousEngine.Timeout))
- {
- processTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.Timeout)).intValue();
- }
-
- if (anApplicationContext.containsKey(UimaAsynchronousEngine.GetMetaTimeout))
- {
- metadataTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.GetMetaTimeout)).intValue();
- }
-
- if (anApplicationContext.containsKey(UimaAsynchronousEngine.CpcTimeout))
- {
- cpcTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CpcTimeout)).intValue();
- }
- if (anApplicationContext.containsKey(UimaAsynchronousEngine.ApplicationName))
- {
- applicationName = (String) anApplicationContext.get(UimaAsynchronousEngine.ApplicationName);
- }
- if (anApplicationContext.containsKey(UimaAsynchronousEngine.SerializationStrategy))
- {
- super.serializationStrategy = (String) anApplicationContext.get(UimaAsynchronousEngine.SerializationStrategy);
- }
+ public void initializeConsumer(String aBrokerURI, Connection connection) throws Exception {
+ consumerSession = getSession(connection);
+ consumerDestination = consumerSession.createTemporaryQueue();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "initializeConsumer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_init_jms_consumer_INFO",
+ new Object[] { aBrokerURI, consumerDestination.getQueueName() });
+ }
+ consumer = consumerSession.createConsumer(consumerDestination);
+ consumer.setMessageListener(this);
+ System.out.println(">>>> Client Activated Temp Reply Queue:"
+ + consumerDestination.getQueueName());
+ }
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_uimaee_client__CONFIG", new Object[] { brokerURI, 0, casPoolSize, processTimeout, metadataTimeout, cpcTimeout });
+ /**
+ * Initialize the uima ee client. Takes initialization parameters from the
+ * <code>anApplicationContext</code> map.
+ */
+ public synchronized void initialize(Map anApplicationContext)
+ throws ResourceInitializationException {
+ // Check UIMA AS version againg the UIMA Core version. If not the same throw Exception
+ if (!uimaAsVersion.getVersionString().equals(UIMAFramework.getVersionString())) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.WARNING,
+ CLASS_NAME.getName(),
+ "BaseAnalysisEngineController",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_incompatible_version_WARNING",
+ new Object[] { "UIM AS Client", uimaAsVersion.getVersionString(),
+ UIMAFramework.getVersionString() });
+ throw new ResourceInitializationException(new AsynchAEException(
+ "Version of UIMA-AS is Incompatible with a Version of UIMA Core. UIMA-AS Version:"
+ + uimaAsVersion.getVersionString() + " Core UIMA Version:"
+ + UIMAFramework.getVersionString()));
+ }
+
+ if (running) {
+ throw new ResourceInitializationException(new UIMA_IllegalStateException());
+ }
+ reset();
+ Properties performanceTuningSettings = null;
+
+ if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ServerUri)) {
+ throw new ResourceInitializationException();
+ }
+ if (!anApplicationContext.containsKey(UimaAsynchronousEngine.Endpoint)) {
+ throw new ResourceInitializationException();
+ }
+ ResourceManager rm = null;
+ if (anApplicationContext.containsKey(Resource.PARAM_RESOURCE_MANAGER)) {
+ rm = (ResourceManager) anApplicationContext.get(Resource.PARAM_RESOURCE_MANAGER);
+ } else {
+ rm = UIMAFramework.newDefaultResourceManager();
+ }
+ if (anApplicationContext.containsKey(UIMAFramework.CAS_INITIAL_HEAP_SIZE)) {
+ String cas_initial_heap_size = (String) anApplicationContext
+ .get(UIMAFramework.CAS_INITIAL_HEAP_SIZE);
+ performanceTuningSettings = new Properties();
+ performanceTuningSettings.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, cas_initial_heap_size);
+ }
+ asynchManager = new AsynchAECasManager_impl(rm);
+
+ brokerURI = (String) anApplicationContext.get(UimaAsynchronousEngine.ServerUri);
+ String endpoint = (String) anApplicationContext.get(UimaAsynchronousEngine.Endpoint);
+ clientSideJmxStats.setEndpointName(endpoint);
+ int casPoolSize = 1;
+
+ if (anApplicationContext.containsKey(UimaAsynchronousEngine.CasPoolSize)) {
+ casPoolSize = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CasPoolSize))
+ .intValue();
+ clientSideJmxStats.setCasPoolSize(casPoolSize);
+ }
+
+ if (anApplicationContext.containsKey(UimaAsynchronousEngine.Timeout)) {
+ processTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.Timeout))
+ .intValue();
+ }
+
+ if (anApplicationContext.containsKey(UimaAsynchronousEngine.GetMetaTimeout)) {
+ metadataTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.GetMetaTimeout))
+ .intValue();
+ }
+
+ if (anApplicationContext.containsKey(UimaAsynchronousEngine.CpcTimeout)) {
+ cpcTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CpcTimeout))
+ .intValue();
+ }
+ if (anApplicationContext.containsKey(UimaAsynchronousEngine.ApplicationName)) {
+ applicationName = (String) anApplicationContext.get(UimaAsynchronousEngine.ApplicationName);
+ }
+ if (anApplicationContext.containsKey(UimaAsynchronousEngine.SerializationStrategy)) {
+ super.serializationStrategy = (String) anApplicationContext
+ .get(UimaAsynchronousEngine.SerializationStrategy);
+ }
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
+ UIMAFramework.getLogger(CLASS_NAME)
+ .logrb(
+ Level.CONFIG,
+ CLASS_NAME.getName(),
+ "initialize",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_init_uimaee_client__CONFIG",
+ new Object[] { brokerURI, 0, casPoolSize, processTimeout, metadataTimeout,
+ cpcTimeout });
}
- super.serviceDelegate = new ClientServiceDelegate(endpoint,applicationName,this);
+ super.serviceDelegate = new ClientServiceDelegate(endpoint, applicationName, this);
super.serviceDelegate.setCasProcessTimeout(processTimeout);
super.serviceDelegate.setGetMetaTimeout(metadataTimeout);
- try
- {
- // Generate unique identifier
- String uuid = UUIDGenerator.generate();
- // JMX does not allow ':' in the ObjectName so replace these with underscore
- uuid = uuid.replaceAll(":", "_");
+ try {
+ // Generate unique identifier
+ String uuid = UUIDGenerator.generate();
+ // JMX does not allow ':' in the ObjectName so replace these with underscore
+ uuid = uuid.replaceAll(":", "_");
uuid = uuid.replaceAll("-", "_");
- applicationName += "_"+uuid;
- jmxManager = new JmxManager("org.apache.uima");
- clientSideJmxStats.setApplicationName(applicationName);
- ObjectName on = new ObjectName("org.apache.uima:name="+applicationName);
- jmxManager.registerMBean(clientSideJmxStats, on);
-
- // Check if sharedConnection exists. If not create a new one. The sharedConnection
- // is static and shared by all instances of UIMA AS client in a jvm. The check
- // is made in a critical section by first acquiring a global static semaphore to
- // prevent a race condition.
- setupConnection(brokerURI);
-
- // Reuse existing JMS connection if available
- if (sharedConnection != null ) {
+ applicationName += "_" + uuid;
+ jmxManager = new JmxManager("org.apache.uima");
+ clientSideJmxStats.setApplicationName(applicationName);
+ ObjectName on = new ObjectName("org.apache.uima:name=" + applicationName);
+ jmxManager.registerMBean(clientSideJmxStats, on);
+
+ // Check if sharedConnection exists. If not create a new one. The sharedConnection
+ // is static and shared by all instances of UIMA AS client in a jvm. The check
+ // is made in a critical section by first acquiring a global static semaphore to
+ // prevent a race condition.
+ setupConnection(brokerURI);
+
+ // Reuse existing JMS connection if available
+ if (sharedConnection != null) {
initializeProducer(brokerURI, endpoint, sharedConnection.getConnection());
initializeConsumer(brokerURI, sharedConnection.getConnection());
- } else {
- initializeProducer(brokerURI, endpoint);
- initializeConsumer(brokerURI);
- }
-
- // Increment number of client instances. SharedConnection object is a static
- // and is used to share a single JMS connection. The connection is closed
- // when the last client finishes processing and calls stop().
- if ( sharedConnection != null ) {
- sharedConnection.incrementClientCount();
- }
- running = true;
- // Acquire GetMeta Semaphore Before Sending a GetMeta Request. This will force
- // the client to block in waitForMetadataReply() until GetMeta reply is received
- try {
- getMetaSemaphore.acquire();
- } catch( InterruptedException e) {
- System.out.println("UIMA AS Client Interrupted While Waiting On GetMetaSemaphore");
- }
- sendMetaRequest();
- waitForMetadataReply();
- if (abort || !running)
- {
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_aborting_as_WARNING", new Object[] { "Metadata Timeout" });
- }
- throw new ResourceInitializationException(new UimaASMetaRequestTimeout());
- }
- else
- {
- if (collectionReader != null)
- {
- asynchManager.addMetadata(collectionReader.getProcessingResourceMetaData());
- }
-
- asynchManager.initialize(casPoolSize, "ApplicationCasPoolContext", performanceTuningSettings);
-
- // Create a special CasPool of size 1 to be used for deserializing CASes from a Cas Multiplier
- if ( super.resourceMetadata != null && super.resourceMetadata instanceof AnalysisEngineMetaData )
- {
- if ( ((AnalysisEngineMetaData) super.resourceMetadata).getOperationalProperties().getOutputsNewCASes() )
- {
- // Create a Shadow CAS Pool used to de-serialize CASes produced by a CAS Multiplier
- asynchManager.initialize(1, SHADOW_CAS_POOL, performanceTuningSettings);
- }
- }
- initialized = true;
- remoteService = true;
- // running = true;
-
- for (int i = 0; listeners != null && i < listeners.size(); i++)
- {
- ((UimaASStatusCallbackListener) listeners.get(i)).initializationComplete(null);
- }
- }
-
- }
- catch (ResourceInitializationException e)
- {
- notifyOnInitializationFailure(e);
- throw e;
- }
- catch (Exception e)
- {
+ } else {
+ initializeProducer(brokerURI, endpoint);
+ initializeConsumer(brokerURI);
+ }
+
+ // Increment number of client instances. SharedConnection object is a static
+ // and is used to share a single JMS connection. The connection is closed
+ // when the last client finishes processing and calls stop().
+ if (sharedConnection != null) {
+ sharedConnection.incrementClientCount();
+ }
+ running = true;
+ // Acquire GetMeta Semaphore Before Sending a GetMeta Request. This will force
+ // the client to block in waitForMetadataReply() until GetMeta reply is received
+ try {
+ getMetaSemaphore.acquire();
+ } catch (InterruptedException e) {
+ System.out.println("UIMA AS Client Interrupted While Waiting On GetMetaSemaphore");
+ }
+ sendMetaRequest();
+ waitForMetadataReply();
+ if (abort || !running) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_aborting_as_WARNING", new Object[] { "Metadata Timeout" });
+ }
+ throw new ResourceInitializationException(new UimaASMetaRequestTimeout());
+ } else {
+ if (collectionReader != null) {
+ asynchManager.addMetadata(collectionReader.getProcessingResourceMetaData());
+ }
+
+ asynchManager.initialize(casPoolSize, "ApplicationCasPoolContext",
+ performanceTuningSettings);
+
+ // Create a special CasPool of size 1 to be used for deserializing CASes from a Cas
+ // Multiplier
+ if (super.resourceMetadata != null
+ && super.resourceMetadata instanceof AnalysisEngineMetaData) {
+ if (((AnalysisEngineMetaData) super.resourceMetadata).getOperationalProperties()
+ .getOutputsNewCASes()) {
+ // Create a Shadow CAS Pool used to de-serialize CASes produced by a CAS Multiplier
+ asynchManager.initialize(1, SHADOW_CAS_POOL, performanceTuningSettings);
+ }
+ }
+ initialized = true;
+ remoteService = true;
+ // running = true;
+
+ for (int i = 0; listeners != null && i < listeners.size(); i++) {
+ ((UimaASStatusCallbackListener) listeners.get(i)).initializationComplete(null);
+ }
+ }
+
+ } catch (ResourceInitializationException e) {
notifyOnInitializationFailure(e);
- throw new ResourceInitializationException(e);
- }
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_as_initialized__INFO", new Object[] { super.serializationStrategy });
- }
- // Acquire cpcReady semaphore to block sending CPC request until
- // ALL outstanding CASes are received.
- super.acquireCpcReadySemaphore();
- }
- /**
- * First generates a Spring context from a given deploy descriptor and than
- * deploys the context into a Spring Container.
- *
- * @param aDeploymentDescriptor -
- * deployment descriptor to generate Spring Context from
- * @param anApplicationContext -
- * a Map containing properties required by dd2spring
- *
- * @return - a unique spring container id
- *
- */
- public String deploy(String aDeploymentDescriptor, Map anApplicationContext) throws Exception {
- String springContext = generateSpringContext(aDeploymentDescriptor, anApplicationContext);
-
- SpringContainerDeployer springDeployer =
- new SpringContainerDeployer(springContainerRegistry);
- try
- {
- return springDeployer.deploy(springContext );
- }
- catch( ResourceInitializationException e)
- {
- running = true;
- throw e;
- }
- //return deploySpringContainer(new String[] { springContext });
- }
+ throw e;
+ } catch (Exception e) {
+ notifyOnInitializationFailure(e);
+ throw new ResourceInitializationException(e);
+ }
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initialize",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_as_initialized__INFO",
+ new Object[] { super.serializationStrategy });
+ }
+ // Acquire cpcReady semaphore to block sending CPC request until
+ // ALL outstanding CASes are received.
+ super.acquireCpcReadySemaphore();
+ }
+
+ /**
+ * First generates a Spring context from a given deploy descriptor and than deploys the context
+ * into a Spring Container.
+ *
+ * @param aDeploymentDescriptor
+ * - deployment descriptor to generate Spring Context from
+ * @param anApplicationContext
+ * - a Map containing properties required by dd2spring
+ *
+ * @return - a unique spring container id
+ *
+ */
+ public String deploy(String aDeploymentDescriptor, Map anApplicationContext) throws Exception {
+ String springContext = generateSpringContext(aDeploymentDescriptor, anApplicationContext);
+ SpringContainerDeployer springDeployer = new SpringContainerDeployer(springContainerRegistry);
+ try {
+ return springDeployer.deploy(springContext);
+ } catch (ResourceInitializationException e) {
+ running = true;
+ throw e;
+ }
+ // return deploySpringContainer(new String[] { springContext });
+ }
- /**
+ /**
*
*/
- public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext) throws Exception {
- if (aDeploymentDescriptorList == null) {
- throw new ResourceConfigurationException(UIMA_IllegalArgumentException.ILLEGAL_ARGUMENT, new Object[] { "Null", "DeploymentDescriptorList", "deploy()" });
- }
-
- if (aDeploymentDescriptorList.length == 0) {
- throw new ResourceConfigurationException(ResourceConfigurationException.MANDATORY_VALUE_MISSING, new Object[] { "DeploymentDescriptorList" });
- }
- String[] springContextFiles = new String[aDeploymentDescriptorList.length];
-
- for (int i = 0; i < aDeploymentDescriptorList.length; i++) {
- springContextFiles[i] = generateSpringContext(aDeploymentDescriptorList[i], anApplicationContext);
- }
-
- SpringContainerDeployer springDeployer =
- new SpringContainerDeployer(springContainerRegistry);
- try
- {
- return springDeployer.deploy(springContextFiles);
- }
- catch( ResourceInitializationException e)
- {
- running = true;
- throw e;
- }
+ public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext)
+ throws Exception {
+ if (aDeploymentDescriptorList == null) {
+ throw new ResourceConfigurationException(UIMA_IllegalArgumentException.ILLEGAL_ARGUMENT,
+ new Object[] { "Null", "DeploymentDescriptorList", "deploy()" });
+ }
+
+ if (aDeploymentDescriptorList.length == 0) {
+ throw new ResourceConfigurationException(
+ ResourceConfigurationException.MANDATORY_VALUE_MISSING,
+ new Object[] { "DeploymentDescriptorList" });
+ }
+ String[] springContextFiles = new String[aDeploymentDescriptorList.length];
- }
+ for (int i = 0; i < aDeploymentDescriptorList.length; i++) {
+ springContextFiles[i] = generateSpringContext(aDeploymentDescriptorList[i],
+ anApplicationContext);
+ }
+
+ SpringContainerDeployer springDeployer = new SpringContainerDeployer(springContainerRegistry);
+ try {
+ return springDeployer.deploy(springContextFiles);
+ } catch (ResourceInitializationException e) {
+ running = true;
+ throw e;
+ }
- public void undeploy() throws Exception {
+ }
+
+ public void undeploy() throws Exception {
Iterator containerIterator = springContainerRegistry.keySet().iterator();
- while(containerIterator.hasNext()) {
- String containerId = (String)containerIterator.next();
- undeploy(containerId);
- }
- }
+ while (containerIterator.hasNext()) {
+ String containerId = (String) containerIterator.next();
+ undeploy(containerId);
+ }
+ }
+
public void undeploy(String aSpringContainerId) throws Exception {
- this.undeploy( aSpringContainerId, SpringContainerDeployer.STOP_NOW);
+ this.undeploy(aSpringContainerId, SpringContainerDeployer.STOP_NOW);
}
- /**
- * Undeploys Spring container with a given container Id. All deployed Spring
- * containers are registered in the local registry under a unique id.
- *
- */
- public void undeploy(String aSpringContainerId, int stop_level) throws Exception
-
- {
- if ( aSpringContainerId == null )
- {
- return;
- }
-
- UimaEEAdminSpringContext adminContext = null;
- synchronized (springContainerRegistry)
- {
- if (!springContainerRegistry.containsKey(aSpringContainerId))
- {
- return;
- //throw new InvalidContainerException("Invalid Spring container Id:" + aSpringContainerId + ". Unable to undeploy the Spring container");
- }
- // Fetch an administrative context which contains a Spring Container
- adminContext = (UimaEEAdminSpringContext) springContainerRegistry.get(aSpringContainerId);
- if ( adminContext == null )
- {
- throw new InvalidContainerException("Spring Container Does Not Contain Valid UimaEEAdminSpringContext Object");
- }
- // Fetch instance of the Container from its context
- ApplicationContext ctx = adminContext.getSpringContainer();
- // Query the container for objects that implement
- // ControllerLifecycle interface. These
- // objects are typically of type AnalysisEngineController or
- // UimacppServiceController.
- String[] asyncServiceList = ctx.getBeanNamesForType(org.apache.uima.aae.controller.ControllerLifecycle.class);
- // Given a valid list of controllers select the first from the list
- // and
- // initiate a shutdown. We don't care which controller will be
- // invoked. In case of
- // AggregateAnalysisEngineController the terminate event will
- // propagate all the way
- // to the top controller in the hierarchy and the shutdown will take
- // place from there.
- // If the controller is of kind UimecppServiceController or
- // PrimitiveAnalysisController
- // the termination logic will be immediately triggered in the
- // terminate() method.
- if (asyncServiceList != null && asyncServiceList.length > 0)
- {
- boolean topLevelController = false;
+ /**
+ * Undeploys Spring container with a given container Id. All deployed Spring containers are
+ * registered in the local registry under a unique id.
+ *
+ */
+ public void undeploy(String aSpringContainerId, int stop_level) throws Exception
+
+ {
+ if (aSpringContainerId == null) {
+ return;
+ }
+
+ UimaEEAdminSpringContext adminContext = null;
+ synchronized (springContainerRegistry) {
+ if (!springContainerRegistry.containsKey(aSpringContainerId)) {
+ return;
+ // throw new InvalidContainerException("Invalid Spring container Id:" + aSpringContainerId +
+ // ". Unable to undeploy the Spring container");
+ }
+ // Fetch an administrative context which contains a Spring Container
+ adminContext = (UimaEEAdminSpringContext) springContainerRegistry.get(aSpringContainerId);
+ if (adminContext == null) {
+ throw new InvalidContainerException(
+ "Spring Container Does Not Contain Valid UimaEEAdminSpringContext Object");
+ }
+ // Fetch instance of the Container from its context
+ ApplicationContext ctx = adminContext.getSpringContainer();
+ // Query the container for objects that implement
+ // ControllerLifecycle interface. These
+ // objects are typically of type AnalysisEngineController or
+ // UimacppServiceController.
+ String[] asyncServiceList = ctx
+ .getBeanNamesForType(org.apache.uima.aae.controller.ControllerLifecycle.class);
+ // Given a valid list of controllers select the first from the list
+ // and
+ // initiate a shutdown. We don't care which controller will be
+ // invoked. In case of
+ // AggregateAnalysisEngineController the terminate event will
+ // propagate all the way
+ // to the top controller in the hierarchy and the shutdown will take
+ // place from there.
+ // If the controller is of kind UimecppServiceController or
+ // PrimitiveAnalysisController
+ // the termination logic will be immediately triggered in the
+ // terminate() method.
+ if (asyncServiceList != null && asyncServiceList.length > 0) {
+ boolean topLevelController = false;
ControllerLifecycle ctrer = null;
int indx = 0;
- while( !topLevelController ) {
- ctrer = (ControllerLifecycle) ctx.getBean(asyncServiceList[indx++]);
- if ( ctrer instanceof UimacppServiceController ||
- ((AnalysisEngineController)ctrer).isTopLevelComponent() ) {
- topLevelController = true;
- }
- }
- // Send a trigger to initiate shutdown.
- if (ctrer != null ) {
- if ( ctrer instanceof AnalysisEngineController) {
- ((AnalysisEngineController) ctrer).getControllerLatch().release();
- }
- switch( stop_level ) {
- case SpringContainerDeployer.QUIESCE_AND_STOP:
- ((AnalysisEngineController)ctrer).quiesceAndStop();
- break;
+ while (!topLevelController) {
+ ctrer = (ControllerLifecycle) ctx.getBean(asyncServiceList[indx++]);
+ if (ctrer instanceof UimacppServiceController
+ || ((AnalysisEngineController) ctrer).isTopLevelComponent()) {
+ topLevelController = true;
+ }
+ }
+ // Send a trigger to initiate shutdown.
+ if (ctrer != null) {
+ if (ctrer instanceof AnalysisEngineController) {
+ ((AnalysisEngineController) ctrer).getControllerLatch().release();
+ }
+ switch (stop_level) {
+ case SpringContainerDeployer.QUIESCE_AND_STOP:
+ ((AnalysisEngineController) ctrer).quiesceAndStop();
+ break;
case SpringContainerDeployer.STOP_NOW:
- ((AnalysisEngineController)ctrer).terminate();
+ ((AnalysisEngineController) ctrer).terminate();
break;
- }
- }
- }
- if (ctx instanceof FileSystemXmlApplicationContext)
- {
- ((FileSystemXmlApplicationContext) ctx).destroy();
- }
- // Remove the container from a local registry
- springContainerRegistry.remove(aSpringContainerId);
- }
- }
-
- /**
- * Use dd2spring to generate Spring context file from a given deployment
- * descriptor file.
- *
- * @param aDeploymentDescriptor -
- * deployment descriptor to generate Spring Context from
- * @param anApplicationContext -
- * a Map containing properties required by dd2spring
- * @return - an absolute path to the generated Spring Context file
- *
- * @throws Exception -
- * if failure occurs
- */
- private String generateSpringContext(String aDeploymentDescriptor, Map anApplicationContext) throws Exception
- {
+ }
+ }
+ }
+ if (ctx instanceof FileSystemXmlApplicationContext) {
+ ((FileSystemXmlApplicationContext) ctx).destroy();
+ }
+ // Remove the container from a local registry
+ springContainerRegistry.remove(aSpringContainerId);
+ }
+ }
- String dd2SpringXsltFilePath = null;
- String saxonClasspath = null;
+ /**
+ * Use dd2spring to generate Spring context file from a given deployment descriptor file.
+ *
+ * @param aDeploymentDescriptor
+ * - deployment descriptor to generate Spring Context from
+ * @param anApplicationContext
+ * - a Map containing properties required by dd2spring
+ * @return - an absolute path to the generated Spring Context file
+ *
+ * @throws Exception
+ * - if failure occurs
+ */
+ private String generateSpringContext(String aDeploymentDescriptor, Map anApplicationContext)
+ throws Exception {
- if (anApplicationContext.containsKey(UimaAsynchronousEngine.DD2SpringXsltFilePath))
- {
- dd2SpringXsltFilePath = (String) anApplicationContext.get(UimaAsynchronousEngine.DD2SpringXsltFilePath);
- }
- else
- {
- throw new ResourceConfigurationException(ResourceConfigurationException.MANDATORY_VALUE_MISSING, new Object[] { "Xslt File Path" });
- }
- if (anApplicationContext.containsKey(UimaAsynchronousEngine.SaxonClasspath))
- {
- saxonClasspath = (String) anApplicationContext.get(UimaAsynchronousEngine.SaxonClasspath);
- }
- else
- {
- throw new ResourceConfigurationException(ResourceConfigurationException.MANDATORY_VALUE_MISSING, new Object[] { "Saxon Classpath" });
- }
+ String dd2SpringXsltFilePath = null;
+ String saxonClasspath = null;
- Dd2spring dd2Spring = new Dd2spring();
- File springContextFile = dd2Spring.convertDd2Spring(aDeploymentDescriptor, dd2SpringXsltFilePath, saxonClasspath, (String) anApplicationContext.get(UimaAsynchronousEngine.UimaEeDebug));
+ if (anApplicationContext.containsKey(UimaAsynchronousEngine.DD2SpringXsltFilePath)) {
+ dd2SpringXsltFilePath = (String) anApplicationContext
+ .get(UimaAsynchronousEngine.DD2SpringXsltFilePath);
+ } else {
+ throw new ResourceConfigurationException(
+ ResourceConfigurationException.MANDATORY_VALUE_MISSING,
+ new Object[] { "Xslt File Path" });
+ }
+ if (anApplicationContext.containsKey(UimaAsynchronousEngine.SaxonClasspath)) {
+ saxonClasspath = (String) anApplicationContext.get(UimaAsynchronousEngine.SaxonClasspath);
+ } else {
+ throw new ResourceConfigurationException(
+ ResourceConfigurationException.MANDATORY_VALUE_MISSING,
+ new Object[] { "Saxon Classpath" });
+ }
+
+ Dd2spring dd2Spring = new Dd2spring();
+ File springContextFile = dd2Spring.convertDd2Spring(aDeploymentDescriptor,
+ dd2SpringXsltFilePath, saxonClasspath, (String) anApplicationContext
+ .get(UimaAsynchronousEngine.UimaEeDebug));
- return springContextFile.getAbsolutePath();
- }
+ return springContextFile.getAbsolutePath();
+ }
- /**
- * Deploys provided context files ( and beans) in a new Spring container.
- *
- */
- protected String deploySpringContainer(String[] springContextFiles) throws ResourceInitializationException {
+ /**
+ * Deploys provided context files ( and beans) in a new Spring container.
+ *
+ */
+ protected String deploySpringContainer(String[] springContextFiles)
+ throws ResourceInitializationException {
- SpringContainerDeployer springDeployer =
- new SpringContainerDeployer();
- try
- {
- return springDeployer.deploy(springContextFiles);
- }
- catch( ResourceInitializationException e)
- {
- // turn on the global flag so that the stop() can do the cleanup
- running = true;
- throw e;
- }
- }
-
-
-
- protected void waitForServiceNotification() throws Exception
- {
- while( !serviceInitializationCompleted ) {
- if ( serviceInitializationException ) {
- throw new ResourceInitializationException();
- }
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_awaiting_container_init__INFO", new Object[] {});
- }
- try {
- serviceSemaphore.acquire();
- } catch( InterruptedException e) {
- } finally {
- serviceSemaphore.release();
- }
- if ( serviceInitializationException ) {
- throw new ResourceInitializationException();
- }
- }
- }
-
-
- protected void deployEmbeddedBroker() throws Exception
- {
- // TBI
- }
-
- public static void main(String[] args)
- {
- try
- {
-
- BaseUIMAAsynchronousEngineCommon_impl uimaee = new BaseUIMAAsynchronousEngine_impl();
-
- Map appContext = new HashMap();
- appContext.put(UimaAsynchronousEngine.DD2SpringXsltFilePath, args[1]);
- appContext.put(UimaAsynchronousEngine.SaxonClasspath, args[2]);
- String containerId = uimaee.deploy(args[0], appContext); // args[1],
- // args[2]);
-
- uimaee.undeploy(containerId);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- public void setReleaseCASMessage(TextMessage msg, String aCasReferenceId)
- throws Exception
- {
- msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
- msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
- msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
- msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.ReleaseCAS);
- msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
- msg.setJMSReplyTo(consumerDestination);
- }
+ SpringContainerDeployer springDeployer = new SpringContainerDeployer();
+ try {
+ return springDeployer.deploy(springContextFiles);
+ } catch (ResourceInitializationException e) {
+ // turn on the global flag so that the stop() can do the cleanup
+ running = true;
+ throw e;
+ }
+ }
+
+ protected void waitForServiceNotification() throws Exception {
+ while (!serviceInitializationCompleted) {
+ if (serviceInitializationException) {
+ throw new ResourceInitializationException();
+ }
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_awaiting_container_init__INFO", new Object[] {});
+ }
+ try {
+ serviceSemaphore.acquire();
+ } catch (InterruptedException e) {
+ } finally {
+ serviceSemaphore.release();
+ }
+ if (serviceInitializationException) {
+ throw new ResourceInitializationException();
+ }
+ }
+ }
+
+ protected void deployEmbeddedBroker() throws Exception {
+ // TBI
+ }
+
+ public static void main(String[] args) {
+ try {
+
+ BaseUIMAAsynchronousEngineCommon_impl uimaee = new BaseUIMAAsynchronousEngine_impl();
+
+ Map appContext = new HashMap();
+ appContext.put(UimaAsynchronousEngine.DD2SpringXsltFilePath, args[1]);
+ appContext.put(UimaAsynchronousEngine.SaxonClasspath, args[2]);
+ String containerId = uimaee.deploy(args[0], appContext); // args[1],
+ // args[2]);
+
+ uimaee.undeploy(containerId);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void setReleaseCASMessage(TextMessage msg, String aCasReferenceId) throws Exception {
+ msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+ msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+ msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+ msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.ReleaseCAS);
+ msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+ msg.setJMSReplyTo(consumerDestination);
+ }
public void notifyOnInitializationFailure(Exception e) {
- notifyOnInitializationFailure(null, e);
+ notifyOnInitializationFailure(null, e);
}
public void notifyOnInitializationSuccess() {
- notifyOnInitializationSuccess(null);
+ notifyOnInitializationSuccess(null);
}
public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e) {
- // Initialization exception. Notify blocking thread and indicate a problem
- serviceInitializationException = true;
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_container_init_exception__WARNING", new Object[] {e});
- }
- serviceSemaphore.release();
- }
-
- public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
- serviceInitializationCompleted = true;
- serviceSemaphore.release();
- }
-
- public void notifyOnTermination(String message) {
-
- }
-
- protected MessageProducer getMessageProducer( Destination destination ) throws Exception
- {
- return sender.getMessageProducer(destination);
- }
+ // Initialization exception. Notify blocking thread and indicate a problem
+ serviceInitializationException = true;
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_container_init_exception__WARNING", new Object[] { e });
+ }
+ serviceSemaphore.release();
+ }
+
+ public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
+ serviceInitializationCompleted = true;
+ serviceSemaphore.release();
+ }
+
+ public void notifyOnTermination(String message) {
+
+ }
+
+ protected MessageProducer getMessageProducer(Destination destination) throws Exception {
+ return sender.getMessageProducer(destination);
+ }
+
/**
- * Request Uima AS client to initiate sending Stop requests to a service for all outstanding
- * CASes awaiting reply.
+ * Request Uima AS client to initiate sending Stop requests to a service for all outstanding CASes
+ * awaiting reply.
*
*/
public void stopProducingCases() {
- List<DelegateEntry> outstandingCasList =
- serviceDelegate.getDelegateCasesPendingRepy();
- for( DelegateEntry entry : outstandingCasList) {
- // The Cas is still being processed
- ClientRequest clientCachedRequest =
- (ClientRequest)clientCache.get(entry.getCasReferenceId());
- if ( clientCachedRequest != null &&
- !clientCachedRequest.isMetaRequest() &&
- clientCachedRequest.getCasReferenceId() != null) {
- stopProducingCases(clientCachedRequest);
- }
+ List<DelegateEntry> outstandingCasList = serviceDelegate.getDelegateCasesPendingRepy();
+ for (DelegateEntry entry : outstandingCasList) {
+ // The Cas is still being processed
+ ClientRequest clientCachedRequest = (ClientRequest) clientCache
+ .get(entry.getCasReferenceId());
+ if (clientCachedRequest != null && !clientCachedRequest.isMetaRequest()
+ && clientCachedRequest.getCasReferenceId() != null) {
+ stopProducingCases(clientCachedRequest);
+ }
}
}
+
/**
- * Request Uima AS client to initiate sending Stop request to a service for a given CAS id
- * If the service is a Cas Multiplier, it will stop producing new CASes, will wait until all
- * child CASes finish and finally returns the input CAS.
+ * Request Uima AS client to initiate sending Stop request to a service for a given CAS id If the
+ * service is a Cas Multiplier, it will stop producing new CASes, will wait until all child CASes
+ * finish and finally returns the input CAS.
*
*/
public void stopProducingCases(String aCasReferenceId) {
// The Cas is still being processed
- ClientRequest clientCachedRequest =
- (ClientRequest)clientCache.get(aCasReferenceId);
- if ( clientCachedRequest != null ) {
- stopProducingCases(clientCachedRequest);
+ ClientRequest clientCachedRequest = (ClientRequest) clientCache.get(aCasReferenceId);
+ if (clientCachedRequest != null) {
+ stopProducingCases(clientCachedRequest);
}
}
+
private void stopProducingCases(ClientRequest clientCachedRequest) {
try {
- if ( clientCachedRequest.getFreeCasNotificationQueue() != null ) {
+ if (clientCachedRequest.getFreeCasNotificationQueue() != null) {
TextMessage msg = createTextMessage();
- msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
- msg.setStringProperty(AsynchAEMessage.CasReference, clientCachedRequest.getCasReferenceId());
- msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+ msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+ msg
+ .setStringProperty(AsynchAEMessage.CasReference, clientCachedRequest
+ .getCasReferenceId());
+ msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Stop);
msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
try {
- MessageProducer msgProducer =
- getMessageProducer(clientCachedRequest.getFreeCasNotificationQueue());
- if ( msgProducer != null ) {
- System.out.println(">>> Client Sending Stop to Service for CAS:"+clientCachedRequest.getCasReferenceId()+" Destination:"+clientCachedRequest.getFreeCasNotificationQueue() );
- // Send STOP message to Cas Multiplier Service
+ MessageProducer msgProducer = getMessageProducer(clientCachedRequest
+ .getFreeCasNotificationQueue());
+ if (msgProducer != null) {
+ System.out.println(">>> Client Sending Stop to Service for CAS:"
+ + clientCachedRequest.getCasReferenceId() + " Destination:"
+ + clientCachedRequest.getFreeCasNotificationQueue());
+ // Send STOP message to Cas Multiplier Service
msgProducer.send(msg);
} else {
- System.out.println(">>> UIMA AS Client Unable to Send Stop To Service. Message Producer Not Initialized");
+ System.out
+ .println(">>> UIMA AS Client Unable to Send Stop To Service. Message Producer Not Initialized");
}
-
- } catch( Exception ex) {
+
+ } catch (Exception ex) {
System.out.println("Client Unable to send STOP Request to Service. Reason:");
ex.printStackTrace();
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] {Thread.currentThread().getId(), ex});
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING",
+ new Object[] { Thread.currentThread().getId(), ex });
}
}
}
- } catch ( Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] {Thread.currentThread().getId(), e});
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", new Object[] { Thread.currentThread().getId(), e });
}
}
}
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/InvalidContainerException.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/InvalidContainerException.java?rev=810548&r1=810547&r2=810548&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/InvalidContainerException.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/InvalidContainerException.java Wed Sep 2 15:16:54 2009
@@ -19,26 +19,21 @@
package org.apache.uima.adapter.jms.client;
-public class InvalidContainerException extends Exception
-{
+public class InvalidContainerException extends Exception {
- public InvalidContainerException()
- {
- }
-
- public InvalidContainerException(String message)
- {
- super(message);
- }
-
- public InvalidContainerException(Throwable cause)
- {
- super(cause);
- }
-
- public InvalidContainerException(String message, Throwable cause)
- {
- super(message, cause);
- }
+ public InvalidContainerException() {
+ }
+
+ public InvalidContainerException(String message) {
+ super(message);
+ }
+
+ public InvalidContainerException(Throwable cause) {
+ super(cause);
+ }
+
+ public InvalidContainerException(String message, Throwable cause) {
+ super(message, cause);
+ }
}