You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2009/10/16 17:01:43 UTC

svn commit: r825922 - in /incubator/uima/uimacpp/trunk/src/utils: ActiveMQAnalysisEngineService.cpp ActiveMQAnalysisEngineService.hpp deployCppService.cpp deployCppService.hpp

Author: eae
Date: Fri Oct 16 15:01:42 2009
New Revision: 825922

URL: http://svn.apache.org/viewvc?rev=825922&view=rev
Log:
UIMA-1617 commit Bhavani's UIMACPP-1617.patch

Modified:
    incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp
    incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp
    incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp
    incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp

Modified: incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp?rev=825922&r1=825921&r2=825922&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.cpp Fri Oct 16 15:01:42 2009
@@ -462,7 +462,6 @@
 
 //stops receiving messages
   void AMQConnection::stop() {
-    LOGINFO(0,"AMQConnection::stop(). ");
     if (this->iv_pConnection != NULL) {
       this->iv_pConnection->stop();
     } else {
@@ -793,7 +792,8 @@
     this->thd = thd;
     cout << "Instance: " << iv_id << " ThreadId: " << apr_os_thread_current() <<  " started." << endl;
     //start receiving messages
-    this->iv_pConnection->start();
+    //this->iv_pConnection->start();
+
     this->iv_stopProcessing = false;
     apr_time_t lastStatsTime = apr_time_now(); 
     while (!iv_stopProcessing) {
@@ -1387,7 +1387,7 @@
     cout << "tracelevel=" << uimacpp_ee_tracelevel << endl;
   }
 
-  void AMQAnalysisEngineService::start() {
+  void AMQAnalysisEngineService::startProcessingThreads() {
     LOGINFO(FINER,"AMQAnalysisEngineService::start() create listener threads.");
     //create the listener threads
     thd_attr=0;
@@ -1406,8 +1406,35 @@
 }
 
 
+void AMQAnalysisEngineService::start() {
+  
+  if (iv_pgetMetaConnection != NULL) {
+    cerr << "Startinging GetMetaData instance" << endl;
+    this->iv_pgetMetaConnection->start();
+  }
+  for (size_t i=0; i < iv_vecpConnections.size(); i++) {
+    cerr << "Starting Annotator instance " << i << endl;
+    AMQConnection * connection = iv_vecpConnections.at(i);
+    if (connection != NULL) {
+      connection->start();
+    } else {
+      LOGERROR("AMQAnalysisEngineService::start() Connection object is NULL.");
+      ErrorInfo errInfo;
+      errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "connection object is NULL."));
+      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
+        UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
+        errInfo.getMessage(),
+        errInfo.getMessage().getMessageID(),
+        ErrorInfo::unrecoverable);
+    }
+  } 
+}	
+
 void AMQAnalysisEngineService::shutdown() {
   //LOGWARN("AMQAnalysisEngineService::shutdown()");
+
+  stop();
+
   cout << "AMQAnalysisEngineService::shutdown() going to terminate threads " << endl;;
   //terminate the threads
   apr_status_t rv;
@@ -1418,10 +1445,10 @@
       apr_thread_join(&rv, this->iv_listenerThreads.at(i));
     }
   }
-  stop();
-  //cout << "AMQAnalysisEngineService::shutdown stopped all connection" << endl;
+  
+  cout << "AMQAnalysisEngineService::shutdown stopped all connection" << endl;
   cleanup();
