You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2009/10/16 17:01:43 UTC
svn commit: r825922 - in /incubator/uima/uimacpp/trunk/src/utils:
ActiveMQAnalysisEngineService.cpp ActiveMQAnalysisEngineService.hpp
deployCppService.cpp deployCppService.hpp
Author: eae
Date: Fri Oct 16 15:01:42 2009
New Revision: 825922
URL: http://svn.apache.org/viewvc?rev=825922&view=rev
Log:
UIMA-1617 commit Bhavani's UIMACPP-1617.patch
Modified:
incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp
incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp
incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp
incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp
Modified: incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp?rev=825922&r1=825921&r2=825922&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp Fri Oct 16 15:01:42 2009
@@ -462,7 +462,6 @@
//stops receiving messages
void AMQConnection::stop() {
- LOGINFO(0,"AMQConnection::stop(). ");
if (this->iv_pConnection != NULL) {
this->iv_pConnection->stop();
} else {
@@ -793,7 +792,8 @@
this->thd = thd;
cout << "Instance: " << iv_id << " ThreadId: " << apr_os_thread_current() << " started." << endl;
//start receiving messages
- this->iv_pConnection->start();
+ //this->iv_pConnection->start();
+
this->iv_stopProcessing = false;
apr_time_t lastStatsTime = apr_time_now();
while (!iv_stopProcessing) {
@@ -1387,7 +1387,7 @@
cout << "tracelevel=" << uimacpp_ee_tracelevel << endl;
}
- void AMQAnalysisEngineService::start() {
+ void AMQAnalysisEngineService::startProcessingThreads() {
LOGINFO(FINER,"AMQAnalysisEngineService::start() create listener threads.");
//create the listener threads
thd_attr=0;
@@ -1406,8 +1406,35 @@
}
+/**
+ * Starts message delivery on every broker connection held by this service:
+ * first the GetMetaData connection (when one exists), then each annotator
+ * connection in iv_vecpConnections, in index order.
+ *
+ * A NULL entry in iv_vecpConnections is logged and reported through
+ * UIMA_EXC_THROW_NEW as a uima::Uima_runtime_error
+ * (UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, unrecoverable).
+ * Connections with a lower index have already been started at that point.
+ */
+void AMQAnalysisEngineService::start() {
+
+  if (iv_pgetMetaConnection != NULL) {
+    cerr << "Starting GetMetaData instance" << endl;
+    this->iv_pgetMetaConnection->start();
+  }
+  for (size_t i=0; i < iv_vecpConnections.size(); i++) {
+    cerr << "Starting Annotator instance " << i << endl;
+    AMQConnection * connection = iv_vecpConnections.at(i);
+    if (connection != NULL) {
+      connection->start();
+    } else {
+      // A missing connection means the service was not set up correctly;
+      // report it as unrecoverable rather than silently skipping the slot.
+      LOGERROR("AMQAnalysisEngineService::start() Connection object is NULL.");
+      ErrorInfo errInfo;
+      errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "connection object is NULL."));
+      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
+                         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+                         errInfo.getMessage(),
+                         errInfo.getMessage().getMessageID(),
+                         ErrorInfo::unrecoverable);
+    }
+  }
+}
+
void AMQAnalysisEngineService::shutdown() {
//LOGWARN("AMQAnalysisEngineService::shutdown()");
+
+ stop();
+
cout << "AMQAnalysisEngineService::shutdown() going to terminate threads " << endl;;
//terminate the threads
apr_status_t rv;
@@ -1418,10 +1445,10 @@
apr_thread_join(&rv, this->iv_listenerThreads.at(i));
}
}
- stop();
- //cout << "AMQAnalysisEngineService::shutdown stopped all connection" << endl;
+
+ cout << "AMQAnalysisEngineService::shutdown stopped all connection" << endl;
cleanup();
- //cout << "AMQAnalysisEngineService::shutdown shutdown done" << endl;
+ cout << "AMQAnalysisEngineService::shutdown shutdown done" << endl;
}
int AMQAnalysisEngineService::stop() {
@@ -1430,11 +1457,11 @@
//stop messages notification
if (iv_pgetMetaConnection != NULL) {
- cout << "Stopping GetMetaData instance" << endl;
+ cerr << "Stopping GetMetaData instance" << endl;
this->iv_pgetMetaConnection->stop();
}
for (size_t i=0; i < iv_vecpConnections.size(); i++) {
- cout << "Stopping Annotator instance " << i << endl;
+ cerr << "Stopping Annotator instance " << i << endl;
AMQConnection * connection = iv_vecpConnections.at(i);
if (connection != NULL) {
connection->stop();
@@ -1452,6 +1479,42 @@
return 0;
}
+void AMQAnalysisEngineService::quiesceAndStop() { /*
+ * Graceful stop: calls quiesce(), then (when the monitor's quiesce flag is
+ * set) asks every listener to stop processing and joins its thread, and
+ * finally calls cleanup().
+ * NOTE(review): `rv` is not declared inside this function; presumably it is
+ * a member or otherwise in scope -- confirm.
+ */
+
+ quiesce(); // calls stop() and returns once no listener reports isBusy()
+
+ //shutdown worker threads.
+ cout << "AMQAnalysisEngineService::quiesceAndStop() going to terminate threads " << endl;;
+ if (this->iv_pMonitor->getQuiesceAndStop() ) { // listeners are stopped and joined only when the flag is set; cleanup() below runs either way
+ for (size_t i=0; i < this->iv_listenerThreads.size(); i++) {
+ //cout << "wait for thread " << i << " to end " << endl;
+ this->iv_listeners[i]->stopProcessing(); // request the stop first, then wait for the thread to finish
+ apr_thread_join(&rv, this->iv_listenerThreads.at(i));
+ }
+ }
+
+ cleanup();
+}
+
+void AMQAnalysisEngineService::quiesce() { /*
+ * Calls stop() and then polls the listeners in iv_listeners until a full
+ * pass finds none of them reporting isBusy(). Blocks the calling thread;
+ * the loop has no timeout.
+ */
+ //stop connections - does not work
+ stop();
+
+ //check whether processing threads are finished.
+ bool allfinished = false;
+ while (!allfinished) {
+ allfinished = true; // assume idle until this pass finds a busy listener
+ map<int, AMQListener*>::iterator ite;
+ for (ite= iv_listeners.begin();ite != iv_listeners.end();ite++) {
+ if (ite->second->isBusy()) {
+ allfinished = false;
+ break; // one busy listener is enough to require another pass
+ }
+ }
+ Thread::sleep(1000); // runs after every pass, including the final all-idle one; presumably milliseconds -- TODO confirm unit
+ }
+}
+
void AMQAnalysisEngineService::cleanup() {
// Destroy resources.
try {
@@ -1463,7 +1526,6 @@
this->iv_pgetMetaConnection = 0;
}
- //cout << "num consumerConnections " << consumerConnections.size() << endl;
for (size_t i=0; i < iv_vecpConnections.size(); i++) {
//cout << "deleting consumerConnection " << i << endl;
if (iv_vecpConnections.at(i) != NULL) {
Modified: incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp?rev=825922&r1=825921&r2=825922&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp Fri Oct 16 15:01:42 2009
@@ -328,11 +328,17 @@
AMQAnalysisEngineService(ServiceParameters & desc, Monitor * pStatistics, apr_pool_t * pool);
void setTraceLevel(int level);
+
+ void startProcessingThreads();
void start();
int stop();
+ void quiesce();
+
+ void quiesceAndStop();
+
void shutdown();
};
Modified: incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp?rev=825922&r1=825921&r2=825922&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp Fri Oct 16 15:01:42 2009
@@ -81,12 +81,15 @@
AMQAnalysisEngineService aeService(serviceDesc,singleton_pMonitor, pool);
aeService.setTraceLevel(serviceDesc.getTraceLevel());
+ /* create processing threads */
+ aeService.startProcessingThreads();
+
/*start receiving messages*/
cout << __FILE__ << " Start receiving messages " << endl;
aeService.start();
cout << __FILE__ << " UIMA C++ Service " << serviceDesc.getQueueName() << " at " <<
- serviceDesc.getBrokerURL() << " Ready to process..." << endl;
+ serviceDesc.getBrokerURL() << " Ready to process..." << endl;
/* connect to java proxy if called from java */
apr_thread_t *thread=0;
@@ -97,18 +100,41 @@
assert(rv == APR_SUCCESS);
//rv = apr_thread_join(&rv, thread);
//assert(rv == APR_SUCCESS);
- }
+ } else {
+ apr_thread_t *stdinthread=0;
+ rv = apr_thread_create(&stdinthread, thd_attr, readstdin, 0, pool);
+ }
//wait
apr_thread_mutex_lock(singleton_pMonitor->cond_mutex);
apr_thread_cond_wait(singleton_pMonitor->cond, singleton_pMonitor->cond_mutex);
- cerr << __FILE__ << " Received SHUTDOWN signal " << endl;
- apr_thread_mutex_unlock(singleton_pMonitor->cond_mutex);
-
- /* shutdown */
- // uima::ResourceManager::getInstance().getLogger().logMessage("deployCppService shutting down.");
- cout << __FILE__ << " shutting down." << endl;
- aeService.shutdown();
+ apr_thread_mutex_unlock(singleton_pMonitor->cond_mutex);
+
+ if (singleton_pMonitor->getQuiesceAndStop()) {
+ cerr << __FILE__ << " " << serviceDesc.getServiceName() << " Quiesce started. " << endl;
+
+ //quiesce
+ aeService.quiesceAndStop();
+
+ cerr << __FILE__ << " " << serviceDesc.getServiceName() << " quiesced. " << endl;
+ } else {
+ cerr << __FILE__ << " Shutdown started. " << endl;
+ aeService.shutdown();
+ cerr << __FILE__ << " Shutdown done. " << endl;
+ }
+
+ //notify java controller
+ if (cs) {
+ apr_size_t len = 4;
+ apr_status_t rv = apr_socket_send(cs, "DONE" , &len);
+ len = 1;
+ apr_socket_send(cs,"\n", &len);
+ if (rv != APR_SUCCESS) {
+ cerr << " apr_socket_send() failed sending shutdown notification." << endl;
+ }
+ }
+
+ /* cleanup */
terminateService();
if (pool) {
Modified: incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp?rev=825922&r1=825921&r2=825922&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp Fri Oct 16 15:01:42 2009
@@ -60,6 +60,8 @@
static void* APR_THREAD_FUNC handleCommands(apr_thread_t *thd, void *data);
static int terminateService();
static void signal_handler(int signum);
+static void* APR_THREAD_FUNC readstdin(apr_thread_t *thd, void *data);
+
/**
@@ -317,7 +319,7 @@
rv = apr_socket_send(socket, str.str().c_str(), &len);
if (rv != APR_SUCCESS) {
- cerr << "apr_socket_send() failed " << str.str() << endl;
+ cerr << __FILE__ << __LINE__ << " apr_socket_send() failed " << str.str() << endl;
}
apr_thread_mutex_unlock(mutex);
}
@@ -349,6 +351,7 @@
SocketLogger * logger)
{
iv_status = "Initializing";
+ iv_quiesceandstop=false;
iv_brokerURL = brokerURL;
iv_queueName = queueName;
iv_aeDescriptor = aeDesc;
@@ -391,6 +394,18 @@
return ;
}
+ void setQuiesceAndStop() { /*
+  * Sets the quiesce-and-stop flag to true and signals this->cond.
+  * NOTE(review): the flag is written and the condition is signalled without
+  * taking a mutex in this function (the unlock calls below are commented
+  * out), and one caller is signal_handler(). Confirm this cannot lose the
+  * wake-up for the thread waiting on the condition.
+  */
+ //apr_thread_mutex_unlock(mutex);
+ // apr_thread_mutex_unlock(lmutex);
+ this->iv_quiesceandstop = true; // set before signalling so the woken thread can read it via getQuiesceAndStop()
+ apr_thread_cond_signal(this->cond);
+ return ;
+ }
+
+  /**
+   * Returns the quiesce-and-stop flag: initialised to false by the
+   * constructor and set to true by setQuiesceAndStop().
+   * Declared const, like the other read-only accessors of this class.
+   */
+  bool getQuiesceAndStop() const {
+    return iv_quiesceandstop;
+  }
+
const string & getBrokerURL() const {
return iv_brokerURL;
}
@@ -668,6 +683,7 @@
string iv_status;
int iv_numInstances;
int iv_prefetchSize;
+ bool iv_quiesceandstop;
INT64 iv_cpcErrors;
INT64 iv_getmetaErrors;
@@ -732,7 +748,8 @@
stringstream str;
str << __FILE__ << __LINE__ << " Received Signal: " << signum;
cerr << str.str() << endl;
- singleton_pMonitor->shutdown();
+ //singleton_pMonitor->shutdown();
+ singleton_pMonitor->setQuiesceAndStop();
}
@@ -853,14 +870,14 @@
apr_socket_send(cs,"\n", &len);
cout << "sent 0 to controller " << endl;
//receive JMX, admin requests from controller
- char buf[9];
- memset(buf,0,9);
- len = 8;
+ char buf[16];
+ memset(buf,0,16);
+ len = 16;
while ( (rv = apr_socket_recv(cs, buf, &len)) != APR_EOF) {
string command = buf;
- memset(buf,0,9);
- len=8;
- //cout << "apr_socket_recv command=" << command << endl;
+ memset(buf,0,16);
+ cout << len << " apr_socket_recv command=" << command << endl;
+ len=16;
if (command.compare("GETSTATS")==0) {
//singleton_pLogger->log(LogStream::EnMessage,"deployCppService","getStats","retrieving stats",0);
singleton_pMonitor->writeStatistics(cs);
@@ -869,6 +886,11 @@
singleton_pLogger->log(uima::LogStream::EnMessage,"deployCppService", "RESET",
"reset JMX statistics",0);
singleton_pMonitor->reset();
+ } else if (command.compare("QUIESCEANDSTOP")==0) {
+ singleton_pLogger->log(uima::LogStream::EnMessage,"deployCppService", "QUIESCEANDSTOP",
+ "quiesce and shutdown",0);
+ singleton_pMonitor->setQuiesceAndStop();
+ break;
} else if (command.compare("SHUTDOWN")==0) {
singleton_pMonitor->shutdown();
break;
@@ -893,6 +915,30 @@
return NULL;
}
+
+/**
+ * Thread entry point for the interactive console: prompts on stdout and reads
+ * single-letter commands from stdin until one of them is accepted.
+ *   's' -> singleton_pMonitor->shutdown()
+ *   'q' -> singleton_pMonitor->setQuiesceAndStop()
+ * Any other input waits (apr_sleep(1000000)) and prompts again.
+ * The thread also ends when stdin reaches end-of-file or cannot be read.
+ *
+ * @param thd  APR thread handle (unused).
+ * @param data opaque thread argument (unused).
+ * @return always 0.
+ */
+static void* APR_THREAD_FUNC readstdin(apr_thread_t *thd, void *data) {
+
+  printf ("Enter 'q' to quiesce and stop or 's' to stop:\n ");
+  char str[2];
+  str[0] = '\n';
+  str[1] = '\0';
+
+  while( str[0] == '\n' ) {
+    // "%1s" stores at most one character plus the terminating NUL, which is
+    // exactly what str can hold; an unbounded "%s" overflows the buffer as
+    // soon as the user types more than one character.
+    if (scanf("%1s", str) == EOF) {
+      // stdin is closed or unreadable: no command can ever arrive, so end
+      // this thread instead of re-prompting forever.
+      return 0;
+    }
+
+    if (str[0] == 's') {
+      singleton_pMonitor->shutdown();
+    } else if (str[0] == 'q') {
+      singleton_pMonitor->setQuiesceAndStop();
+    } else {
+      // Drop the rest of the rejected line so one line gives one new prompt.
+      scanf("%*[^\n]");
+      apr_sleep(1000000);
+      str[0] = '\n';
+      printf ("Enter 'q' to quiesce and stop or 's' to stop:\n ");
+    }
+  }
+  return 0;
+}
+
#endif