You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2009/10/03 19:46:50 UTC

svn commit: r821374 [1/2] - in /incubator/uima/uimacpp/trunk/src/utils: ActiveMQAnalysisEngineService.cpp ActiveMQAnalysisEngineService.hpp deployCppService.cpp deployCppService.hpp

Author: eae
Date: Sat Oct  3 17:46:49 2009
New Revision: 821374

URL: http://svn.apache.org/viewvc?rev=821374&view=rev
Log:
UIMA-799 commit Bhavani's uimacpp-799.patch

Modified:
    incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp
    incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp
    incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp
    incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp

Modified: incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp?rev=821374&r1=821373&r2=821374&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp Sat Oct  3 17:46:49 2009
@@ -51,11 +51,44 @@
 #define LOGINFO(n,x) { if (n > uimacpp_ee_tracelevel) {} else { FORMATMSG(x); logMessage(lstr.str().c_str()); } }
 #define LOGERROR(x){ FORMATMSG(x); logError(lstr.str().c_str()); }
 #define LOGWARN(x) { FORMATMSG(x); logWarning(lstr.str().c_str());}
+
+static void listener_signal_handler(int signum) {
+    stringstream str;
+    str << __FILE__ << __LINE__ << " Received Signal: " << signum;
+    cerr << str.str() << endl;    
+}
+
+
+//=================================================================
+//
+//Message processing function executed by message handling threads.
+//-----------------------------------------------------------------
+static void* APR_THREAD_FUNC handleMessages(apr_thread_t *thd, void *data) {  
+   //cout << __FILE__ << __LINE__ << Thread::getId() << " handleMessages start " << endl;
+   AMQListener * handler = (AMQListener*) data;
+   handler->receiveAndProcessMessages(thd);
+   apr_thread_exit(thd, APR_SUCCESS);
+   return NULL;
+}
+
+
 //===================================================
 //AMQConnection
 //---------------------------------------------------
-  AMQConnection::AMQConnection( const char * aBrokerURL, Monitor * pStatistics) : 
-                                      iv_brokerURL(aBrokerURL), 
+ ConnectionFactory * AMQConnection::createConnectionFactory(ServiceParameters & params) {
+   stringstream str;
+   str << params.getBrokerURL() << "?jms.prefetchPolicy.queuePrefetch=" << params.getPrefetchSize() << endl;
+   ConnectionFactory * pConnFactory = ConnectionFactory::createCMSConnectionFactory(str.str());
+   cout << "AMQConnection()::createConnectionFactory " <<  params.getBrokerURL() << " prefetch=" << params.getPrefetchSize() << endl;
+   return pConnFactory;
+ };
+
+ AMQConnection::AMQConnection(  ConnectionFactory * connFact, 
+                                      Monitor * pMonitor, int id) :
+                                      iv_id(id),
+                                      iv_pConnFact(connFact),
+                                      //iv_pMonitor(0),
+                                      iv_brokerURL(((ActiveMQConnectionFactory*)connFact)->getBrokerURL()), 
                                       iv_pConnection(0), 
                                       iv_pConsumerSession(0),
                                       iv_pConsumer(0),
@@ -64,22 +97,29 @@
                                       iv_pListener(0),
                                       iv_pProducerSession(0),
                                       iv_pProducer(0),
-                                      iv_pTextMessage(0),
+                                      iv_pReplyMessage(0),
                                       iv_replyDestinations(),
-                                      iv_valid(false) {
+                                      iv_valid(false),
+                                      iv_started(false),
+                                      iv_reconnecting(false)  {
+    int pos  = iv_brokerURL.find_first_of("?");
+    if (pos != string::npos) {
+      iv_brokerURL = iv_brokerURL.substr(0,pos);
+    }
+    iv_pMonitor = pMonitor;
+    initialize();
+ }
+ void AMQConnection::initialize( ) {
 
     try {
-      iv_pMonitor = pStatistics;
+  
       LOGINFO(INFO, "AMQConnection() connecting to " + iv_brokerURL);
-      // Create a ConnectionFactory
-      ActiveMQConnectionFactory* connectionFactory = 
-        new ActiveMQConnectionFactory(iv_brokerURL);
-
+      
       // Create a Connection
-      if (connectionFactory == NULL) {	
-        LOGERROR("AMQConnection() could not create connection factory");
+      if (iv_pConnFact == NULL) {	
+        LOGERROR("AMQConnection() invalid connection factory");
         ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
-        msg.addParam("AMQConnection() could not create connection factory");
+        msg.addParam("AMQConnection() invalid create connection factory");
         ErrorInfo errInfo;
         errInfo.setMessage(msg);
         UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
@@ -88,25 +128,40 @@
           errInfo.getMessage().getMessageID(),
           ErrorInfo::unrecoverable);
       }
-      this->iv_pConnection = connectionFactory->createConnection();
-      if (this->iv_pConnection == NULL) {
-        LOGERROR("AMQConnection() could not create connection.");
-        ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
-        msg.addParam("AMQConnection() could not create connection to " + iv_brokerURL);
-        ErrorInfo errInfo;
-        errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE);
-        errInfo.setMessage(msg);
-        UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
-          UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
-          errInfo.getMessage(),
-          errInfo.getMessage().getMessageID(),
-          ErrorInfo::unrecoverable);
-      } else {
-        //default exception listener
-        this->iv_pConnection->setExceptionListener(this);	
+
+      bool retrying = false;
+      while (iv_pConnection == NULL) {
+        try {
+          iv_pConnection = iv_pConnFact->createConnection();
+          if (iv_pConnection == NULL ) {
+            if (!retrying) {
+              stringstream str;
+              str << " AMQConnection::initialize() Connection object is null. Failed to connect to " << iv_brokerURL
+                         << ". Retrying..." << endl;
+              LOGWARN(str.str());
+              retrying = true;
+              apr_sleep(30000000); //wait 30 seconds to reconnect
+            }
+          }
+        } catch (cms::CMSException& e) {
+          if (!retrying) {
+            stringstream str;
+            str << "AMQConnection::initialize() Failed to connect to " << iv_brokerURL
+                         << e.getMessage() << ". Retrying..." << endl;
+            LOGWARN(str.str());
+            retrying = true;
+            apr_sleep(30000000); //wait 30 seconds to reconnect
+          }
+        } 
+      }
+     
+      if (retrying) {
+        LOGWARN("AMQConnection::initialize() Connected to " + iv_brokerURL);
       }
 
-      delete connectionFactory;
+      //default exception listener
+      this->iv_pConnection->setExceptionListener(this);	
+      
       // Create a Producer Session
       LOGINFO(FINEST,"AMQConnection() create Producer Session " + iv_brokerURL);
       this->iv_pProducerSession = this->iv_pConnection->createSession( Session::AUTO_ACKNOWLEDGE );
@@ -139,8 +194,8 @@
       this->iv_pProducer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
 
       //create TextMessage
-      this->iv_pTextMessage = this->iv_pProducerSession->createTextMessage();
-      if (this->iv_pTextMessage == NULL) {
+      this->iv_pReplyMessage = this->iv_pProducerSession->createTextMessage();
+      if (this->iv_pReplyMessage == NULL) {
         LOGERROR("AMQConnection() create textMessage failed. ");
         ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
         msg.addParam("AMQConnection() failed to create message.");
@@ -209,9 +264,9 @@
       delete this->iv_pConnection;
       this->iv_pConnection = NULL;
     }
-    if (this->iv_pTextMessage != NULL) {
-      delete this->iv_pTextMessage;
-      this->iv_pTextMessage=NULL;
+    if (this->iv_pReplyMessage != NULL) {
+      delete this->iv_pReplyMessage;
+      this->iv_pReplyMessage=NULL;
     } 
     
     //destinations
@@ -222,20 +277,13 @@
   }
 
 /* create a MessageConsumer session and register a MessageListener
-to receive messages from the input queue. */
-  void AMQConnection::createMessageConsumer(string queueName,
-                                        MessageListener * listener,
-                                        int prefetch) {
+   to receive messages from the input queue. */
+  void AMQConnection::createMessageConsumer(string queueName, string selector) {
     LOGINFO(FINEST, "AMQConnection::createMessageConsumer() consumer start " + queueName);
     this->iv_inputQueueName = queueName;
     stringstream str;
-    //we add one to prefetch size to get the same behavior as the
-    //Spring listener used in UIMA Java SDK which calls the
-    //ActiveMQMessageConsumer.receive() api.
-    //create endpoint destination
     str << queueName;
-    str << "?consumer.prefetchSize=" << prefetch+1 << endl;	
-
+   
     if (this->iv_pConsumerSession != NULL || iv_pConsumer != NULL) {
       LOGERROR("AMQConnection::createMessageConsumer() A session already exists. ");
       ErrorInfo errInfo;
@@ -278,7 +326,13 @@
         ErrorInfo::unrecoverable);
     }
 
-    this->iv_pConsumer = this->iv_pConsumerSession->createConsumer(this->iv_pInputQueue);
+    iv_selector = selector;
+    if (selector.length() > 0) {
+      this->iv_pConsumer = this->iv_pConsumerSession->createConsumer(this->iv_pInputQueue, iv_selector);
+    } else {
+      this->iv_pConsumer = this->iv_pConsumerSession->createConsumer(this->iv_pInputQueue);
+    }
+
     if (this->iv_pConsumer == NULL) {
       LOGERROR("AMQConnection::createMessageConsumer() createConsumer failed. " + queueName);
       ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
@@ -291,28 +345,24 @@
         errInfo.getMessage().getMessageID(),
         ErrorInfo::unrecoverable);
     }
-    //register listener
-    this->iv_pConsumer->setMessageListener(listener); //caller owns listener 
-    this->iv_pListener = listener;	 
+    
     LOGINFO(FINEST, "AMQConnection::createMessageConsumer() " + queueName + " successful.");
   }
 
-//caller owns the ExceptionListener
-  void AMQConnection::setExceptionListener(ExceptionListener  * el) {
-    this->iv_pConnection->setExceptionListener(el);
-  }
 
   void AMQConnection::onException(const CMSException & ex) {
     //mark endpoint as broken.
     this->iv_valid = false;
     //log that connection is invalid.
-    LOGWARN("AMQConnection()::onException() Connection to " + iv_brokerURL 
-                       + " may be broken. " + ex.getMessage());
+    stringstream str;
+    str << "AMQConnection()::onException() Connection to " 
+                       << iv_brokerURL << " is broken. Reconnecting ... " << ex.getMessage() << endl;
+    LOGWARN(str.str());
   }
-
+ 
 
   TextMessage * AMQConnection::getTextMessage() {
-    if (this->iv_pTextMessage == NULL) {
+    if (this->iv_pReplyMessage == NULL) {
       LOGERROR("AMQConnection::getTextMessage() failed. ");
       ErrorInfo errInfo;
       errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "TextMessage could not be created."));
@@ -322,9 +372,9 @@
         errInfo.getMessage().getMessageID(),
         ErrorInfo::unrecoverable);
     }
-    iv_pTextMessage->clearProperties();
-    iv_pTextMessage->clearBody();
-    return this->iv_pTextMessage;
+    iv_pReplyMessage->clearProperties();
+    iv_pReplyMessage->clearBody();
+    return this->iv_pReplyMessage;
   }
 
   void AMQConnection::sendMessage(string queuename) {
@@ -364,11 +414,11 @@
         ErrorInfo::unrecoverable);
     }
 