-  //cout << "AMQAnalysisEngineService::shutdown shutdown done" << endl;
+  cout << "AMQAnalysisEngineService::shutdown shutdown done" << endl;
 }
 
 int AMQAnalysisEngineService::stop() {
@@ -1430,11 +1457,11 @@
   //stop messages notification
 
   if (iv_pgetMetaConnection != NULL) {
-    cout << "Stopping GetMetaData instance" << endl;
+    cerr << "Stopping GetMetaData instance" << endl;
     this->iv_pgetMetaConnection->stop();
   }
   for (size_t i=0; i < iv_vecpConnections.size(); i++) {
-    cout << "Stopping Annotator instance " << i << endl;
+    cerr << "Stopping Annotator instance " << i << endl;
     AMQConnection * connection = iv_vecpConnections.at(i);
     if (connection != NULL) {
       connection->stop();
@@ -1452,6 +1479,42 @@
   return 0;
 }	
 
+void  AMQAnalysisEngineService::quiesceAndStop() {
+  
+  quiesce();
+  
+  //shutdown worker threads.
+   cout << "AMQAnalysisEngineService::quiesceAndStop() going to terminate threads " << endl;;
+  if (this->iv_pMonitor->getQuiesceAndStop() ) {
+    for (size_t i=0; i < this->iv_listenerThreads.size(); i++) {
+      //cout << "wait for thread " << i << " to end " << endl;
+      this->iv_listeners[i]->stopProcessing();
+      apr_thread_join(&rv, this->iv_listenerThreads.at(i));
+    }
+  }
+
+  cleanup();
+}
+
+void  AMQAnalysisEngineService::quiesce() {
+  //stop connections - does not work
+  stop();
+
+  //check whether processing threads are finished.
+  bool allfinished = false;
+  while (!allfinished) {
+    allfinished = true;
+    map<int, AMQListener*>::iterator ite; 
+    for (ite= iv_listeners.begin();ite !=  iv_listeners.end();ite++) {
+      if (ite->second->isBusy()) { 
+        allfinished  = false;
+        break;
+      }
+    }
+    Thread::sleep(1000);
+  }
+}
+
   void AMQAnalysisEngineService::cleanup() {
     // Destroy resources.
     try { 
@@ -1463,7 +1526,6 @@
         this->iv_pgetMetaConnection = 0;
       }
 
-      //cout << "num consumerConnections " << consumerConnections.size() << endl;
       for (size_t i=0; i < iv_vecpConnections.size(); i++) {
         //cout << "deleting consumerConnection " << i << endl;	
         if (iv_vecpConnections.at(i) != NULL) {

Modified: incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp?rev=825922&r1=825921&r2=825922&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/ActiveMQAnalysisEngineService.hpp Fri Oct 16 15:01:42 2009
@@ -328,11 +328,17 @@
 	AMQAnalysisEngineService(ServiceParameters & desc, Monitor * pStatistics, apr_pool_t * pool);
 
 	void setTraceLevel(int level);
+
+  void startProcessingThreads();
 	
 	void start();
 		
 	int stop(); 	
 
+  void quiesce();
+
+  void quiesceAndStop();
+
   void shutdown();
 };
 

Modified: incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp?rev=825922&r1=825921&r2=825922&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/deployCppService.cpp Fri Oct 16 15:01:42 2009
@@ -81,12 +81,15 @@
     AMQAnalysisEngineService aeService(serviceDesc,singleton_pMonitor, pool);
     aeService.setTraceLevel(serviceDesc.getTraceLevel());
 
+    /* create processing threads */
+    aeService.startProcessingThreads();
+
     /*start receiving messages*/ 
     cout << __FILE__ << " Start receiving messages " << endl;
     aeService.start();
 
     cout << __FILE__ << " UIMA C++ Service " << serviceDesc.getQueueName() << " at " <<
-      serviceDesc.getBrokerURL() << " Ready to process..." << endl;
+    serviceDesc.getBrokerURL() << " Ready to process..." << endl;
     
     /* connect to java proxy if called from java */  
     apr_thread_t *thread=0;
@@ -97,18 +100,41 @@
       assert(rv == APR_SUCCESS);
       //rv = apr_thread_join(&rv, thread);
       //assert(rv == APR_SUCCESS);
-    }
+    }  else {
+      apr_thread_t *stdinthread=0;
+      rv = apr_thread_create(&stdinthread, thd_attr, readstdin, 0, pool);
+    }  
 
     //wait 
     apr_thread_mutex_lock(singleton_pMonitor->cond_mutex);
     apr_thread_cond_wait(singleton_pMonitor->cond, singleton_pMonitor->cond_mutex);
-    cerr << __FILE__ << " Received SHUTDOWN signal " << endl;
-    apr_thread_mutex_unlock(singleton_pMonitor->cond_mutex);   
-  
-    /* shutdown */
-   // uima::ResourceManager::getInstance().getLogger().logMessage("deployCppService shutting down.");
-    cout << __FILE__ << " shutting down." << endl;
-    aeService.shutdown();
+    apr_thread_mutex_unlock(singleton_pMonitor->cond_mutex);    
+    
+    if (singleton_pMonitor->getQuiesceAndStop()) {
+      cerr << __FILE__ << " " << serviceDesc.getServiceName() << " Quiesce started. " << endl;
+
+      //quiesce 
+      aeService.quiesceAndStop();
+      
+      cerr << __FILE__ << " " << serviceDesc.getServiceName() << " quiesced. " << endl;
+    } else {
+      cerr << __FILE__ << " Shutdown started. " << endl;
+      aeService.shutdown();
+      cerr << __FILE__ << " Shutdown done. " << endl;
+    }
+
+    //notify java controller
+    if (cs) {
+      apr_size_t len = 4;
+      apr_status_t rv = apr_socket_send(cs, "DONE" , &len);
+      len = 1;
+      apr_socket_send(cs,"\n", &len);
+      if (rv != APR_SUCCESS) {
+        cerr << " apr_socket_send() failed sending shutdown notification." << endl;
+      }
+    }  
+
+    /* cleanup */
     terminateService();
     
     if (pool) {

Modified: incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp?rev=825922&r1=825921&r2=825922&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp Fri Oct 16 15:01:42 2009
@@ -60,6 +60,8 @@
 static void* APR_THREAD_FUNC handleCommands(apr_thread_t *thd, void *data);
 static int terminateService();
 static void signal_handler(int signum);
+static void* APR_THREAD_FUNC readstdin(apr_thread_t *thd, void *data);
+
 
 
 /**
@@ -317,7 +319,7 @@
       rv = apr_socket_send(socket, str.str().c_str(), &len);
 
       if (rv != APR_SUCCESS) {
-        cerr << "apr_socket_send() failed " <<  str.str() << endl;
+        cerr << __FILE__ << __LINE__ <<   " apr_socket_send() failed " <<  str.str() << endl;
       }
       apr_thread_mutex_unlock(mutex);
     }
@@ -349,6 +351,7 @@
       SocketLogger * logger)         
     {
       iv_status = "Initializing";
+      iv_quiesceandstop=false;
       iv_brokerURL = brokerURL;
       iv_queueName = queueName;
       iv_aeDescriptor = aeDesc;
@@ -391,6 +394,18 @@
       return ;
     }
 
+    void setQuiesceAndStop() {
+      //apr_thread_mutex_unlock(mutex);
+      //  apr_thread_mutex_unlock(lmutex);
+      this->iv_quiesceandstop = true;
+      apr_thread_cond_signal(this->cond);
+      return ;
+    }
+
+    bool getQuiesceAndStop() {
+      return iv_quiesceandstop;
+    }
+
     const string & getBrokerURL() const {
       return iv_brokerURL;
     }
@@ -668,6 +683,7 @@
     string iv_status;
     int iv_numInstances;
     int iv_prefetchSize;
+    bool iv_quiesceandstop;
 
     INT64 iv_cpcErrors;
     INT64 iv_getmetaErrors;
@@ -732,7 +748,8 @@
   stringstream str;
   str << __FILE__ << __LINE__ << " Received Signal: " << signum;
   cerr << str.str() << endl;
-  singleton_pMonitor->shutdown();
+  //singleton_pMonitor->shutdown();
+  singleton_pMonitor->setQuiesceAndStop();
 }
 
 
@@ -853,14 +870,14 @@
   apr_socket_send(cs,"\n", &len);
   cout << "sent 0 to controller " << endl;
   //receive JMX, admin requests from controller 
-  char buf[9];
-  memset(buf,0,9);
-  len = 8;
+  char buf[16];
+  memset(buf,0,16);
+  len = 16;
   while ( (rv = apr_socket_recv(cs, buf, &len)) != APR_EOF) {
     string command = buf;
-    memset(buf,0,9);
-    len=8;
-    //cout << "apr_socket_recv command=" << command << endl;
+    memset(buf,0,16);
+    cout << len << " apr_socket_recv command=" << command << endl;
+    len=16;
     if (command.compare("GETSTATS")==0) {
       //singleton_pLogger->log(LogStream::EnMessage,"deployCppService","getStats","retrieving stats",0);
       singleton_pMonitor->writeStatistics(cs);
@@ -869,6 +886,11 @@
       singleton_pLogger->log(uima::LogStream::EnMessage,"deployCppService", "RESET",
         "reset JMX statistics",0);
       singleton_pMonitor->reset();
+    } else if (command.compare("QUIESCEANDSTOP")==0) {
+      singleton_pLogger->log(uima::LogStream::EnMessage,"deployCppService", "QUIESCEANDSTOP",
+        "quiesce and shutdown",0);
+      singleton_pMonitor->setQuiesceAndStop();
+      break;
     } else if (command.compare("SHUTDOWN")==0) {
       singleton_pMonitor->shutdown();
       break;
@@ -893,6 +915,30 @@
   return NULL;
 }
 
+
+static void* APR_THREAD_FUNC readstdin(apr_thread_t *thd, void *data) {
+  
+  printf ("Enter 'q' to quiesce and stop or 's' to stop:\n ");
+  char str[2];
+  str[0] = '\n';
+
+  
+  while( str[0] == '\n' ) {
+    scanf("%s", str);
+
+    if (str[0] == 's') {
+      singleton_pMonitor->shutdown();
+    } else if (str[0] == 'q') {
+      singleton_pMonitor->setQuiesceAndStop();
+    } else {
+       apr_sleep(1000000);
+       str[0] = '\n';
+       printf ("Enter 'q' to quiesce and stop or 's' to stop:\n ");
+    }
+  }
+  return 0;
+}
+
 #endif