You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2009/10/03 19:46:50 UTC
svn commit: r821374 [1/2] - in /incubator/uima/uimacpp/trunk/src/utils:
ActiveMQAnalysisEngineService.cpp ActiveMQAnalysisEngineService.hpp
deployCppService.cpp deployCppService.hpp
Author: eae
Date: Sat Oct 3 17:46:49 2009
New Revision: 821374
URL: http://svn.apache.org/viewvc?rev=821374&view=rev
Log:
UIMA-799 commit Bhavani's uimacpp-799.patch
Modified:
incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp
incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp
incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp
incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp
Modified: incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp?rev=821374&r1=821373&r2=821374&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp Sat Oct 3 17:46:49 2009
@@ -51,11 +51,44 @@
#define LOGINFO(n,x) { if (n > uimacpp_ee_tracelevel) {} else { FORMATMSG(x); logMessage(lstr.str().c_str()); } }
#define LOGERROR(x){ FORMATMSG(x); logError(lstr.str().c_str()); }
#define LOGWARN(x) { FORMATMSG(x); logWarning(lstr.str().c_str());}
+
+static void listener_signal_handler(int signum) {
+ stringstream str;
+ str << __FILE__ << __LINE__ << " Received Signal: " << signum;
+ cerr << str.str() << endl;
+}
+
+
+//=================================================================
+//
+//Message processing function executed by message handling threads.
+//-----------------------------------------------------------------
+static void* APR_THREAD_FUNC handleMessages(apr_thread_t *thd, void *data) {
+ //cout << __FILE__ << __LINE__ << Thread::getId() << " handleMessages start " << endl;
+ AMQListener * handler = (AMQListener*) data;
+ handler->receiveAndProcessMessages(thd);
+ apr_thread_exit(thd, APR_SUCCESS);
+ return NULL;
+}
+
+
//===================================================
//AMQConnection
//---------------------------------------------------
- AMQConnection::AMQConnection( const char * aBrokerURL, Monitor * pStatistics) :
- iv_brokerURL(aBrokerURL),
+ ConnectionFactory * AMQConnection::createConnectionFactory(ServiceParameters & params) {
+ stringstream str;
+ str << params.getBrokerURL() << "?jms.prefetchPolicy.queuePrefetch=" << params.getPrefetchSize() << endl;
+ ConnectionFactory * pConnFactory = ConnectionFactory::createCMSConnectionFactory(str.str());
+ cout << "AMQConnection()::createConnectionFactory " << params.getBrokerURL() << " prefetch=" << params.getPrefetchSize() << endl;
+ return pConnFactory;
+ };
+
+ AMQConnection::AMQConnection( ConnectionFactory * connFact,
+ Monitor * pMonitor, int id) :
+ iv_id(id),
+ iv_pConnFact(connFact),
+ //iv_pMonitor(0),
+ iv_brokerURL(((ActiveMQConnectionFactory*)connFact)->getBrokerURL()),
iv_pConnection(0),
iv_pConsumerSession(0),
iv_pConsumer(0),
@@ -64,22 +97,29 @@
iv_pListener(0),
iv_pProducerSession(0),
iv_pProducer(0),
- iv_pTextMessage(0),
+ iv_pReplyMessage(0),
iv_replyDestinations(),
- iv_valid(false) {
+ iv_valid(false),
+ iv_started(false),
+ iv_reconnecting(false) {
+ int pos = iv_brokerURL.find_first_of("?");
+ if (pos != string::npos) {
+ iv_brokerURL = iv_brokerURL.substr(0,pos);
+ }
+ iv_pMonitor = pMonitor;
+ initialize();
+ }
+ void AMQConnection::initialize( ) {
try {
- iv_pMonitor = pStatistics;
+
LOGINFO(INFO, "AMQConnection() connecting to " + iv_brokerURL);
- // Create a ConnectionFactory
- ActiveMQConnectionFactory* connectionFactory =
- new ActiveMQConnectionFactory(iv_brokerURL);
-
+
// Create a Connection
- if (connectionFactory == NULL) {
- LOGERROR("AMQConnection() could not create connection factory");
+ if (iv_pConnFact == NULL) {
+ LOGERROR("AMQConnection() invalid connection factory");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
- msg.addParam("AMQConnection() could not create connection factory");
+ msg.addParam("AMQConnection() invalid create connection factory");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
@@ -88,25 +128,40 @@
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
- this->iv_pConnection = connectionFactory->createConnection();
- if (this->iv_pConnection == NULL) {
- LOGERROR("AMQConnection() could not create connection.");
- ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
- msg.addParam("AMQConnection() could not create connection to " + iv_brokerURL);
- ErrorInfo errInfo;
- errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE);
- errInfo.setMessage(msg);
- UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
- UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
- errInfo.getMessage(),
- errInfo.getMessage().getMessageID(),
- ErrorInfo::unrecoverable);
- } else {
- //default exception listener
- this->iv_pConnection->setExceptionListener(this);
+
+ bool retrying = false;
+ while (iv_pConnection == NULL) {
+ try {
+ iv_pConnection = iv_pConnFact->createConnection();
+ if (iv_pConnection == NULL ) {
+ if (!retrying) {
+ stringstream str;
+ str << " AMQConnection::initialize() Connection object is null. Failed to connect to " << iv_brokerURL
+ << ". Retrying..." << endl;
+ LOGWARN(str.str());
+ retrying = true;
+ apr_sleep(30000000); //wait 30 seconds to reconnect
+ }
+ }
+ } catch (cms::CMSException& e) {
+ if (!retrying) {
+ stringstream str;
+ str << "AMQConnection::initialize() Failed to connect to " << iv_brokerURL
+ << e.getMessage() << ". Retrying..." << endl;
+ LOGWARN(str.str());
+ retrying = true;
+ apr_sleep(30000000); //wait 30 seconds to reconnect
+ }
+ }
+ }
+
+ if (retrying) {
+ LOGWARN("AMQConnection::initialize() Connected to " + iv_brokerURL);
}
- delete connectionFactory;
+ //default exception listener
+ this->iv_pConnection->setExceptionListener(this);
+
// Create a Producer Session
LOGINFO(FINEST,"AMQConnection() create Producer Session " + iv_brokerURL);
this->iv_pProducerSession = this->iv_pConnection->createSession( Session::AUTO_ACKNOWLEDGE );
@@ -139,8 +194,8 @@
this->iv_pProducer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
//create TextMessage
- this->iv_pTextMessage = this->iv_pProducerSession->createTextMessage();
- if (this->iv_pTextMessage == NULL) {
+ this->iv_pReplyMessage = this->iv_pProducerSession->createTextMessage();
+ if (this->iv_pReplyMessage == NULL) {
LOGERROR("AMQConnection() create textMessage failed. ");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQConnection() failed to create message.");
@@ -209,9 +264,9 @@
delete this->iv_pConnection;
this->iv_pConnection = NULL;
}
- if (this->iv_pTextMessage != NULL) {
- delete this->iv_pTextMessage;
- this->iv_pTextMessage=NULL;
+ if (this->iv_pReplyMessage != NULL) {
+ delete this->iv_pReplyMessage;
+ this->iv_pReplyMessage=NULL;
}
//destinations
@@ -222,20 +277,13 @@
}
/* create a MessageConsumer session and register a MessageListener
-to receive messages from the input queue. */
- void AMQConnection::createMessageConsumer(string queueName,
- MessageListener * listener,
- int prefetch) {
+ to receive messages from the input queue. */
+ void AMQConnection::createMessageConsumer(string queueName, string selector) {
LOGINFO(FINEST, "AMQConnection::createMessageConsumer() consumer start " + queueName);
this->iv_inputQueueName = queueName;
stringstream str;
- //we add one to prefetch size to get the same behavior as the
- //Spring listener used in UIMA Java SDK which calls the
- //ActiveMQMessageConsumer.receive() api.
- //create endpoint destination
str << queueName;
- str << "?consumer.prefetchSize=" << prefetch+1 << endl;
-
+
if (this->iv_pConsumerSession != NULL || iv_pConsumer != NULL) {
LOGERROR("AMQConnection::createMessageConsumer() A session already exists. ");
ErrorInfo errInfo;
@@ -278,7 +326,13 @@
ErrorInfo::unrecoverable);
}
- this->iv_pConsumer = this->iv_pConsumerSession->createConsumer(this->iv_pInputQueue);
+ iv_selector = selector;
+ if (selector.length() > 0) {
+ this->iv_pConsumer = this->iv_pConsumerSession->createConsumer(this->iv_pInputQueue, iv_selector);
+ } else {
+ this->iv_pConsumer = this->iv_pConsumerSession->createConsumer(this->iv_pInputQueue);
+ }
+
if (this->iv_pConsumer == NULL) {
LOGERROR("AMQConnection::createMessageConsumer() createConsumer failed. " + queueName);
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
@@ -291,28 +345,24 @@
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
- //register listener
- this->iv_pConsumer->setMessageListener(listener); //caller owns listener
- this->iv_pListener = listener;
+
LOGINFO(FINEST, "AMQConnection::createMessageConsumer() " + queueName + " successful.");
}
-//caller owns the ExceptionListener
- void AMQConnection::setExceptionListener(ExceptionListener * el) {
- this->iv_pConnection->setExceptionListener(el);
- }
void AMQConnection::onException(const CMSException & ex) {
//mark endpoint as broken.
this->iv_valid = false;
//log that connection is invalid.
- LOGWARN("AMQConnection()::onException() Connection to " + iv_brokerURL
- + " may be broken. " + ex.getMessage());
+ stringstream str;
+ str << "AMQConnection()::onException() Connection to "
+ << iv_brokerURL << " is broken. Reconnecting ... " << ex.getMessage() << endl;
+ LOGWARN(str.str());
}
-
+
TextMessage * AMQConnection::getTextMessage() {
- if (this->iv_pTextMessage == NULL) {
+ if (this->iv_pReplyMessage == NULL) {
LOGERROR("AMQConnection::getTextMessage() failed. ");
ErrorInfo errInfo;
errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "TextMessage could not be created."));
@@ -322,9 +372,9 @@
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
- iv_pTextMessage->clearProperties();
- iv_pTextMessage->clearBody();
- return this->iv_pTextMessage;
+ iv_pReplyMessage->clearProperties();
+ iv_pReplyMessage->clearBody();
+ return this->iv_pReplyMessage;
}
void AMQConnection::sendMessage(string queuename) {
@@ -364,11 +414,11 @@
ErrorInfo::unrecoverable);
}
- this->iv_pProducer->send(pDest,this->iv_pTextMessage);
+ this->iv_pProducer->send(pDest,this->iv_pReplyMessage);
//cout << "producer->send elapsed time " << (apr_time_now() - stime) << endl;
- this->iv_pTextMessage->clearBody();
- this->iv_pTextMessage->clearProperties();
+ this->iv_pReplyMessage->clearBody();
+ this->iv_pReplyMessage->clearProperties();
LOGINFO(FINEST, "AMQConnection::sendMessage() successful to " + queuename);
}
@@ -384,11 +434,11 @@
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
- this->iv_pProducer->send(cmsReplyTo,this->iv_pTextMessage);
+ this->iv_pProducer->send(cmsReplyTo,this->iv_pReplyMessage);
//cout << "producer->send elapsed time " << (apr_time_now() - stime) << endl;
- this->iv_pTextMessage->clearBody();
- this->iv_pTextMessage->clearProperties();
+ this->iv_pReplyMessage->clearBody();
+ this->iv_pReplyMessage->clearProperties();
LOGINFO(4, "AMQConnection::sendMessage() successful to " + cmsReplyTo->toProviderString());
}
@@ -429,12 +479,86 @@
}
}
+ void AMQConnection::resetBeforeReconnect() {
+ this->iv_reconnecting = true;
+
+ if (this->iv_pConsumer != NULL) {
+ delete this->iv_pConsumer;
+ this->iv_pConsumer = NULL;
+ }
+ if (this->iv_pProducer != NULL) {
+ delete this->iv_pProducer;
+ this->iv_pProducer = NULL;
+ }
+ if (this->iv_pConsumerSession != NULL) {
+ delete this->iv_pConsumerSession;
+ this->iv_pConsumerSession = NULL;
+ }
+ if (this->iv_pProducerSession != NULL) {
+ delete this->iv_pProducerSession;
+ this->iv_pProducerSession = NULL;
+ }
+ if (this->iv_pConnection != NULL) {
+ delete this->iv_pConnection;
+ this->iv_pConnection = NULL;
+ }
+
+ this->iv_started = false;
+ this->iv_valid = false;
+
+ if (this->iv_pReplyMessage != NULL) {
+ delete this->iv_pReplyMessage;
+ this->iv_pReplyMessage=NULL;
+ }
+
+ //destinations
+ map<string, cms::Destination*>::iterator ite;
+ for (ite= iv_replyDestinations.begin();ite != iv_replyDestinations.end();ite++) {
+ delete ite->second;
+ }
+ }
+
+ void AMQConnection::reconnect() {
+ try {
+ this->iv_reconnecting = true;
+ this->iv_pMonitor->reconnecting(iv_id);
+ apr_sleep(30000000); //wait 30 seconds to reconnect
+ //cout << "AMQConnection::reconnect() calling initialize >>>" << endl;
+ this->initialize();
+ this->createMessageConsumer(this->iv_inputQueueName,this->iv_selector);
+ this->start();
+ stringstream str;
+ str << "AMQConnection::reconnect() Successfully reconnected to >>> " << iv_brokerURL << endl;
+ LOGWARN(str.str());
+ this->iv_pMonitor->reconnectionSuccess(iv_id);
+ this->iv_reconnecting = false;
+ } catch (uima::Exception e) {
+ cerr << "AMQConnection::reconnect() " << e << endl;
+ resetBeforeReconnect();
+ } catch (...) {
+ cerr << "AMQConnection::reconnect() catch ... " << endl;
+ resetBeforeReconnect();
+ }
+
+ }
+
+ Message * AMQConnection::receive(const int delay) {
+ if (iv_valid) {
+ return iv_pConsumer->receive(delay);
+ } else {
+ this->resetBeforeReconnect();
+ this->reconnect();
+ this->iv_valid = true;
+ return NULL;
+ }
+ }
//===================================================
//AMQConnectionCache
//---------------------------------------------------
- AMQConnectionsCache::AMQConnectionsCache(Monitor * stats) {
+ AMQConnectionsCache::AMQConnectionsCache(ConnectionFactory * pConnFact,Monitor * stats) {
+ iv_pConnFact = pConnFact;
iv_pMonitor = stats;
}
@@ -462,7 +586,7 @@
if (ite == iv_connections.end()) {
LOGINFO(FINE,"AMQConnectionsCache::getConnection() create new connection to " +
brokerURL);
- connection = new AMQConnection(brokerURL.c_str(), iv_pMonitor);
+ connection = new AMQConnection(iv_pConnFact, iv_pMonitor, iv_connections.size());
if (connection == NULL) {
LOGERROR("AMQConnectionCache::getConnection Could not create a endpoint connection to " +
brokerURL);
@@ -480,7 +604,7 @@
LOGWARN("AMQConnectionCache::getEndPoint() Existing connection invalid. Reconnecting to " + brokerURL );
delete connection;
this->iv_connections.erase(brokerURL);
- connection = new AMQConnection(brokerURL.c_str(), iv_pMonitor);
+ connection = new AMQConnection(iv_pConnFact, iv_pMonitor, iv_connections.size());
if (connection == NULL) {
LOGERROR("AMQConnectionCache::getConnection() could not connect to "
+ brokerURL );
@@ -534,7 +658,29 @@
iv_pEngine(ae),
iv_pCas(cas),
//iv_pMonitor(stats),
- iv_replyToConnections(stats),
+ iv_replyToConnections(connection->iv_pConnFact,stats),
+ iv_timeLastRequestCompleted(0),
+ iv_busy(false),
+ iv_stopProcessing(false),
+ iv_count(0),
+ iv_aeDescriptor(),
+ iv_brokerURL(connection->getBrokerURL()),
+ iv_inputQueueName(connection->getInputQueueName()) {
+
+ iv_pMonitor = stats;
+ getMetaData(iv_pEngine);
+
+ }
+
+ AMQListener::AMQListener(int id,
+ AMQConnection * connection,
+ AnalysisEngine * ae,
+ Monitor * stats) :
+ iv_id(id),
+ iv_pConnection(connection),
+ iv_pEngine(ae),
+ //iv_pMonitor(stats),
+ iv_replyToConnections(connection->iv_pConnFact, stats),
iv_timeLastRequestCompleted(0),
iv_busy(false),
iv_count(0),
@@ -543,20 +689,152 @@
iv_inputQueueName(connection->getInputQueueName()) {
iv_pMonitor = stats;
- //get AE descriptor as XML to use when processing GETMETA requests.
- const AnalysisEngineMetaData & aeMetaData = iv_pEngine->getAnalysisEngineMetaData();
+ getMetaData(iv_pEngine);
+
+ }
+
+ AMQListener::~AMQListener() {
+
+ }
+
+ //extract AE descriptor as XML to use when processing GETMETA requests.
+ void AMQListener::getMetaData(AnalysisEngine * pEngine) {
+ if (pEngine == NULL) {
+ LOGERROR("AMQListener::getMetaData() Invalid handle to AnalysisEngine.");
+ ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
+ msg.addParam("AMQListener::getMetaData() Invalid handle to AnalysisEngine.");
+ ErrorInfo errInfo;
+ errInfo.setMessage(msg);
+ UIMA_EXC_THROW_NEW(Uima_runtime_error,
+ UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+ errInfo.getMessage(),
+ errInfo.getMessage().getMessageID(),
+ ErrorInfo::unrecoverable);
+ }
+ const AnalysisEngineMetaData & aeMetaData = pEngine->getAnalysisEngineMetaData();
icu::UnicodeString xmlBuffer;
- xmlBuffer.insert(0, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
- iv_pEngine->getAnnotatorContext().getTaeSpecifier().toXMLBuffer(aeMetaData,
+ xmlBuffer.insert(0, "<?xml version=\"1.0\"?>");
+ pEngine->getAnnotatorContext().getTaeSpecifier().toXMLBuffer(aeMetaData,
false, xmlBuffer);
UnicodeStringRef uref(xmlBuffer.getBuffer(), xmlBuffer.length());
this->iv_aeDescriptor = uref.asUTF8();
}
- AMQListener::~AMQListener() {
+
+bool AMQListener::validateRequest(const TextMessage * textMessage, string & errmsg) {
+ bool valid = true;
+
+ if ( textMessage->getCMSReplyTo() == NULL)
+ LOGWARN("AMQListener::validateRequest() JMSReplyTo is not set. " );
+
+ if (textMessage->getCMSReplyTo() == NULL &&
+ !textMessage->propertyExists("MessageFrom") ) {
+ errmsg = "Reply to destination not set.";
+ return false;
+ }
+
+ if (!textMessage->propertyExists("Command") ) {
+ errmsg = "Required property 'Command' is not set.";
+ LOGERROR("AMQListener::validateRequest " + errmsg);
+ valid = false;
+ } else {
+ int command = textMessage->getIntProperty("Command");
+ if (command != PROCESS_CAS_COMMAND &&
+ command != GET_META_COMMAND &&
+ command != CPC_COMMAND) {
+ stringstream str;
+ str << "Unexpected value for 'Command' " << command;
+ errmsg = str.str();
+ LOGERROR("AMQListener::validateRequest " + errmsg);
+ valid=false;
+ } else if (command == CPC_COMMAND) {
+ if (iv_pEngine == NULL) {
+ errmsg = "CPC request received but AnalysisEngine not available.";
+ LOGERROR("AMQListener::validateRequest() " + errmsg);
+ valid = false;
+ }
+ } else if (command == PROCESS_CAS_COMMAND) {
+ if (iv_pCas == NULL || iv_pEngine == NULL) {
+ errmsg = "Process Cas request but an AnalysisEngine and CAS not available.";
+ LOGERROR("AMQListener::validateRequest() " + errmsg);
+ valid = false;
+ }
+ if (!textMessage->propertyExists("Payload") ) {
+ errmsg = "Required property 'Payload' is not set.";
+ LOGERROR("AMQListener::validateRequest " + errmsg);
+ valid = false;
+ } else {
+ int payload = textMessage->getIntProperty("Payload");
+ if (payload != XCAS_PAYLOAD && payload != XMI_PAYLOAD) {
+ stringstream str;
+ str << "Unexpected value for 'Payload' " << payload;
+ errmsg = str.str();
+ LOGERROR("AMQListener::validateRequest " + errmsg);
+ valid=false;
+ }
+ string text = textMessage->getText().c_str();
+ if (text.length() == 0) {
+ errmsg = "There is no payload data. Nothing to process.";
+ LOGERROR("AMQListener::validateRequest " + errmsg);
+ valid = false;
+ }
+ }
+ }
}
+ return valid;
+}
+
+
+void AMQListener::receiveAndProcessMessages(apr_thread_t * thd) {
+
+ try {
+ this->thd = thd;
+ cout << "Instance: " << iv_id << " ThreadId: " << apr_os_thread_current() << " started." << endl;
+ //start receiving messages
+ this->iv_pConnection->start();
+ this->iv_stopProcessing = false;
+ apr_time_t lastStatsTime = apr_time_now();
+ while (!iv_stopProcessing) {
+
+ stringstream astr;
+
+ //cout << "receiveAndProcess going to call recieve" << endl;
+ Message * msg = this->iv_pConnection->receive(2000);
+
+ if (msg != NULL) {
+ iv_busy = true;
+ astr << this->iv_id;
+ astr << " *****Message#: "<< ++iv_count << "*****";
+ //cerr << astr.str() << endl;
+ //LOGINFO(FINE,astr.str());
+ this->handleRequest(msg);
+ ///this->testHandleRequest(msg);
+ delete msg;
+ msg = 0;
+ //cerr << iv_id << " ****Message#: " << iv_count << " deleted " << endl;
+ iv_busy = false;
+ this->iv_timeLastRequestCompleted = apr_time_now();
+ //cout << iv_id << " processMessages done: getnext " << iv_count << endl;
+ }
+
+ }
+ LOGWARN("AMQListener::receiveAndProcessMessage() stopped receiving messages.");
+ } catch (uima::Exception ex ) {
+ LOGERROR("AMQListener::receiveAndProcessMessages() " +
+ ex.getErrorInfo().getMessage().asString());
+ this->iv_stopProcessing = true;
+ //tell the monitor that the thread has stopped processing
+ iv_pMonitor->listenerStopped(this->iv_id);
+ } catch (...) {
+ LOGERROR("AMQListener::receiveAndProcessMessages UnExpected error.");
+ iv_stopProcessing = true;
+ //tell the monitor that the thread has stopped processing
+ iv_pMonitor->listenerStopped(this->iv_id);
+ }
+}
+
/*
* Receive a TextMessage and examine the header properties
@@ -564,106 +842,86 @@
* one request at a time and is blocked till the requestis
* handled and the response sent.
*/
- void AMQListener::onMessage( const Message* message ) {
+ void AMQListener::handleRequest( const Message* message ) {
apr_time_t startTime = apr_time_now();
- apr_time_t endTime;
-
- apr_time_t startSerialize;
- apr_time_t startDeserialize;
- apr_time_t startAnnotatorProcess;
+ apr_time_t timeIdle = 0;
+ // Idle time is computed as the interval from time last request
+ // was processed. If this is the first request, idle time is
+ // computed from the time the service was started.
+ if (iv_timeLastRequestCompleted != 0)
+ timeIdle = startTime - iv_timeLastRequestCompleted;
+ else
+ timeIdle = startTime - iv_pMonitor->getServiceStartTime();
+ this->iv_pMonitor->processingStarted(this->iv_id, startTime, timeIdle );
+ stringstream astr;
+ astr << this->iv_id;
+ astr << " ****handleRequest(): "<< iv_count << " start"<< endl;
+ LOGINFO(FINE,astr.str());
+ ///cout << astr.str() << endl;
+ ///LOGWARN(astr.str());
+ apr_time_t endTime = 0;
+ apr_time_t startSerialize = 0;
+ apr_time_t startDeserialize = 0;
+ apr_time_t startAnnotatorProcess = 0;
+ apr_time_t startSendResponse = 0;
apr_time_t timeToDeserializeCAS = 0;
apr_time_t timeToSerializeCAS = 0;
apr_time_t timeInAnalytic = 0;
- apr_time_t timeIdle = apr_time_now() - iv_timeLastRequestCompleted;
const TextMessage* textMessage=0;
int command = 0;
- stringstream astr;
- astr << "*****Message#: " << ++iv_count << "*****" << endl;
- LOGINFO(INFO,astr.str());
- astr.seekp(0);
-
try {
- iv_busy=true;
+
textMessage = dynamic_cast< const TextMessage* >( message );
if (textMessage==0) {
- LOGERROR("AMQListener::onMessage() invalid pointer to TextMessage");
+ LOGERROR("AMQListener::handleRequest() invalid pointer to TextMessage");
endTime = apr_time_now();
- iv_pMonitor->processingComplete(0,false,endTime-startTime);
+ iv_pMonitor->processingComplete(iv_id, 0,false,endTime-startTime,0,0,0,endTime-startTime);
this->iv_timeLastRequestCompleted = apr_time_now();
return;
}
if (textMessage->propertyExists("MessageFrom")) {
LOGINFO(FINER,"Received from " + textMessage->getStringProperty("MessageFrom"));
- } else {
- LOGERROR("AMQListener::onMessage() ERROR MessageFrom not set.");
- endTime = apr_time_now();
- iv_pMonitor->processingComplete(0,false,endTime-startTime);
- this->iv_timeLastRequestCompleted = apr_time_now();
- return;
- }
+ }
- if (textMessage->propertyExists("Command") ){
- command = textMessage->getIntProperty("Command");
- } else {
- LOGERROR("AMQListener::onMessage() Required property Command not set.");
+ //validate request properties
+ string errormessage;
+ if (!validateRequest(textMessage, errormessage)) {
+ LOGERROR("Listener::handleRequest() " + errormessage);
endTime = apr_time_now();
- sendResponse(textMessage, timeToDeserializeCAS,
- timeToSerializeCAS, timeInAnalytic,
- timeIdle, endTime-startTime,
- "Required property 'Command' not set." ,true);
+ sendResponse(textMessage, 0,0,0,timeIdle, endTime-startTime,
+ errormessage ,true);
endTime = apr_time_now();
- iv_pMonitor->processingComplete(0,false,endTime-startTime);
- this->iv_timeLastRequestCompleted = apr_time_now();
+ iv_pMonitor->processingComplete(iv_id, 0,false,endTime-startTime,0,0,0,endTime-startTime);
return;
}
- astr.seekp(0);
+
+ command = textMessage->getIntProperty("Command");
+ astr.str("");
+ astr << "Received request Command: " << command ;
+ if (textMessage->propertyExists("CasReference")) {
+ astr << " CasReference " << textMessage->getStringProperty("CasReference");
+ }
+
astr << "Received request Command: " << command ;
if (textMessage->propertyExists("CasReference")) {
astr << " CasReference " << textMessage->getStringProperty("CasReference");
}
LOGINFO(INFO,astr.str());
- astr.seekp(0);
+
if (command == PROCESS_CAS_COMMAND) { //process CAS
- //get the payload property
- if (!textMessage->propertyExists("Payload")) {
- LOGERROR("AMQListener::onMessage() Required property Payload not set.");
- endTime = apr_time_now();
- sendResponse(textMessage, timeToDeserializeCAS,
- timeToSerializeCAS, timeInAnalytic,
- timeIdle,endTime-startTime,
- "Required property 'Command' not set." ,true);
-
- endTime = apr_time_now();
- iv_pMonitor->processingComplete(command,false,endTime-startTime);
- this->iv_timeLastRequestCompleted = apr_time_now();
- return;
- }
+ LOGINFO(FINE,"Process CAS request start.");
int payload = textMessage->getIntProperty("Payload");
//get the text in the payload
- string text = textMessage->getText();
- if (text.length() == 0) {
- LOGERROR("AMQListener::onMessage() There is no payload data. Nothing to process.");
- text = "There is no payload data. Nothing to process.";
- endTime = apr_time_now();
- sendResponse(textMessage, timeToDeserializeCAS,
- timeToSerializeCAS, timeInAnalytic,
- timeIdle, endTime-startTime,
- text ,true);
- endTime=apr_time_now();
- iv_pMonitor->processingComplete(command,false,endTime-startTime);
- this->iv_timeLastRequestCompleted = apr_time_now();
- return;
- }
+ string text = textMessage->getText().c_str();
- astr << "Payload: " << payload;
+ astr.str("");
+ astr << "Payload: " << payload << " Content: " << text ;
LOGINFO(FINER, astr.str());
- LOGINFO(FINER, " Content: " + text);
- astr.seekp(0);
//InputSource
MemBufInputSource memIS((XMLByte const *)text.c_str(),
@@ -678,6 +936,7 @@
//the CAS which will be sent with the
//response.
if (payload == XCAS_PAYLOAD) {
+ LOGINFO(FINEST, "AMQListener::handleRequest() XCAS serialization.");
startDeserialize = apr_time_now();
XCASDeserializer deserializer;
deserializer.deserialize(memIS, *iv_pCas);
@@ -693,32 +952,25 @@
timeToSerializeCAS = apr_time_now() - startSerialize;
} else if (payload == XMI_PAYLOAD) {
//deserialize incoming xmi CAS data.
+ LOGINFO(FINEST, "AMQListener::handleRequest() XMI serialization.");
+
startDeserialize = apr_time_now();
XmiSerializationSharedData sharedData;
XmiDeserializer deserializer;
deserializer.deserialize(memIS,*iv_pCas,sharedData);
startAnnotatorProcess=apr_time_now();
timeToDeserializeCAS = startAnnotatorProcess-startDeserialize;
-
+ LOGINFO(FINEST, "AMQListener::handleRequest() calling process.");
iv_pEngine->process(*iv_pCas);
startSerialize=apr_time_now();
timeInAnalytic = startSerialize - startAnnotatorProcess;
//serialize CAS
+ LOGINFO(FINEST, "AMQListener::handleRequest() calling serialize.");
XmiWriter xmiwriter(*iv_pCas, true, &sharedData);
xmiwriter.write(xmlstr);
timeToSerializeCAS = apr_time_now() - startSerialize;
- } else {
- xmlstr << "Invalid Payload " << payload;
- LOGERROR("AMQListener::onMessage() " + xmlstr.str());
- endTime=apr_time_now();
- sendResponse(textMessage, timeToDeserializeCAS,
- timeToSerializeCAS, timeInAnalytic,
- timeIdle,endTime-startTime,xmlstr.str() ,true);
- endTime=apr_time_now();
- iv_pMonitor->processingComplete(command,false,endTime-startTime);
- this->iv_timeLastRequestCompleted = apr_time_now();
- return;
+ LOGINFO(FINEST, "AMQListener::handleRequest() done processing CAS.");
}
//done with this CAS.
iv_pCas->reset();
@@ -732,80 +984,59 @@
xmlstr.str(),false);
endTime=apr_time_now();
iv_pMonitor->processingComplete(command,true,endTime-startTime,
- timeToDeserializeCAS, timeInAnalytic, timeToSerializeCAS);
+ timeToDeserializeCAS, timeInAnalytic, timeToSerializeCAS,
+ endTime-startSendResponse);
LOGINFO(FINE,"Process CAS finished.");
} else if (command == GET_META_COMMAND ) { //get Meta
LOGINFO(FINE, "Process getMeta request start.");
endTime = apr_time_now();
+ startSendResponse = apr_time_now();
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle, endTime-startTime,this->iv_aeDescriptor,false);
endTime=apr_time_now();
- iv_pMonitor->processingComplete(command,true,endTime-startTime);
+ //record timing
+ iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,0,0,0,endTime-startSendResponse);
LOGINFO(FINE,"Process getMeta request finished.");
} else if (command == CPC_COMMAND ) { //CPC
LOGINFO(FINE, "Processing CollectionProcessComplete request start");
iv_pEngine->collectionProcessComplete();
endTime = apr_time_now();
- sendResponse(textMessage, timeToDeserializeCAS,
- timeToSerializeCAS, timeInAnalytic,
+ startSendResponse = apr_time_now();
+ sendResponse(textMessage, 0,
+ 0, timeInAnalytic,
timeIdle,endTime-startTime,"CPC completed.",false);
endTime=apr_time_now();
- iv_pMonitor->processingComplete(command,true,endTime-startTime);
+ iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,0,0,0,endTime-startSendResponse);
LOGINFO(FINE, "Processing CollectionProcessComplete request finished.");
- } else {
- endTime = apr_time_now();
- stringstream str;
- str << " Invalid Request " << command << endl;
- LOGERROR(str.str());
- sendResponse(textMessage,
- timeToDeserializeCAS,
- timeToSerializeCAS,
- timeInAnalytic,
- timeIdle,
- endTime-startTime,str.str(),true);
- endTime = apr_time_now();
- iv_pMonitor->processingComplete(0,false,endTime-startTime);
- }
- iv_busy=false;
- iv_timeLastRequestCompleted = apr_time_now();
+ }
} catch (XMLException& e) {
stringstream str;
- str << "AMQListener::onMessage() XMLException." << e.getMessage();
+ str << "AMQListener::handleRequest XMLException." << e.getMessage();
LOGERROR(str.str());
endTime = apr_time_now();
+ startSendResponse = apr_time_now();
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle, endTime-startTime,str.str(),true);
endTime = apr_time_now();
- iv_pMonitor->processingComplete(command,false,endTime-startTime);
- iv_timeLastRequestCompleted = apr_time_now();
- iv_busy=false;
- } catch (CMSException& e) {
- LOGERROR("AMQListener::onMessage()" + e.getMessage());
- endTime = apr_time_now();
- iv_pMonitor->processingComplete(0,false,endTime-startTime);
- iv_timeLastRequestCompleted = apr_time_now();
- iv_busy=false;
+ iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startSendResponse);
} catch (uima::Exception e) {
- LOGERROR("AMQListener::onMessage() UIMA Exception " + e.asString());
+ LOGERROR("AMQListener::handleRequest UIMA Exception " + e.asString());
endTime = apr_time_now();
+ startSendResponse = apr_time_now();
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle,endTime-startTime,e.asString(),true);
endTime = apr_time_now();
- iv_pMonitor->processingComplete(command,false,endTime-startTime);
- iv_timeLastRequestCompleted = apr_time_now();
- iv_busy=false;
+ iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startSendResponse);
} catch(...) {
- LOGERROR("AMQListener::onMessage() Unknown exception ");
+ LOGERROR("AMQListener::handleRequest Unknown exception ");
//TODO: log / shurdown ?}
- endTime = apr_time_now();
- iv_pMonitor->processingComplete(command,false,endTime-startTime);
- iv_timeLastRequestCompleted = apr_time_now();
- iv_busy=false;
+ endTime = apr_time_now();
+ iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startTime);
+
}
- iv_timeLastRequestCompleted = apr_time_now();
}
void AMQListener::sendResponse(const TextMessage * request,
@@ -936,9 +1167,9 @@
LOGINFO(FINER,"AMQListener::sendResponse DONE");
} catch (CMSException& ex ) {
- LOGERROR("AMQListener::onMessage()" + ex.getMessage());
+ LOGERROR("AMQListener::handleMessage()" + ex.getMessage());
} catch (...) {
- LOGERROR("AMQListener::onMessage() UnExpected error sending reply.");
+ LOGERROR("AMQListener::handleRequest() UnExpected error sending reply.");
}
}
@@ -951,8 +1182,9 @@
}
AMQAnalysisEngineService::AMQAnalysisEngineService(ServiceParameters & desc,
- Monitor * stats) :
+ Monitor * stats, apr_pool_t * pool) :
//iv_pMonitor(stats),
+ iv_pool(pool),
iv_numInstances(desc.getNumberOfInstances()),
iv_brokerURL(desc.getBrokerURL()),
iv_inputQueueName(desc.getQueueName()),
@@ -962,10 +1194,11 @@
iv_vecpConnections(),
iv_vecpAnalysisEngines(),
iv_vecpCas(),
+ iv_closed(false),
iv_listeners() {
try {
iv_pMonitor = stats;
- initialize();
+ initialize(desc);
} catch (CMSException & e) {
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam(e.getMessage());
@@ -979,13 +1212,34 @@
}
}
- void AMQAnalysisEngineService::initialize() {
+ void AMQAnalysisEngineService::initialize(ServiceParameters & params) {
try {
- //create a connection for each instance
+ //create connection factory
+ LOGINFO(FINEST,"AMQAnalysisEngineService::initialize() Create connection factory");
+ this->iv_pConnFact = AMQConnection::createConnectionFactory(params);
+ if (iv_pConnFact == NULL) {
+ ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
+ msg.addParam("AMQAnalysisEngineService::initialize() Failed to create connection factory.");
+ ErrorInfo errInfo;
+ errInfo.setMessage(msg);
+ UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
+ UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+ errInfo.getMessage(),
+ errInfo.getMessage().getMessageID(),
+ ErrorInfo::unrecoverable);
+ }
+ if (!uima::ResourceManager::hasInstance()) {
+ uima::ResourceManager::createInstance("ActiveMQAnalysisEngineService");
+ }
+ ErrorInfo errInfo;
+ UnicodeString ustr(this->iv_aeDescriptor.c_str());
+ UnicodeString ufn = ResourceManager::resolveFilename(ustr,ustr);
+
+ //create a AnalysisEngine and CAS for each instance
for (int i=0; i < iv_numInstances; i++) {
- //create a connection to MQ Broker
- AMQConnection * newConnection = new AMQConnection(this->iv_brokerURL.c_str(), iv_pMonitor);
+ //create the connection
+ AMQConnection * newConnection = new AMQConnection(this->iv_pConnFact, this->iv_pMonitor, i);
if (newConnection == NULL) {
LOGERROR("AMQAnalysisEngineService::initialize() Could not create ActiveMQ endpoint connection.");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
@@ -998,20 +1252,8 @@
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
- newConnection->setExceptionListener(this);
+ /////newConnection->setExceptionListener(this);
this->iv_vecpConnections.push_back(newConnection);
- }
-
- if (!uima::ResourceManager::hasInstance()) {
- uima::ResourceManager::createInstance("ActiveMQAnalysisEngineService");
- }
- ErrorInfo errInfo;
- UnicodeString ustr(this->iv_aeDescriptor.c_str());
- UnicodeString ufn = ResourceManager::resolveFilename(ustr,ustr);
-
- //create a AnalysisEngine and CAS for each instance
- for (int i=0; i < iv_numInstances; i++) {
-
//create AE
AnalysisEngine * pEngine = uima::TextAnalysisEngine::createTextAnalysisEngine((UnicodeStringRef(ufn).asUTF8().c_str()),
errInfo);
@@ -1020,7 +1262,7 @@
} else {
LOGERROR("AMQAnalysisEngineService::initializer() could not create AE" + errInfo.asString());
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
- msg.addParam("AMQListener::initialize() create AE failed. " + errInfo.getMessage().asString() );
+ msg.addParam("AMQAnalysisEngineService::initialize() create AE failed. " + errInfo.getMessage().asString() );
ErrorInfo errInfo;
errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE),
errInfo.setMessage(msg);
@@ -1055,17 +1297,16 @@
ErrorInfo::unrecoverable);
}
this->iv_vecpCas.push_back(cas);
- }
-
- //create listeners and register these
- for (int i=0; i < iv_numInstances; i++) {
+
+ //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() create listener " << endl;
+ //create listeners and register these
AMQListener * newListener = new AMQListener(i,iv_vecpConnections.at(i),
iv_vecpAnalysisEngines.at(i), iv_vecpCas.at(i),
iv_pMonitor);
if (newListener == NULL) {
LOGERROR("AMQAnalysisEngineService::initialize() Could not create listener.");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
- msg.addParam("AnalysisEngineServcie::initialize() Could not create listener.");
+ msg.addParam("AnalysisEngineService::initialize() Could not create listener.");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(Uima_runtime_error,
@@ -1075,14 +1316,55 @@
ErrorInfo::unrecoverable);
}
this->iv_listeners[i] = newListener;
-
+ //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() create message consumer " << endl;
//create MessageConsumer session and register Listener
- this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,
- newListener,this->iv_prefetchSize);
+ this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,annotator_selector);
+ }
+
+ iv_pMonitor->setNumberOfInstances(iv_numInstances);
+ //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() setup getmeta " << endl;
+ //Fast GetMeta
+ //create connection
+ LOGINFO(FINEST, "AMQAnalysisEngineService::initialize() Setup GETMETA instance.");
+ iv_pgetMetaConnection = new AMQConnection(this->iv_pConnFact, this->iv_pMonitor, iv_numInstances);
+
+ if (iv_pgetMetaConnection == NULL) {
+ LOGERROR("AMQAnalysisEngineService::initialize() Could not create fast getmeta ActiveMQ endpoint connection.");
+ ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
+ msg.addParam("AMQAnalysisEngineService::initialize() Failed to connect to broker.");
+ ErrorInfo errInfo;
+ errInfo.setMessage(msg);
+ UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
+ UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+ errInfo.getMessage(),
+ errInfo.getMessage().getMessageID(),
+ ErrorInfo::unrecoverable);
+ }
+
+ //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() setup getmeta listener" << endl;
+ //create a MessageListener MessageConsumer to handle getMeta requests only.
+ AMQListener * newListener = new AMQListener(iv_numInstances,iv_pgetMetaConnection,
+ iv_vecpAnalysisEngines.at(0),
+ this->iv_pMonitor);
+ if (newListener == NULL) {
+ LOGERROR("AMQAnalysisEngineService::initialize() Could not create Fast getMeta listener.");
+ ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
+ msg.addParam("AMQAnalysisEngineService::initialize() Could not create listener.");
+ ErrorInfo errInfo;
+ errInfo.setMessage(msg);
+ UIMA_EXC_THROW_NEW(Uima_runtime_error,
+ UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+ errInfo.getMessage(),
+ errInfo.getMessage().getMessageID(),
+ ErrorInfo::unrecoverable);
}
- iv_pMonitor->setNumberOfInstances(iv_numInstances);
+ this->iv_listeners[this->iv_numInstances] = newListener;
+ iv_pgetMetaConnection->createMessageConsumer(iv_inputQueueName,
+ getmeta_selector);
+ this->iv_pMonitor->setGetMetaListenerId(iv_numInstances);
+ //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() done" << endl;
} catch (uima::Exception e) {
- cout << __LINE__ << "got a uima exception " << endl;
+ cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() failed " << e.getErrorInfo().asString() << endl;
LOGERROR("AMQAnalysisEngineService::initialize() " + e.asString());
throw e;
}
@@ -1105,75 +1387,82 @@
cout << "tracelevel=" << uimacpp_ee_tracelevel << endl;
}
- void AMQAnalysisEngineService::onException( const CMSException& ex ) {
- LOGERROR("CMS Exception occured. Shutting down the service." + ex.getMessage());
- cerr << "Broken connection. Stopped receiving messages." << endl;
- stop();
- this->iv_pMonitor->shutdown();
- }
-
- void AMQAnalysisEngineService::onMessage( const Message* message ) {
- LOGINFO(0,"AMQAnalsisEngineService::onMessage() Got a message.");
- }
-
- void AMQAnalysisEngineService::start() {
- for (size_t i=0; i < iv_vecpConnections.size(); i++) {
- cout << "Starting listener " << i << endl;
- AMQConnection * connection = iv_vecpConnections.at(i);
- if (connection != NULL) {
- connection->start();
- } else {
- LOGERROR("AMQAnalysisServiceEngine::start() Invalid connection object.");
- ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
- msg.addParam("AMQAnalysisServiceEngine::start() Connection does not exist.");
- ErrorInfo errInfo;
- errInfo.setMessage(msg);
- UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
- UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
- errInfo.getMessage(),
- errInfo.getMessage().getMessageID(),
- ErrorInfo::unrecoverable);
- }
+ void AMQAnalysisEngineService::start() {
+ LOGINFO(FINER,"AMQAnalysisEngineService::start() create listener threads.");
+ //create the listener threads
+ thd_attr=0;
+ apr_status_t rv;
+ rv = apr_threadattr_create(&thd_attr, iv_pool);
+ assert(rv == APR_SUCCESS);
+ map<int, AMQListener*>::iterator ite;
+ int i=0;
+ for (ite= iv_listeners.begin();ite != iv_listeners.end();ite++) {
+ apr_thread_t *thread=0;
+ rv = apr_thread_create(&thread, thd_attr, handleMessages, ite->second, iv_pool);
+ assert(rv == APR_SUCCESS);
+ iv_listenerThreads.push_back(thread);
}
- //update the start time
- iv_pMonitor->setStartTime();
- }
+ this->iv_pMonitor->setStartTime();
+}
- int AMQAnalysisEngineService::stop() {
- //TODO: let listeners finish processing first
- //stop messages notification
- for (size_t i=0; i < iv_vecpConnections.size(); i++) {
- AMQConnection * connection = iv_vecpConnections.at(i);
- if (connection != NULL) {
- connection->stop();
- } else {
- LOGERROR("AMQAnalysisEngineService::stop() Connection object is NULL.");
- ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
- msg.addParam("AMQAnalysisServiceEngine::start() Connection does not exist.");
- ErrorInfo errInfo;
- errInfo.setMessage(msg);
- UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
- UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
- errInfo.getMessage(),
- errInfo.getMessage().getMessageID(),
- ErrorInfo::unrecoverable);
- }
- //cout << __FILE__ << " Delete Analysis Engine " << endl;
- if (iv_vecpAnalysisEngines.at(i) != NULL) {
- delete iv_vecpAnalysisEngines.at(i);
- }
- if (iv_vecpCas.at(i) != NULL) {
- delete iv_vecpCas.at(i);
- }
+void AMQAnalysisEngineService::shutdown() {
+ //LOGWARN("AMQAnalysisEngineService::shutdown()");
+ cout << "AMQAnalysisEngineService::shutdown() going to terminate threads " << endl;;
+ //terminate the threads
+ apr_status_t rv;
+ for (size_t i=0; i < this->iv_listenerThreads.size(); i++) {
+ //cout << "wait for thread " << i << " to end " << endl;
+ this->iv_listeners[i]->stopProcessing();
+ if (!iv_listeners[i]->isReconnecting()) {
+ apr_thread_join(&rv, this->iv_listenerThreads.at(i));
+ }
+ }
+ stop();
+ //cout << "AMQAnalysisEngineService::shutdown stopped all connection" << endl;
+ cleanup();
+ //cout << "AMQAnalysisEngineService::shutdown shutdown done" << endl;
+}
+
+int AMQAnalysisEngineService::stop() {
+
+ //TODO: let listeners finish processing first
+ //stop messages notification
+
+ if (iv_pgetMetaConnection != NULL) {
+ cout << "Stopping GetMetaData instance" << endl;
+ this->iv_pgetMetaConnection->stop();
+ }
+ for (size_t i=0; i < iv_vecpConnections.size(); i++) {
+ cout << "Stopping Annotator instance " << i << endl;
+ AMQConnection * connection = iv_vecpConnections.at(i);
+ if (connection != NULL) {
+ connection->stop();
+ } else {
+ LOGERROR("AMQAnalysisEngineService::stop() Connection object is NULL.");
+ ErrorInfo errInfo;
+ errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "connection object is NULL."));
+ UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
+ UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+ errInfo.getMessage(),
+ errInfo.getMessage().getMessageID(),
+ ErrorInfo::unrecoverable);
}
- iv_vecpAnalysisEngines.clear();
- iv_vecpCas.clear();
- return 0;
- }
+ }
+ return 0;
+}
+
void AMQAnalysisEngineService::cleanup() {
// Destroy resources.
try {
+ if (iv_closed) {
+ return;
+ }
+ if (iv_pgetMetaConnection != NULL) {
+ delete this->iv_pgetMetaConnection;
+ this->iv_pgetMetaConnection = 0;
+ }
+
//cout << "num consumerConnections " << consumerConnections.size() << endl;
for (size_t i=0; i < iv_vecpConnections.size(); i++) {
//cout << "deleting consumerConnection " << i << endl;
@@ -1181,26 +1470,35 @@
iv_vecpConnections.at(i)->stop();
delete iv_vecpConnections.at(i);
}
- /**
- if (iv_vecpAnalysisEngines.at(i) != NULL) {
- delete iv_vecpAnalysisEngines.at(i);
- }
- if (iv_vecpCas.at(i) != NULL) {
- delete iv_vecpCas.at(i);
- }
- **/
}
+
+ for (size_t i=0; i < iv_vecpAnalysisEngines.size(); i++) {
+ delete iv_vecpAnalysisEngines.at(i);
+ }
+
+ for (size_t i=0; i < iv_vecpCas.size(); i++) {
+ delete iv_vecpCas.at(i);
+ }
+
iv_vecpAnalysisEngines.clear();
iv_vecpCas.clear();
iv_vecpConnections.clear();
+
map<int, AMQListener*>::iterator ite;
for (ite= iv_listeners.begin();ite != iv_listeners.end();ite++) {
delete ite->second;
}
iv_listeners.clear();
+
+ if (iv_pConnFact != NULL) {
+ delete iv_pConnFact;
+ iv_pConnFact = NULL;
+ }
+ iv_closed = true;
+
} catch (CMSException& e) {
- LOGERROR("AMQAnalysisEngineService::cleanup() " + e.getMessage());
+ cerr << "AMQAnalysisEngineService::cleanup() " << e.getMessage() << endl;;
}
}
@@ -1209,22 +1507,16 @@
//CommonUtils
//---------------------------------------------------
void CommonUtils::logError(string msg) {
- iv_pMonitor->logError(msg);
+ this->iv_pMonitor->logError(msg);
//cerr << "ERROR: " << msg << endl;
}
void CommonUtils::logWarning(string msg) {
- iv_pMonitor->logWarning(msg);
+ this->iv_pMonitor->logWarning(msg);
//cout << "WARN: " << msg << endl;
}
void CommonUtils::logMessage(string msg) {
- iv_pMonitor->logMessage(msg);
+ cout << "INFO: " << msg << endl;
+ this->iv_pMonitor->logMessage(msg);
//cout << "INFO: " << msg << endl;
}
-
-
-
-
-
-
-
Modified: incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp?rev=821374&r1=821373&r2=821374&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp Sat Oct 3 17:46:49 2009
@@ -25,10 +25,14 @@
#define __ACTIVEMQ_AE_SERVICE__
#include "uima/api.hpp"
+#include <cms/ConnectionFactory.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
+#include "time.h"
+#include <apr_thread_proc.h>
+
using namespace cms;
using namespace uima;
@@ -37,6 +41,13 @@
class Monitor;
class ServiceParameters;
+/** The function that the request processing thread will run.
+ * It receives and processes each message from the input queue
+ * as it arrives.
+ */
+static void* APR_THREAD_FUNC handleMessages(apr_thread_t *thd, void *data);
+
+
/** common base class */
class CommonUtils {
protected:
@@ -58,8 +69,12 @@
public CommonUtils {
private:
string iv_brokerURL;
+
Connection* iv_pConnection;
bool iv_valid;
+ bool iv_reconnecting;
+ bool iv_started;
+ int iv_id;
//consumer session
Session* iv_pConsumerSession;
@@ -67,21 +82,28 @@
string iv_inputQueueName;
cms::Queue * iv_pInputQueue;
MessageListener * iv_pListener;
+ TextMessage * iv_pReceivedMessage;
+ string iv_selector;
//producer session
Session * iv_pProducerSession;
MessageProducer * iv_pProducer;
- TextMessage * iv_pTextMessage;
+ TextMessage * iv_pReplyMessage;
map<string, cms::Destination*> iv_replyDestinations; //queuename-destination
-
+
+ void initialize();
public:
+
+ static ConnectionFactory * createConnectionFactory(ServiceParameters & params);
+
/** Establish connection to the broker and create a Message Producer session.
*/
- AMQConnection ( const char * aBrokerURL, Monitor * pStatistics);
+ AMQConnection ( ConnectionFactory * connFact, Monitor * pStatistics, int id);
/** Creates a MessageConsumer session and registers a listener.
Caller owns the listener. */
- void createMessageConsumer(string aQueueName, MessageListener * pListener, int prefetch);
+ //void createMessageConsumer(string aQueueName);
+ void createMessageConsumer(string aQueueName, string selector);
/** destructor */
~AMQConnection();
@@ -101,16 +123,24 @@
/** returns a TextMessage owned by this class. */
TextMessage * getTextMessage();
- /** sends the message and clears it. */
+ /** sends the reply message and clears it. */
void sendMessage(string queueName);
-
- /** sends the message and clears it. */
void sendMessage(const Destination * cmsReplyTo);
+ void sendMessage (TextMessage * request);
+
bool isValid() {
return iv_valid;
}
+ bool isStarted() {
+ return iv_started;
+ }
+
+ bool isReconnecting() {
+ return iv_reconnecting;
+ }
+
/** get the brokerURL */
string getBrokerURL() {
return this->iv_brokerURL;
@@ -120,6 +150,25 @@
string getInputQueueName() {
return this->iv_inputQueueName;
}
+
+ /** reset the connection handles */
+ void reset();
+
+ /** reset before reconnecting */
+ void resetBeforeReconnect();
+
+
+ /** reestablish broken connection.
+ * Attempts to reconnect 30 seconds after each failed attempt.
+ */
+ void reconnect();
+
+ /** receives the next message for this consumer.
+ * Delay is in millis.
+ */
+ Message * receive(const int delay);
+
+ ConnectionFactory * iv_pConnFact;
};
@@ -129,10 +178,11 @@
//--------------------------------------------------
class AMQConnectionsCache : public CommonUtils {
private:
+ ConnectionFactory * iv_pConnFact;
map<string, AMQConnection *> iv_connections; //key is brokerurl
public:
- AMQConnectionsCache(Monitor * pStatistics);
+ AMQConnectionsCache(ConnectionFactory * pConnFact, Monitor * pStatistics);
~AMQConnectionsCache();
@@ -146,15 +196,17 @@
};
//======================================================
-// A MessageListener that handles getMeta, processCAS
+// This class handles getMeta, processCAS
// and Collection Processing Complete requests.
//
// Records timing and error JMX statistics.
//
//------------------------------------------------------
-class AMQListener : public MessageListener,
- public CommonUtils {
+class AMQListener : public CommonUtils {
private:
+ apr_thread_t *thd;
+ bool iv_stopProcessing;
+
int iv_id; //Listener id
string iv_inputQueueName; //queue this listener gets messages from
string iv_brokerURL; //broker this listener is connected to
@@ -169,15 +221,27 @@
AMQConnectionsCache iv_replyToConnections; //maintain connections cache for
//sending reply to other brokers.
+ void getMetaData(AnalysisEngine * pEngine);
+
+ void handleRequest(const Message * request);
+
+ bool validateRequest(const TextMessage *, string &);
+
void sendResponse(const TextMessage * request, apr_time_t timeToDeserialize,
apr_time_t timeToSerialize, apr_time_t timeInAnalytic,
apr_time_t idleTime, apr_time_t elapsedTime,
string msgContent, bool isExceptionMsg);
public:
- /** constructor */
+
+ /** constructor */
AMQListener(int id,
AMQConnection * pConnection,
AnalysisEngine * pEngine,
+ Monitor * pStatistics);
+
+ AMQListener(int id,
+ AMQConnection * pConnection,
+ AnalysisEngine * pEngine,
CAS * pCas,
Monitor * pStatistics);
/** destructor */
@@ -187,11 +251,19 @@
return this->iv_busy;
}
- /*
- * Process the message. Handles GETMETA, PROCESSCAS
- * and CPC requests.
- */
- virtual void onMessage( const Message* message );
+ /* Flag to break out of receive loop */
+ void stopProcessing() {
+ iv_stopProcessing = true;
+ }
+
+ bool isReconnecting() {
+ return iv_pConnection->isReconnecting();
+ }
+ /*
+ * Synchronously receives and processes messages
+ */
+ void receiveAndProcessMessages(apr_thread_t *thd);
+
};
@@ -201,13 +273,13 @@
//
// This class creates connections to an ActiveMQ broker
// and registers one or more MessageConsumers to receive
-// messages from a specified queue. The MessageConsumers can
-// process requests for GETMETA, PROCESSCAS and Collection
-// Process Complete.
-//
-// Each MessageConsumer registers a MessageListener that will
-// receive and process messages. Each instance of the MessageListener
-// is initialized with an instance of the AnalysisEngine and a CAS.
+// messages from a specified queue.
+//
+// A thread is started for each instance. Each thread maintains a
+// connection to the broker, and an instance of the UIMA AnalysisEngine.
+// To support fast handling of GETMETA requests, an additional separate
+// MessageConsumer and thread is started that processes only GETMETA requests.
+//
//
// The service wrapper sets acknowledgment mode to AUTO_ACKNOWLEDGE mode
// by default and lets the underlying middleware handle the message
@@ -222,10 +294,15 @@
// See the UIMA-EE documentation for how to start and manage a C++
// servcice from Java using the UimacppServiceController bean.
//------------------------------------------------------------------------
-class AMQAnalysisEngineService : public ExceptionListener,
- public MessageListener,
- public CommonUtils {
+class AMQAnalysisEngineService : public CommonUtils {
private:
+
+ apr_pool_t * iv_pool;
+ apr_threadattr_t *thd_attr;
+ ConnectionFactory * iv_pConnFact;
+ vector<apr_thread_t *> iv_listenerThreads;
+
+
string iv_brokerURL;
string iv_inputQueueName;
string iv_aeDescriptor;
@@ -233,28 +310,30 @@
int iv_prefetchSize;
size_t iv_initialFSHeapSize;
+ AMQConnection * iv_pgetMetaConnection;
vector <AMQConnection*> iv_vecpConnections;
vector <AnalysisEngine *> iv_vecpAnalysisEngines;
vector <CAS *> iv_vecpCas;
map<int, AMQListener *> iv_listeners; //id - listener
- void initialize();
+ bool iv_started;
+ bool iv_closed;
+
+ void initialize(ServiceParameters & params);
public:
void cleanup();
~AMQAnalysisEngineService();
- AMQAnalysisEngineService(ServiceParameters & desc, Monitor * pStatistics);
+ AMQAnalysisEngineService(ServiceParameters & desc, Monitor * pStatistics, apr_pool_t * pool);
void setTraceLevel(int level);
-
- void onException( const CMSException& ex );
-
- virtual void onMessage( const Message* message );
void start();
- int stop();
+ int stop();
+
+ void shutdown();
};
#endif
Modified: incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp?rev=821374&r1=821373&r2=821374&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp Sat Oct 3 17:46:49 2009
@@ -64,7 +64,7 @@
apr_pool_t *pool;
rv = apr_pool_create(&pool, NULL);
if (rv != APR_SUCCESS) {
- cerr << "ERROR: apr_pool_create() failed. " << endl;
+ cerr << __FILE__ << " ERROR: apr_pool_create() failed. " << endl;
return -1;
}
@@ -78,7 +78,7 @@
initialize(serviceDesc, pool);
/*create service*/
- AMQAnalysisEngineService aeService(serviceDesc,singleton_pMonitor);
+ AMQAnalysisEngineService aeService(serviceDesc,singleton_pMonitor, pool);
aeService.setTraceLevel(serviceDesc.getTraceLevel());
/*start receiving messages*/
@@ -108,7 +108,7 @@
/* shutdown */
// uima::ResourceManager::getInstance().getLogger().logMessage("deployCppService shutting down.");
cout << __FILE__ << " shutting down." << endl;
- aeService.stop();
+ aeService.shutdown();
terminateService();
if (pool) {