-    this->iv_pProducer->send(pDest,this->iv_pTextMessage);
+    this->iv_pProducer->send(pDest,this->iv_pReplyMessage);
 
     //cout << "producer->send elapsed time " << (apr_time_now() - stime) << endl;
-    this->iv_pTextMessage->clearBody();
-    this->iv_pTextMessage->clearProperties();
+    this->iv_pReplyMessage->clearBody();
+    this->iv_pReplyMessage->clearProperties();
     LOGINFO(FINEST, "AMQConnection::sendMessage() successful to " + queuename);
   }
 
@@ -384,11 +434,11 @@
         errInfo.getMessage().getMessageID(),
         ErrorInfo::unrecoverable);
     }
-    this->iv_pProducer->send(cmsReplyTo,this->iv_pTextMessage);
+    this->iv_pProducer->send(cmsReplyTo,this->iv_pReplyMessage);
 
     //cout << "producer->send elapsed time " << (apr_time_now() - stime) << endl;
-    this->iv_pTextMessage->clearBody();
-    this->iv_pTextMessage->clearProperties();
+    this->iv_pReplyMessage->clearBody();
+    this->iv_pReplyMessage->clearProperties();
     LOGINFO(4, "AMQConnection::sendMessage() successful to " + cmsReplyTo->toProviderString());
   }
 
@@ -429,12 +479,86 @@
     }
   }
 
+  void AMQConnection::resetBeforeReconnect() {
+    this->iv_reconnecting = true;
+
+    if (this->iv_pConsumer != NULL) {
+      delete this->iv_pConsumer;
+      this->iv_pConsumer = NULL;
+    }
+    if (this->iv_pProducer != NULL) {
+      delete this->iv_pProducer;
+      this->iv_pProducer = NULL;
+    }
+    if (this->iv_pConsumerSession != NULL) {
+      delete this->iv_pConsumerSession;
+      this->iv_pConsumerSession = NULL;
+    }
+    if (this->iv_pProducerSession != NULL) {
+      delete this->iv_pProducerSession;
+      this->iv_pProducerSession = NULL;
+    }
+    if (this->iv_pConnection != NULL) {
+      delete this->iv_pConnection;
+      this->iv_pConnection = NULL;
+    }
+
+    this->iv_started = false;
+    this->iv_valid = false;
+
+    if (this->iv_pReplyMessage != NULL) {
+      delete this->iv_pReplyMessage;
+      this->iv_pReplyMessage=NULL;
+    } 
+    
+    //destinations
+    map<string, cms::Destination*>::iterator ite; 
+    for (ite= iv_replyDestinations.begin();ite !=  iv_replyDestinations.end();ite++) {
+      delete ite->second;
+    }
+  }
+
+  void AMQConnection::reconnect() {
+    try {
+        this->iv_reconnecting = true;
+        this->iv_pMonitor->reconnecting(iv_id);
+        apr_sleep(30000000); //wait 30 seconds to reconnect
+        //cout << "AMQConnection::reconnect() calling initialize >>>" << endl;
+        this->initialize();
+        this->createMessageConsumer(this->iv_inputQueueName,this->iv_selector);
+        this->start();
+        stringstream str;
+        str << "AMQConnection::reconnect() Successfully reconnected to >>> " <<  iv_brokerURL << endl;
+        LOGWARN(str.str());
+        this->iv_pMonitor->reconnectionSuccess(iv_id);
+        this->iv_reconnecting = false;
+     } catch (uima::Exception e) {
+       cerr << "AMQConnection::reconnect() " << e << endl;	
+       resetBeforeReconnect();
+     } catch (...) {
+       cerr << "AMQConnection::reconnect() catch ... " << endl;	
+       resetBeforeReconnect();
+     }
+
+  }
+  
 
+  Message * AMQConnection::receive(const int delay) {
+      if (iv_valid) {
+        return iv_pConsumer->receive(delay);
+      } else {
+        this->resetBeforeReconnect();
+        this->reconnect();
+        this->iv_valid = true;
+        return NULL;
+      }
+  }
 
 //===================================================
 //AMQConnectionCache
 //---------------------------------------------------
-  AMQConnectionsCache::AMQConnectionsCache(Monitor * stats) {
+  AMQConnectionsCache::AMQConnectionsCache(ConnectionFactory * pConnFact,Monitor * stats) {
+    iv_pConnFact = pConnFact;
     iv_pMonitor = stats;
   }
 
@@ -462,7 +586,7 @@
       if (ite == iv_connections.end()) {
         LOGINFO(FINE,"AMQConnectionsCache::getConnection() create new connection to " +
           brokerURL);
-        connection = new AMQConnection(brokerURL.c_str(), iv_pMonitor);
+        connection = new AMQConnection(iv_pConnFact, iv_pMonitor, iv_connections.size());
         if (connection == NULL) {
           LOGERROR("AMQConnectionCache::getConnection Could not create a endpoint connection to " +
             brokerURL);
@@ -480,7 +604,7 @@
             LOGWARN("AMQConnectionCache::getEndPoint() Existing connection invalid. Reconnecting to " + brokerURL );
             delete connection;
             this->iv_connections.erase(brokerURL);
-            connection = new AMQConnection(brokerURL.c_str(), iv_pMonitor);
+            connection = new AMQConnection(iv_pConnFact, iv_pMonitor, iv_connections.size());
             if (connection == NULL) {
               LOGERROR("AMQConnectionCache::getConnection() could not connect to "
                 + brokerURL );
@@ -534,7 +658,29 @@
                             iv_pEngine(ae),
                             iv_pCas(cas),
                             //iv_pMonitor(stats),
-                            iv_replyToConnections(stats),
+                            iv_replyToConnections(connection->iv_pConnFact,stats),
+                            iv_timeLastRequestCompleted(0),
+                            iv_busy(false),
+                            iv_stopProcessing(false),
+                            iv_count(0),
+                            iv_aeDescriptor(),
+                            iv_brokerURL(connection->getBrokerURL()),
+                            iv_inputQueueName(connection->getInputQueueName()) {
+
+    iv_pMonitor = stats;
+    getMetaData(iv_pEngine); 
+ 
+  }
+
+  AMQListener::AMQListener(int id,
+                           AMQConnection * connection,
+                           AnalysisEngine * ae,
+                           Monitor * stats) : 
+                            iv_id(id),
+                            iv_pConnection(connection),
+                            iv_pEngine(ae),
+                            //iv_pMonitor(stats),
+                            iv_replyToConnections(connection->iv_pConnFact, stats),
                             iv_timeLastRequestCompleted(0),
                             iv_busy(false),
                             iv_count(0),
@@ -543,20 +689,152 @@
                             iv_inputQueueName(connection->getInputQueueName()) {
 
     iv_pMonitor = stats;
-    //get AE descriptor as XML to use when processing GETMETA requests.
-    const AnalysisEngineMetaData & aeMetaData = iv_pEngine->getAnalysisEngineMetaData();			
+    getMetaData(iv_pEngine); 
+ 
+  }
+
+  AMQListener::~AMQListener() {
+
+  }
+
+  //extract AE descriptor as XML to use when processing GETMETA requests.
+ void AMQListener::getMetaData(AnalysisEngine * pEngine) {
+    if (pEngine == NULL) {
+      LOGERROR("AMQListener::getMetaData() Invalid handle to AnalysisEngine.");
+      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
+      msg.addParam("AMQListener::getMetaData() Invalid handle to AnalysisEngine.");
+      ErrorInfo errInfo;
+      errInfo.setMessage(msg);
+      UIMA_EXC_THROW_NEW(Uima_runtime_error, 
+            UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+            errInfo.getMessage(),
+            errInfo.getMessage().getMessageID(),
+            ErrorInfo::unrecoverable);
+    }
+    const AnalysisEngineMetaData & aeMetaData = pEngine->getAnalysisEngineMetaData();			
     icu::UnicodeString xmlBuffer;
-    xmlBuffer.insert(0, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
-    iv_pEngine->getAnnotatorContext().getTaeSpecifier().toXMLBuffer(aeMetaData,
+    xmlBuffer.insert(0, "<?xml version=\"1.0\"?>");
+    pEngine->getAnnotatorContext().getTaeSpecifier().toXMLBuffer(aeMetaData,
       false, xmlBuffer);
     UnicodeStringRef uref(xmlBuffer.getBuffer(), xmlBuffer.length());
 
     this->iv_aeDescriptor = uref.asUTF8();
   }
 
-  AMQListener::~AMQListener() {
 
+
+bool AMQListener::validateRequest(const TextMessage * textMessage, string & errmsg) {
+  bool valid = true; 
+  
+  if ( textMessage->getCMSReplyTo() == NULL) 
+    LOGWARN("AMQListener::validateRequest() JMSReplyTo is not set. " );    
+  
+  if (textMessage->getCMSReplyTo() == NULL &&
+    !textMessage->propertyExists("MessageFrom") ) {
+     errmsg = "Reply to destination not set.";
+     return false;
+  }  
+
+  if (!textMessage->propertyExists("Command") ) {
+    errmsg = "Required property 'Command' is not set.";
+    LOGERROR("AMQListener::validateRequest " + errmsg);
+    valid = false;  
+  } else {
+    int command = textMessage->getIntProperty("Command");   
+    if (command != PROCESS_CAS_COMMAND &&
+        command != GET_META_COMMAND &&
+        command != CPC_COMMAND) {
+        stringstream str;
+        str << "Unexpected value for 'Command' " << command;
+        errmsg = str.str(); 
+        LOGERROR("AMQListener::validateRequest " + errmsg);
+        valid=false;
+    } else if (command == CPC_COMMAND) {
+      if (iv_pEngine == NULL) {
+        errmsg = "CPC request received but AnalysisEngine not available.";
+        LOGERROR("AMQListener::validateRequest() " + errmsg);
+        valid = false;
+      }
+    } else if (command == PROCESS_CAS_COMMAND) {
+      if (iv_pCas == NULL || iv_pEngine == NULL) {
+        errmsg = "Process Cas request but an AnalysisEngine and CAS not available.";
+        LOGERROR("AMQListener::validateRequest() " + errmsg);
+        valid = false;
+      }
+      if (!textMessage->propertyExists("Payload") ) {
+        errmsg = "Required property 'Payload' is not set.";
+        LOGERROR("AMQListener::validateRequest " + errmsg);
+        valid = false;  
+      } else {
+        int payload = textMessage->getIntProperty("Payload");
+        if (payload != XCAS_PAYLOAD && payload != XMI_PAYLOAD) {
+           stringstream str;
+           str << "Unexpected value for 'Payload' " << payload;
+           errmsg = str.str();
+           LOGERROR("AMQListener::validateRequest " + errmsg);
+           valid=false;
+        } 
+        string text = textMessage->getText().c_str();
+        if (text.length() == 0) {
+          errmsg = "There is no payload data. Nothing to process.";
+          LOGERROR("AMQListener::validateRequest " + errmsg);
+          valid = false;  
+        }
+      }
+    }
   }
+  return valid;
+}
+
+
+void AMQListener::receiveAndProcessMessages(apr_thread_t * thd) {
+ 
+  try {
+    this->thd = thd;
+    cout << "Instance: " << iv_id << " ThreadId: " << apr_os_thread_current() <<  " started." << endl;
+    //start receiving messages
+    this->iv_pConnection->start();
+    this->iv_stopProcessing = false;
+    apr_time_t lastStatsTime = apr_time_now(); 
+    while (!iv_stopProcessing) {
+
+      stringstream astr;
+     
+      //cout << "receiveAndProcess going to call receive" << endl; 
+      Message * msg = this->iv_pConnection->receive(2000);
+
+      if (msg != NULL) {
+         iv_busy = true;
+         astr << this->iv_id;
+         astr << " *****Message#: "<< ++iv_count << "*****";
+         //cerr << astr.str() << endl;         
+         //LOGINFO(FINE,astr.str());
+         this->handleRequest(msg);
+         ///this->testHandleRequest(msg);
+         delete msg;
+         msg = 0;
+         //cerr << iv_id << " ****Message#: " << iv_count << " deleted " << endl;
+         iv_busy = false;
+         this->iv_timeLastRequestCompleted = apr_time_now();
+         //cout << iv_id << " processMessages done: getnext " << iv_count  << endl;
+      }
+    
+    }
+    LOGWARN("AMQListener::receiveAndProcessMessage() stopped receiving messages.");
+  } catch (uima::Exception ex ) {
+     LOGERROR("AMQListener::receiveAndProcessMessages() " + 
+       ex.getErrorInfo().getMessage().asString());
+     this->iv_stopProcessing = true;
+     //tell the monitor that the thread has stopped processing
+     iv_pMonitor->listenerStopped(this->iv_id);
+  } catch (...) {
+     LOGERROR("AMQListener::receiveAndProcessMessages UnExpected error.");
+     iv_stopProcessing = true;
+     //tell the monitor that the thread has stopped processing
+     iv_pMonitor->listenerStopped(this->iv_id);
+  }
+}
+
 
 /*
 * Receive a TextMessage and examine the header properties
@@ -564,106 +842,86 @@
 * one request at a time and is blocked till the requestis 
 * handled and the response sent.
 */
-  void AMQListener::onMessage( const Message* message ) {
+  void AMQListener::handleRequest( const Message* message ) {
 
 
     apr_time_t startTime = apr_time_now();
-    apr_time_t endTime;
-
-    apr_time_t startSerialize;
-    apr_time_t startDeserialize;
-    apr_time_t startAnnotatorProcess;
+    apr_time_t timeIdle = 0;
+    // Idle time is computed as the interval since the last request
+    // was completed. If this is the first request, idle time is
+    // computed from the time the service was started.
+    if (iv_timeLastRequestCompleted != 0)
+      timeIdle =  startTime - iv_timeLastRequestCompleted;
+    else 
+      timeIdle = startTime - iv_pMonitor->getServiceStartTime(); 
 
+    this->iv_pMonitor->processingStarted(this->iv_id, startTime, timeIdle );
+    stringstream astr;
+    astr << this->iv_id;
+    astr << " ****handleRequest(): "<< iv_count << " start"<< endl;
+    LOGINFO(FINE,astr.str());
+    ///cout << astr.str() << endl;
+    ///LOGWARN(astr.str());
+    apr_time_t endTime = 0;
+    apr_time_t startSerialize = 0;
+    apr_time_t startDeserialize = 0;
+    apr_time_t startAnnotatorProcess = 0; 
+    apr_time_t startSendResponse = 0;
     apr_time_t timeToDeserializeCAS = 0;
     apr_time_t timeToSerializeCAS = 0;
     apr_time_t timeInAnalytic = 0;
-    apr_time_t timeIdle = apr_time_now() - iv_timeLastRequestCompleted;
 
     const TextMessage* textMessage=0;
     int command = 0;
 
-    stringstream astr;
-    astr << "*****Message#: " << ++iv_count << "*****" << endl;
-    LOGINFO(INFO,astr.str());
-    astr.seekp(0);
-
     try {	
-      iv_busy=true;
+    
       textMessage = dynamic_cast< const TextMessage* >( message );
       if (textMessage==0) {
-        LOGERROR("AMQListener::onMessage() invalid pointer to TextMessage");
+        LOGERROR("AMQListener::handleRequest() invalid pointer to TextMessage");
         endTime = apr_time_now();
-        iv_pMonitor->processingComplete(0,false,endTime-startTime);
+        iv_pMonitor->processingComplete(iv_id, 0,false,endTime-startTime,0,0,0,endTime-startTime);
         this->iv_timeLastRequestCompleted = apr_time_now();
         return;
       }
       if (textMessage->propertyExists("MessageFrom")) {	
         LOGINFO(FINER,"Received from " + textMessage->getStringProperty("MessageFrom"));
-      } else {	   
-        LOGERROR("AMQListener::onMessage() ERROR MessageFrom not set.");
-        endTime = apr_time_now();
-        iv_pMonitor->processingComplete(0,false,endTime-startTime);
-        this->iv_timeLastRequestCompleted = apr_time_now();
-        return; 
-      }       
+      } 
 
-      if (textMessage->propertyExists("Command") ){
-        command = textMessage->getIntProperty("Command");
-      } else {
-        LOGERROR("AMQListener::onMessage() Required property Command not set.");
+      //validate request properties
+      string errormessage;
+      if (!validateRequest(textMessage, errormessage)) {
+        LOGERROR("Listener::handleRequest() " + errormessage);
         endTime = apr_time_now();
-        sendResponse(textMessage, timeToDeserializeCAS,
-          timeToSerializeCAS, timeInAnalytic,
-          timeIdle, endTime-startTime,
-          "Required property 'Command' not set." ,true);
+        sendResponse(textMessage, 0,0,0,timeIdle, endTime-startTime,
+          errormessage ,true);
         endTime = apr_time_now();
-        iv_pMonitor->processingComplete(0,false,endTime-startTime);
-        this->iv_timeLastRequestCompleted = apr_time_now();
+        iv_pMonitor->processingComplete(iv_id, 0,false,endTime-startTime,0,0,0,endTime-startTime);
         return;
       }
-      astr.seekp(0);
+
+      command = textMessage->getIntProperty("Command");
+      astr.str("");
+      astr << "Received request Command: " << command ;
+      if (textMessage->propertyExists("CasReference")) {
+        astr << " CasReference " << textMessage->getStringProperty("CasReference");
+      }
+   
       astr << "Received request Command: " << command ;
       if (textMessage->propertyExists("CasReference")) {
         astr << " CasReference " << textMessage->getStringProperty("CasReference");
       }
       LOGINFO(INFO,astr.str());
-      astr.seekp(0);
+
       if (command == PROCESS_CAS_COMMAND) { //process CAS
-        //get the payload property
-        if (!textMessage->propertyExists("Payload")) {
-          LOGERROR("AMQListener::onMessage() Required property Payload not set.");
-          endTime = apr_time_now();
-          sendResponse(textMessage, timeToDeserializeCAS,
-            timeToSerializeCAS, timeInAnalytic,
-            timeIdle,endTime-startTime,
-            "Required property 'Command' not set." ,true);
-
-          endTime = apr_time_now();
-          iv_pMonitor->processingComplete(command,false,endTime-startTime);
-          this->iv_timeLastRequestCompleted = apr_time_now();
-          return;
-        }
+        LOGINFO(FINE,"Process CAS request start.");
         int payload = textMessage->getIntProperty("Payload");
         //get the text in the payload 
-        string text = textMessage->getText();
-        if (text.length() == 0) {
-          LOGERROR("AMQListener::onMessage() There is no payload data. Nothing to process.");
-          text = "There is no payload data. Nothing to process.";
-          endTime = apr_time_now();
-          sendResponse(textMessage, timeToDeserializeCAS,
-            timeToSerializeCAS, timeInAnalytic,
-            timeIdle, endTime-startTime,
-            text ,true);
-          endTime=apr_time_now();
-          iv_pMonitor->processingComplete(command,false,endTime-startTime);
-          this->iv_timeLastRequestCompleted = apr_time_now();
-          return;
-        }
+        string text = textMessage->getText().c_str();
 
-        astr  << "Payload: " << payload;
+        astr.str("");
+        astr << "Payload: " << payload << " Content: " << text ;
         LOGINFO(FINER, astr.str());
-        LOGINFO(FINER, "  Content: " +  text);
-        astr.seekp(0);
 
         //InputSource
         MemBufInputSource memIS((XMLByte const *)text.c_str(),
@@ -678,6 +936,7 @@
         //the CAS which will be sent with the
         //response.
         if (payload == XCAS_PAYLOAD) {
+          LOGINFO(FINEST, "AMQListener::handleRequest() XCAS serialization.");
           startDeserialize = apr_time_now();
           XCASDeserializer deserializer;
           deserializer.deserialize(memIS, *iv_pCas); 
@@ -693,32 +952,25 @@
           timeToSerializeCAS = apr_time_now() - startSerialize;
         } else if (payload == XMI_PAYLOAD) {
           //deserialize incoming xmi CAS data.
+          LOGINFO(FINEST, "AMQListener::handleRequest() XMI serialization.");
+          
           startDeserialize = apr_time_now();
           XmiSerializationSharedData sharedData;
           XmiDeserializer deserializer;
           deserializer.deserialize(memIS,*iv_pCas,sharedData);
           startAnnotatorProcess=apr_time_now();
           timeToDeserializeCAS = startAnnotatorProcess-startDeserialize;
-
+          LOGINFO(FINEST, "AMQListener::handleRequest() calling process.");
           iv_pEngine->process(*iv_pCas);
           startSerialize=apr_time_now();
           timeInAnalytic = startSerialize - startAnnotatorProcess;
 
           //serialize CAS 
+          LOGINFO(FINEST, "AMQListener::handleRequest() calling serialize.");
           XmiWriter xmiwriter(*iv_pCas, true, &sharedData);
           xmiwriter.write(xmlstr);
           timeToSerializeCAS = apr_time_now() - startSerialize;
-        } else {
-          xmlstr << "Invalid Payload " << payload;
-          LOGERROR("AMQListener::onMessage() " + xmlstr.str());
-          endTime=apr_time_now();
-          sendResponse(textMessage, timeToDeserializeCAS,
-            timeToSerializeCAS, timeInAnalytic,
-            timeIdle,endTime-startTime,xmlstr.str() ,true);
-          endTime=apr_time_now();
-          iv_pMonitor->processingComplete(command,false,endTime-startTime);
-          this->iv_timeLastRequestCompleted = apr_time_now();
-          return;
+          LOGINFO(FINEST, "AMQListener::handleRequest() done processing CAS.");
         }
         //done with this CAS.
         iv_pCas->reset(); 
@@ -732,80 +984,59 @@
                       xmlstr.str(),false);
         endTime=apr_time_now();
         iv_pMonitor->processingComplete(command,true,endTime-startTime,
-          timeToDeserializeCAS, timeInAnalytic, timeToSerializeCAS);
+          timeToDeserializeCAS, timeInAnalytic, timeToSerializeCAS,
+          endTime-startSendResponse);
         LOGINFO(FINE,"Process CAS finished.");		
       } else if (command ==  GET_META_COMMAND ) { //get Meta 	 
         LOGINFO(FINE, "Process getMeta request start.");
         endTime = apr_time_now();
+        startSendResponse = apr_time_now();
         sendResponse(textMessage, timeToDeserializeCAS,
                   timeToSerializeCAS, timeInAnalytic,
                   timeIdle, endTime-startTime,this->iv_aeDescriptor,false);			
         endTime=apr_time_now();
-        iv_pMonitor->processingComplete(command,true,endTime-startTime);
+        //record timing
+        iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,0,0,0,endTime-startSendResponse);
         LOGINFO(FINE,"Process getMeta request finished.");			
       } else if (command ==  CPC_COMMAND ) { //CPC       			
         LOGINFO(FINE, "Processing CollectionProcessComplete request start");				
         iv_pEngine->collectionProcessComplete();
         endTime = apr_time_now();
-        sendResponse(textMessage, timeToDeserializeCAS,
-                    timeToSerializeCAS, timeInAnalytic,
+        startSendResponse = apr_time_now();
+        sendResponse(textMessage, 0,
+                    0, timeInAnalytic,
                     timeIdle,endTime-startTime,"CPC completed.",false);
         endTime=apr_time_now();
-        iv_pMonitor->processingComplete(command,true,endTime-startTime);
+        iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,0,0,0,endTime-startSendResponse);
         LOGINFO(FINE, "Processing CollectionProcessComplete request finished.");	
-      } else {
-        endTime = apr_time_now();
-        stringstream str;
-        str << " Invalid Request " << command << endl;
-        LOGERROR(str.str());
-        sendResponse(textMessage, 
-                      timeToDeserializeCAS,
-                      timeToSerializeCAS, 
-                      timeInAnalytic,
-                      timeIdle,
-                      endTime-startTime,str.str(),true);
-        endTime = apr_time_now();
-        iv_pMonitor->processingComplete(0,false,endTime-startTime);
-      }
-      iv_busy=false;
-      iv_timeLastRequestCompleted = apr_time_now();
+      } 
     } catch (XMLException& e) {
       stringstream str;
-      str << "AMQListener::onMessage() XMLException." << e.getMessage();
+      str << "AMQListener::handleRequest XMLException." << e.getMessage();
       LOGERROR(str.str());
       endTime = apr_time_now();
+      startSendResponse = apr_time_now();
       sendResponse(textMessage, timeToDeserializeCAS,
                     timeToSerializeCAS, timeInAnalytic,
                     timeIdle, endTime-startTime,str.str(),true);
       endTime = apr_time_now();
-      iv_pMonitor->processingComplete(command,false,endTime-startTime);
-      iv_timeLastRequestCompleted = apr_time_now();
-      iv_busy=false;
-    }  catch (CMSException& e) {
-      LOGERROR("AMQListener::onMessage()" + e.getMessage());
-      endTime = apr_time_now();
-      iv_pMonitor->processingComplete(0,false,endTime-startTime);
-      iv_timeLastRequestCompleted = apr_time_now();
-      iv_busy=false;
+      iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startSendResponse);
     } catch (uima::Exception e) {
-      LOGERROR("AMQListener::onMessage() UIMA Exception " + e.asString());
+      LOGERROR("AMQListener::handleRequest UIMA Exception " + e.asString());
       endTime = apr_time_now();
+      startSendResponse = apr_time_now();
       sendResponse(textMessage, timeToDeserializeCAS,
                       timeToSerializeCAS, timeInAnalytic,
                       timeIdle,endTime-startTime,e.asString(),true);
       endTime = apr_time_now();
-      iv_pMonitor->processingComplete(command,false,endTime-startTime);
-      iv_timeLastRequestCompleted = apr_time_now();
-      iv_busy=false;
+      iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startSendResponse);
     }	catch(...) {
-      LOGERROR("AMQListener::onMessage() Unknown exception ");
+      LOGERROR("AMQListener::handleRequest Unknown exception ");
       //TODO: log / shurdown ?}   
-      endTime = apr_time_now();
-      iv_pMonitor->processingComplete(command,false,endTime-startTime);
-      iv_timeLastRequestCompleted = apr_time_now();
-      iv_busy=false;
+      endTime = apr_time_now(); 
+      iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startTime);
+
     }
-    iv_timeLastRequestCompleted = apr_time_now();
   }
 
   void AMQListener::sendResponse(const TextMessage * request, 
@@ -936,9 +1167,9 @@
        LOGINFO(FINER,"AMQListener::sendResponse DONE");
 
      } catch (CMSException& ex ) {
-       LOGERROR("AMQListener::onMessage()" +  ex.getMessage());
+       LOGERROR("AMQListener::handleMessage()" +  ex.getMessage());
      } catch (...)  { 
-       LOGERROR("AMQListener::onMessage() UnExpected error sending reply.");
+       LOGERROR("AMQListener::handleRequest() UnExpected error sending reply.");
      }
 }
 
@@ -951,8 +1182,9 @@
   }
 
   AMQAnalysisEngineService::AMQAnalysisEngineService(ServiceParameters & desc,
-                                                     Monitor * stats) : 
+                                                     Monitor * stats,  apr_pool_t * pool)  : 
                                         //iv_pMonitor(stats),
+                                        iv_pool(pool),
                                         iv_numInstances(desc.getNumberOfInstances()),
                                         iv_brokerURL(desc.getBrokerURL()),
                                         iv_inputQueueName(desc.getQueueName()),
@@ -962,10 +1194,11 @@
                                         iv_vecpConnections(),
                                         iv_vecpAnalysisEngines(),
                                         iv_vecpCas(),
+                                        iv_closed(false),
                                         iv_listeners() {
     try {
       iv_pMonitor = stats;
-      initialize();
+      initialize(desc);
     } catch (CMSException & e) {
       ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
       msg.addParam(e.getMessage());
@@ -979,13 +1212,34 @@
     }
   }
 
-  void AMQAnalysisEngineService::initialize() {
+  void AMQAnalysisEngineService::initialize(ServiceParameters & params) {
 
     try {
-      //create a connection for each instance
+      //create connection factory
+      LOGINFO(FINEST,"AMQAnalysisEngineService::initialize() Create connection factory");
+      this->iv_pConnFact = AMQConnection::createConnectionFactory(params);
+      if (iv_pConnFact == NULL) {
+        ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
+        msg.addParam("AMQAnalysisEngineService::initialize() Failed to create connection factory.");
+        ErrorInfo errInfo;
+        errInfo.setMessage(msg);
+        UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
+          UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+          errInfo.getMessage(),
+          errInfo.getMessage().getMessageID(),
+          ErrorInfo::unrecoverable);
+      }
+      if (!uima::ResourceManager::hasInstance()) {
+        uima::ResourceManager::createInstance("ActiveMQAnalysisEngineService");
+      }
+      ErrorInfo errInfo;
+      UnicodeString ustr(this->iv_aeDescriptor.c_str());
+      UnicodeString ufn = ResourceManager::resolveFilename(ustr,ustr);
+
+      //create a connection, an AnalysisEngine, a CAS and a listener for each instance
       for (int i=0; i < iv_numInstances; i++) {
-        //create a connection to MQ Broker 
-        AMQConnection * newConnection = new AMQConnection(this->iv_brokerURL.c_str(), iv_pMonitor);
+        //create the connection
+        AMQConnection * newConnection = new AMQConnection(this->iv_pConnFact, this->iv_pMonitor, i);
         if (newConnection == NULL) {
           LOGERROR("AMQAnalysisEngineService::initialize() Could not create ActiveMQ endpoint connection.");
           ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
@@ -998,20 +1252,8 @@
             errInfo.getMessage().getMessageID(),
             ErrorInfo::unrecoverable);
         }
-        newConnection->setExceptionListener(this);
+        /////newConnection->setExceptionListener(this);
         this->iv_vecpConnections.push_back(newConnection);
-      }
-
-      if (!uima::ResourceManager::hasInstance()) {
-        uima::ResourceManager::createInstance("ActiveMQAnalysisEngineService");
-      }
-      ErrorInfo errInfo;
-      UnicodeString ustr(this->iv_aeDescriptor.c_str());
-      UnicodeString ufn = ResourceManager::resolveFilename(ustr,ustr);
-
-      //create a AnalysisEngine and CAS for each instance
-      for (int i=0; i < iv_numInstances; i++) {
-
         //create AE
         AnalysisEngine * pEngine = uima::TextAnalysisEngine::createTextAnalysisEngine((UnicodeStringRef(ufn).asUTF8().c_str()),
           errInfo);
@@ -1020,7 +1262,7 @@
         } else {
           LOGERROR("AMQAnalysisEngineService::initializer() could not create AE" + errInfo.asString());
           ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
-          msg.addParam("AMQListener::initialize() create AE failed. " + errInfo.getMessage().asString() );
+          msg.addParam("AMQAnalysisEngineService::initialize() create AE failed. " + errInfo.getMessage().asString() );
           ErrorInfo errInfo;
           errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE),
             errInfo.setMessage(msg);
@@ -1055,17 +1297,16 @@
             ErrorInfo::unrecoverable);
         }
         this->iv_vecpCas.push_back(cas);
-      }
-
-      //create listeners and register these
-      for (int i=0; i < iv_numInstances; i++) {
+      
+        //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() create listener " << endl;
+        //create the listener for this instance and register it
         AMQListener * newListener = new AMQListener(i,iv_vecpConnections.at(i),
           iv_vecpAnalysisEngines.at(i), iv_vecpCas.at(i),
           iv_pMonitor);
         if (newListener == NULL) {
           LOGERROR("AMQAnalysisEngineService::initialize() Could not create listener.");
           ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
-          msg.addParam("AnalysisEngineServcie::initialize() Could not create listener.");
+          msg.addParam("AnalysisEngineService::initialize() Could not create listener.");
           ErrorInfo errInfo;
           errInfo.setMessage(msg);
           UIMA_EXC_THROW_NEW(Uima_runtime_error, 
@@ -1075,14 +1316,55 @@
             ErrorInfo::unrecoverable);
         }
         this->iv_listeners[i] = newListener;
-
+        //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() create message consumer " << endl;
         //create MessageConsumer session and register Listener       
-        this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,
-          newListener,this->iv_prefetchSize);
+        this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,annotator_selector);  
+     } 
+     
+     iv_pMonitor->setNumberOfInstances(iv_numInstances);
+     //cout <<  __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() setup getmeta " << endl;
+      //Fast GetMeta
+      //create connection 
+     LOGINFO(FINEST, "AMQAnalysisEngineService::initialize() Setup GETMETA instance.");
+     iv_pgetMetaConnection = new AMQConnection(this->iv_pConnFact, this->iv_pMonitor, iv_numInstances);
+
+     if (iv_pgetMetaConnection == NULL) {
+          LOGERROR("AMQAnalysisEngineService::initialize() Could not create fast getmeta ActiveMQ endpoint connection.");
+          ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
+          msg.addParam("AMQAnalysisEngineService::initialize() Failed to connect to broker.");
+          ErrorInfo errInfo;
+          errInfo.setMessage(msg);
+          UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
+            UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+            errInfo.getMessage(),
+            errInfo.getMessage().getMessageID(),
+            ErrorInfo::unrecoverable);
+     }
+
+     //cout <<  __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() setup getmeta listener" << endl;
+     //create an AMQListener and a MessageConsumer that handle getMeta requests only.
+     AMQListener * newListener = new AMQListener(iv_numInstances,iv_pgetMetaConnection,
+          iv_vecpAnalysisEngines.at(0), 
+          this->iv_pMonitor);
+     if (newListener == NULL) {
+          LOGERROR("AMQAnalysisEngineService::initialize() Could not create Fast getMeta listener.");
+          ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
+          msg.addParam("AMQAnalysisEngineService::initialize() Could not create listener.");
+          ErrorInfo errInfo;
+          errInfo.setMessage(msg);
+          UIMA_EXC_THROW_NEW(Uima_runtime_error, 
+            UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+            errInfo.getMessage(),
+            errInfo.getMessage().getMessageID(),
+            ErrorInfo::unrecoverable);
       }
-      iv_pMonitor->setNumberOfInstances(iv_numInstances);
+      this->iv_listeners[this->iv_numInstances] = newListener; 
+      iv_pgetMetaConnection->createMessageConsumer(iv_inputQueueName, 
+                      getmeta_selector); 
+      this->iv_pMonitor->setGetMetaListenerId(iv_numInstances);
+      //cout <<  __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() done" << endl;
     } catch (uima::Exception e) {
-      cout << __LINE__ << "got a uima exception " << endl;
+      cout << __FILE__ << __LINE__ <<  "AMQAnalysisEngineService::initialize() failed " << e.getErrorInfo().asString() << endl;
       LOGERROR("AMQAnalysisEngineService::initialize() " + e.asString());
       throw e;
     }
@@ -1105,75 +1387,82 @@
     cout << "tracelevel=" << uimacpp_ee_tracelevel << endl;
   }
 
-  void AMQAnalysisEngineService::onException( const CMSException& ex ) {
-    LOGERROR("CMS Exception occured. Shutting down the service." + ex.getMessage()); 
-    cerr << "Broken connection. Stopped receiving messages." << endl;
-    stop();
-    this->iv_pMonitor->shutdown();
-  }
-
-  void AMQAnalysisEngineService::onMessage( const Message* message ) {
-    LOGINFO(0,"AMQAnalsisEngineService::onMessage() Got a message.");
-  }
-
-  void AMQAnalysisEngineService::start() {	
-    for (size_t i=0; i < iv_vecpConnections.size(); i++) {
-      cout << "Starting listener " << i << endl;
-      AMQConnection * connection = iv_vecpConnections.at(i);
-      if (connection != NULL) {
-        connection->start();
-      } else {
-        LOGERROR("AMQAnalysisServiceEngine::start() Invalid connection object.");
-        ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
-        msg.addParam("AMQAnalysisServiceEngine::start() Connection does not exist.");
-        ErrorInfo errInfo;
-        errInfo.setMessage(msg);
-        UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
-          UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
-          errInfo.getMessage(),
-          errInfo.getMessage().getMessageID(),
-          ErrorInfo::unrecoverable);
-      }
+  void AMQAnalysisEngineService::start() {
+    LOGINFO(FINER,"AMQAnalysisEngineService::start() create listener threads.");
+    //create the listener threads
+    thd_attr=0;
+    apr_status_t rv;
+    rv = apr_threadattr_create(&thd_attr, iv_pool);
+    assert(rv == APR_SUCCESS);
+    map<int, AMQListener*>::iterator ite; 
+    int i=0;
+    for (ite= iv_listeners.begin();ite !=  iv_listeners.end();ite++) {
+      apr_thread_t *thread=0;
+      rv = apr_thread_create(&thread, thd_attr, handleMessages, ite->second, iv_pool);
+      assert(rv == APR_SUCCESS);
+      iv_listenerThreads.push_back(thread);
     }
-    //update the start time
-    iv_pMonitor->setStartTime();
-  }
+    this->iv_pMonitor->setStartTime();
+}
 
-  int AMQAnalysisEngineService::stop() {
-    //TODO: let listeners finish processing first
-    //stop messages notification
-    for (size_t i=0; i < iv_vecpConnections.size(); i++) {
-      AMQConnection * connection = iv_vecpConnections.at(i);
-      if (connection != NULL) {
-        connection->stop();
-      } else {
-        LOGERROR("AMQAnalysisEngineService::stop() Connection object is NULL.");
-        ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
-        msg.addParam("AMQAnalysisServiceEngine::start() Connection does not exist.");
-        ErrorInfo errInfo;
-        errInfo.setMessage(msg);
-        UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
-          UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
-          errInfo.getMessage(),
-          errInfo.getMessage().getMessageID(),
-          ErrorInfo::unrecoverable);
-      }
-      //cout << __FILE__ << " Delete Analysis Engine  " << endl;
-      if (iv_vecpAnalysisEngines.at(i) != NULL) {
-        delete iv_vecpAnalysisEngines.at(i);
-      }
-      if (iv_vecpCas.at(i) != NULL) {
-          delete iv_vecpCas.at(i);
-      }
 
+void AMQAnalysisEngineService::shutdown() {
+  //LOGWARN("AMQAnalysisEngineService::shutdown()");
+  cout << "AMQAnalysisEngineService::shutdown() going to terminate threads " << endl;;
+  //terminate the threads
+  apr_status_t rv;
+  for (size_t i=0; i < this->iv_listenerThreads.size(); i++) {
+    //cout << "wait for thread " << i << " to end " << endl;
+    this->iv_listeners[i]->stopProcessing();
+    if (!iv_listeners[i]->isReconnecting()) {
+      apr_thread_join(&rv, this->iv_listenerThreads.at(i));
+    }
+  }
+  stop();
+  //cout << "AMQAnalysisEngineService::shutdown stopped all connection" << endl;
+  cleanup();
+  //cout << "AMQAnalysisEngineService::shutdown shutdown done" << endl;
+}
+
+int AMQAnalysisEngineService::stop() {
+  
+  //TODO: let listeners finish processing first
+  //stop messages notification
+
+  if (iv_pgetMetaConnection != NULL) {
+    cout << "Stopping GetMetaData instance" << endl;
+    this->iv_pgetMetaConnection->stop();
+  }
+  for (size_t i=0; i < iv_vecpConnections.size(); i++) {
+    cout << "Stopping Annotator instance " << i << endl;
+    AMQConnection * connection = iv_vecpConnections.at(i);
+    if (connection != NULL) {
+      connection->stop();
+    } else {
+      LOGERROR("AMQAnalysisEngineService::stop() Connection object is NULL.");
+      ErrorInfo errInfo;
+      errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "connection object is NULL."));
+      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
+        UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+        errInfo.getMessage(),
+        errInfo.getMessage().getMessageID(),
+        ErrorInfo::unrecoverable);
     }
-    iv_vecpAnalysisEngines.clear();
-    iv_vecpCas.clear();
-    return 0;
-  }	
+  } 
+  return 0;
+}	
+
   void AMQAnalysisEngineService::cleanup() {
     // Destroy resources.
     try { 
+      if (iv_closed) {
+         return;
+      }
+      if (iv_pgetMetaConnection != NULL) {
+        delete this->iv_pgetMetaConnection;
+        this->iv_pgetMetaConnection = 0;
+      }
+
       //cout << "num consumerConnections " << consumerConnections.size() << endl;
       for (size_t i=0; i < iv_vecpConnections.size(); i++) {
         //cout << "deleting consumerConnection " << i << endl;	
@@ -1181,26 +1470,35 @@
           iv_vecpConnections.at(i)->stop();
           delete iv_vecpConnections.at(i);
         }
-        /**
-        if (iv_vecpAnalysisEngines.at(i) != NULL) {
-          delete iv_vecpAnalysisEngines.at(i);
-        }
-        if (iv_vecpCas.at(i) != NULL) {
-          delete iv_vecpCas.at(i);
-        } 
-        **/    
       }  
+      
+      for (size_t i=0; i < iv_vecpAnalysisEngines.size(); i++) {
+         delete iv_vecpAnalysisEngines.at(i);
+      }
+
+      for (size_t i=0; i < iv_vecpCas.size(); i++) {
+         delete iv_vecpCas.at(i);
+      }
+
       iv_vecpAnalysisEngines.clear();
       iv_vecpCas.clear();
       iv_vecpConnections.clear();
+
       map<int, AMQListener*>::iterator ite; 
       for (ite= iv_listeners.begin();ite !=  iv_listeners.end();ite++) {
         delete ite->second;
       }
 
       iv_listeners.clear();
+
+      if (iv_pConnFact != NULL) {
+        delete iv_pConnFact;
+        iv_pConnFact = NULL;
+      }
+      iv_closed = true;
+
     } catch (CMSException& e) {
-      LOGERROR("AMQAnalysisEngineService::cleanup() " +  e.getMessage());
+      cerr << "AMQAnalysisEngineService::cleanup() "  << e.getMessage() << endl;;
     }  
   }           
 
@@ -1209,22 +1507,16 @@
 //CommonUtils
 //---------------------------------------------------
 void CommonUtils::logError(string msg) {
-    iv_pMonitor->logError(msg);
+    this->iv_pMonitor->logError(msg);
     //cerr << "ERROR: " << msg << endl;
   }
 void CommonUtils::logWarning(string msg) {
-    iv_pMonitor->logWarning(msg);
+    this->iv_pMonitor->logWarning(msg);
     //cout << "WARN: " << msg << endl;
   }
 void CommonUtils::logMessage(string msg) {
-    iv_pMonitor->logMessage(msg);
+  cout << "INFO: " << msg << endl;
+    this->iv_pMonitor->logMessage(msg);
     //cout << "INFO: " << msg << endl;
   }
 
-
-
-
-
-
-
-

Modified: incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp?rev=821374&r1=821373&r2=821374&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp Sat Oct  3 17:46:49 2009
@@ -25,10 +25,14 @@
 #define __ACTIVEMQ_AE_SERVICE__
 #include "uima/api.hpp"
 
+#include <cms/ConnectionFactory.h>
 #include <cms/Connection.h>
 #include <cms/Session.h>
 #include <cms/TextMessage.h>
 #include <cms/ExceptionListener.h>
+#include "time.h"
+#include <apr_thread_proc.h>
+
 
 using namespace cms;
 using namespace uima;
@@ -37,6 +41,13 @@
 class Monitor;
 class ServiceParameters;
 
+/** The function that the request processing thread will run. 
+  * It receives and processes each message from the input queue
+  * as it arrives.
+  */
+static void* APR_THREAD_FUNC handleMessages(apr_thread_t *thd, void *data);
+
+
 /** common base class */
 class CommonUtils {
 protected:
@@ -58,8 +69,12 @@
                       public CommonUtils {
 private:
   string iv_brokerURL;
+ 
   Connection* iv_pConnection;
   bool iv_valid;
+  bool iv_reconnecting;
+  bool iv_started;
+  int iv_id;
 
   //consumer session
   Session* iv_pConsumerSession;
@@ -67,21 +82,28 @@
   string iv_inputQueueName;
   cms::Queue * iv_pInputQueue;      
   MessageListener * iv_pListener;
+  TextMessage * iv_pReceivedMessage;
+  string iv_selector;
 	
   //producer session
   Session * iv_pProducerSession;
   MessageProducer * iv_pProducer;  
-	TextMessage * iv_pTextMessage;
+	TextMessage * iv_pReplyMessage;
   map<string, cms::Destination*> iv_replyDestinations; //queuename-destination
-
+  
+  void initialize();
 public:
+
+  static ConnectionFactory * createConnectionFactory(ServiceParameters & params);
+  
 	/** Establish connection to the broker and create a Message Producer session. 
    */
-	AMQConnection ( const char * aBrokerURL, Monitor * pStatistics);
+  AMQConnection ( ConnectionFactory * connFact, Monitor * pStatistics, int id);
 
   /** Creates a MessageConsumer session and registers a listener. 
       Caller owns the listener. */
-	void createMessageConsumer(string aQueueName, MessageListener * pListener, int prefetch); 
+	//void createMessageConsumer(string aQueueName); 
+  void createMessageConsumer(string aQueueName, string selector); 
  	
   /** destructor */
 	~AMQConnection();
@@ -101,16 +123,24 @@
 	/** returns a TextMessage owned by this class. */
 	TextMessage * getTextMessage();
 
-	/** sends the message and clears it. */
+	/** sends the reply message and clears it. */
 	void sendMessage(string queueName);
-
-  /** sends the message and clears it. */
 	void sendMessage(const Destination * cmsReplyTo);
+  void sendMessage (TextMessage * request);
+
   
   bool isValid() {
     return iv_valid;
   }
 
+  bool isStarted() {
+    return iv_started;
+  }
+
+  bool isReconnecting() {
+    return iv_reconnecting;
+  }		    
+
   /** get the brokerURL */
   string getBrokerURL() {
     return this->iv_brokerURL;
@@ -120,6 +150,25 @@
   string getInputQueueName() {
     return this->iv_inputQueueName;
   }
+
+  /** reset the connection handles */
+  void reset();
+
+  /** reset before reconnecting */
+  void resetBeforeReconnect();
+
+
+  /** reestablish broken connection.
+   * Attempts to reconnect 30 seconds after each failed attempt.
+   */
+  void reconnect();
+
+  /** receives the next message for this consumer.
+    * Delay is in millis. 
+    */
+  Message * receive(const int delay);
+
+  ConnectionFactory * iv_pConnFact;
 };
 
 
@@ -129,10 +178,11 @@
 //--------------------------------------------------
 class AMQConnectionsCache : public CommonUtils  {
 private:
+  ConnectionFactory * iv_pConnFact;
 	map<string, AMQConnection *> iv_connections; //key is brokerurl
 	
 public:
-	AMQConnectionsCache(Monitor * pStatistics); 
+	AMQConnectionsCache(ConnectionFactory * pConnFact, Monitor * pStatistics); 
 
 	~AMQConnectionsCache(); 
 
@@ -146,15 +196,17 @@
 };
 
 //======================================================
-// A MessageListener that handles getMeta, processCAS
+// This class handles getMeta, processCAS
 // and Collection Processing Complete requests.
 //
 // Records timing and error JMX statistics.
 //
 //------------------------------------------------------
-class AMQListener : public MessageListener,  
-                    public CommonUtils            {
+class AMQListener : public CommonUtils            {
 private:
+  apr_thread_t *thd;
+  bool iv_stopProcessing;
+
 	int iv_id;									    //Listener id
 	string iv_inputQueueName;				//queue this listener gets messages from
 	string iv_brokerURL;				    //broker this listener is connected to 
@@ -169,15 +221,27 @@
 	AMQConnectionsCache    iv_replyToConnections; //maintain connections cache for
                                                 //sending reply to other brokers.
 	
+  void getMetaData(AnalysisEngine * pEngine); 
+
+  void handleRequest(const Message * request);
+
+  bool validateRequest(const TextMessage *, string &);
+
   void sendResponse(const TextMessage * request, apr_time_t timeToDeserialize,
                     apr_time_t timeToSerialize, apr_time_t timeInAnalytic,
                     apr_time_t idleTime, apr_time_t elapsedTime, 
 		                string msgContent, bool isExceptionMsg); 
 public:
-	/** constructor */ 
+  
+	/** constructor */
   AMQListener(int id,
               AMQConnection * pConnection,
               AnalysisEngine * pEngine,
+              Monitor * pStatistics); 
+
+   AMQListener(int id,
+              AMQConnection * pConnection,
+              AnalysisEngine * pEngine,
               CAS * pCas,
               Monitor * pStatistics); 
   /** destructor */
@@ -187,11 +251,19 @@
 		return this->iv_busy;
 	}
 
-  /*
-	* Process the message. Handles GETMETA, PROCESSCAS
-  * and CPC requests.
-	*/
-	virtual void onMessage( const Message* message ); 
+   /* Sets the flag that breaks out of the receive loop */
+    void stopProcessing() {
+      iv_stopProcessing = true;
+    }
+
+   bool isReconnecting() {
+     return iv_pConnection->isReconnecting();
+   }
+    /* 
+     * Synchronously receives and processes messages 
+     */
+    void receiveAndProcessMessages(apr_thread_t *thd);
+
 	
 };
 
@@ -201,13 +273,13 @@
 //
 // This class creates connections to an ActiveMQ broker 
 // and registers one or more MessageConsumers to receive 
-// messages from a specified queue.  The MessageConsumers can
-// process requests for GETMETA, PROCESSCAS and Collection
-// Process Complete. 
-//
-// Each MessageConsumer registers a MessageListener that will
-// receive and process messages. Each instance of the MessageListener
-// is initialized with an instance of the AnalysisEngine and a CAS.
+// messages from a specified queue. 
+//
+// A thread is started for each instance. Each thread maintains a
+// connection to the broker, and an instance of the UIMA AnalysisEngine.
+// To support fast handling of GETMETA requests, an additional separate
+// MessageConsumer and thread are started that process only GETMETA requests.
+// 
 // 
 // The service wrapper sets acknowledgment mode to AUTO_ACKNOWLEDGE mode 
 // by default and lets the underlying middleware handle the message
@@ -222,10 +294,15 @@
 // See the UIMA-EE documentation for how to start and manage a C++
 // servcice from Java using the UimacppServiceController bean.
 //------------------------------------------------------------------------
-class AMQAnalysisEngineService : public ExceptionListener,
-	                               public MessageListener,
-                                 public  CommonUtils {
+class AMQAnalysisEngineService : public  CommonUtils {
 private:
+
+  apr_pool_t * iv_pool;
+  apr_threadattr_t *thd_attr;
+  ConnectionFactory * iv_pConnFact;
+  vector<apr_thread_t *> iv_listenerThreads;
+
+
   string iv_brokerURL;  
 	string iv_inputQueueName;  
   string iv_aeDescriptor;
@@ -233,28 +310,30 @@
 	int iv_prefetchSize;
   size_t iv_initialFSHeapSize;
 
+  AMQConnection * iv_pgetMetaConnection;
 	vector <AMQConnection*> iv_vecpConnections;
   vector <AnalysisEngine *> iv_vecpAnalysisEngines;
   vector <CAS *> iv_vecpCas;
   map<int, AMQListener *> iv_listeners;    //id - listener
 
-  void initialize(); 
+  bool iv_started;
+  bool iv_closed;
+
+  void initialize(ServiceParameters & params);
 public:
   void cleanup();
   
   ~AMQAnalysisEngineService(); 
 
-	AMQAnalysisEngineService(ServiceParameters & desc, Monitor * pStatistics);
+	AMQAnalysisEngineService(ServiceParameters & desc, Monitor * pStatistics, apr_pool_t * pool);
 
 	void setTraceLevel(int level);
-
-	void onException( const CMSException& ex );
-		
-	virtual void onMessage( const Message* message );
 	
 	void start();
 		
-	int stop(); 	  
+	int stop(); 	
+
+  void shutdown();
 };
 
 #endif

Modified: incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp?rev=821374&r1=821373&r2=821374&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp Sat Oct  3 17:46:49 2009
@@ -64,7 +64,7 @@
     apr_pool_t *pool;
     rv = apr_pool_create(&pool, NULL);
     if (rv != APR_SUCCESS) {
-      cerr << "ERROR: apr_pool_create() failed. " << endl;
+      cerr << __FILE__ << " ERROR: apr_pool_create() failed. " << endl;
       return -1;
     }
 
@@ -78,7 +78,7 @@
     initialize(serviceDesc, pool);   
 
     /*create service*/
-    AMQAnalysisEngineService aeService(serviceDesc,singleton_pMonitor);
+    AMQAnalysisEngineService aeService(serviceDesc,singleton_pMonitor, pool);
     aeService.setTraceLevel(serviceDesc.getTraceLevel());
 
     /*start receiving messages*/ 
@@ -108,7 +108,7 @@
     /* shutdown */
    // uima::ResourceManager::getInstance().getLogger().logMessage("deployCppService shutting down.");
     cout << __FILE__ << " shutting down." << endl;
-    aeService.stop();
+    aeService.shutdown();
     terminateService();
     
     if (pool) {