You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2009/10/16 17:26:02 UTC

svn commit: r825936 - /incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp

Author: eae
Date: Fri Oct 16 15:26:01 2009
New Revision: 825936

URL: http://svn.apache.org/viewvc?rev=825936&view=rev
Log:
UIMA-1620 commit Bhavani's uimacpp-1620.patch

Modified:
    incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp

Modified: incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp?rev=825936&r1=825935&r2=825936&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp Fri Oct 16 15:26:01 2009
@@ -44,7 +44,8 @@
 #include "uima/xmiwriter.hpp"
 #include "uima/xcasdeserializer.hpp"
 #include "uima/xmideserializer.hpp"
-
+#include <apr_portable.h>
+#include <apr_thread_proc.h>
 using namespace uima;
 
 
@@ -58,9 +59,26 @@
                           const AnalysisEngine &  crEngine);
 static void tafCheckError(ErrorInfo const &);
 
+void processInputFiles(AnalysisEngine * pEngine);
+
+// input file or directory (second positional argument)
+ std::string in;
+// output directory (third positional argument; rejected when numinstances > 1)
+ std::string out;
+//AE descriptor filename (first positional argument)
+ const char* cnfg = NULL;
 // sofa to use for creating a tcas
 bool useSofa, lenient;
 const char* sofaName;
+//logging level (-l); a negative value leaves the logging level unchanged
+int loglevel;
+//number of iterations over the input data (-r)
+int numruns;
+//number of annotator instances, each run in its own thread (-n)
+int numinstances;
+//randomize input order (-rand); max random delay in millis between calls to process (-rdelay)
+bool randomize;
+long rdelay;
 
 // input data types
 enum dataFormats { textFormat, xcasFormat, xmiFormat };
@@ -80,12 +98,30 @@
   cerr << "   -lenient      For -xmi, ignore unknown types & features" << endl;
   cerr << "   -s Sofa       Name of a Sofa to process (input must be an XCAS or XMI)" << endl;
   cerr << "   -l logLevel   Set to 0, 1, or 2 for Message, Warning, or Error" << endl;
+  cerr << "   -n numInstances number of annotator instances each running in a separate thread" << endl;
+  cerr << "   -r numRuns    number of iterations over the same input." << endl;
+  cerr << "   -rand         randomize the selection on next input to process." << endl;
+  cerr << "   -rdelay Max   add random delay between 0 and Max milliseconds between calls to process." << endl;
+
 }
 
-int main(int argc, char * argv[]) {
 
-  int loglevel = -1;
 
+//=================================================================
+// Thread entry point for one annotator instance: data is the AnalysisEngine*
+// to run; calls processInputFiles() on it, then apr_thread_exit(thd, APR_SUCCESS).
+//-----------------------------------------------------------------
+ static void* APR_THREAD_FUNC process(apr_thread_t *thd, void *data) {
+   cout << endl << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing... starting thread." << endl;
+   processInputFiles((AnalysisEngine *) data);
+   apr_thread_exit(thd, APR_SUCCESS);
+   return NULL; // presumably not reached once apr_thread_exit ends the thread -- confirm against APR docs
+ }
+
+
+int main(int argc, char * argv[]) {
+
+ 
   try {
 
     /* Access the command line arguments to get the name of the input text. */
@@ -95,14 +131,16 @@
       return 1;
     }
     useSofa = false;
-	lenient = false;
+    lenient = false;
     xcasInput = textFormat;
-    /* input/output dir arg */
-    std::string in;
-    std::string out;
-    std::string sofa;
-    std::string pattern("*");
-    const char* cnfg = NULL;
+    //std::string sofa;
+    //std::string pattern("*");
+    cnfg = NULL;
+    loglevel = -1;
+    numinstances = 1;
+    numruns = 1;
+    randomize = false;
+    rdelay = 0;
 
     int index = 0;
     while (++index < argc) {
@@ -132,6 +170,37 @@
             return 1;
           }
         }
+      } else if (0 == strcmp(arg, "-n")) {
+        if ( ++index < argc ) {
+          numinstances = atoi(argv[index]);
+          if (numinstances < 1) {
+            cerr << "NumInstances less than minimum value 1 "<< endl;
+            return 1;
+          }
+          if (out.length() > 0) {
+            cerr << "Output directory may not be specified when NumInstances is more than 1." << endl;
+            return 1;         
+          }
+        }
+      } else if (0 == strcmp(arg, "-rand")) {
+        randomize = true;
+      } else if (0 == strcmp(arg, "-r")) {
+        if ( ++index < argc ) {
+          numruns = atoi(argv[index]);
+          if (numruns < 1) {
+            cerr << "Number of runs less than minimum value 1 "<< endl;
+            return 1;
+          }
+        }
+      } else if (0 == strcmp(arg, "-rdelay")) {
+        if ( ++index < argc ) {
+          rdelay = atol(argv[index]);
+          rdelay = rdelay; //convert to microsec
+          if (rdelay < 1) {
+            cerr << "Random delay in millis less than minimum value of 1 "<< endl;
+            return 1;
+          }
+        }
       } else { //one of the standard params - whichever we haven't read yet
         if (cnfg == NULL) {
           cnfg = arg;
@@ -139,6 +208,10 @@
           in.append(arg);
         } else if (out.length() == 0) {
           out.append(arg);
+          if (numinstances > 1) {
+            cerr << "Output directory may not be specified when NumInstances is more than 1." << endl;
+            return 1;
+          }
         }
       }
     } //while
@@ -166,51 +239,77 @@
     if (loglevel >= 0) {
       ResourceManager::getInstance().setLoggingLevel((LogStream::EnEntryType)loglevel);
     }
+    ErrorInfo errorInfo;
+    if (numinstances ==  1) { 
+      AnalysisEngine * pEngine = Framework::createAnalysisEngine(cnfg, errorInfo);
+      if (errorInfo.getErrorId() != UIMA_ERR_NONE) {
+        cerr << "runAECpp:" << endl
+        << "  Error string  : "
+        << AnalysisEngine::getErrorIdAsCString(errorInfo.getErrorId()) << endl
+        << "  UIMACPP Error info:" << endl
+        << errorInfo << endl;
+        exit((int)errorInfo.getErrorId());
+      }
+      processInputFiles(pEngine);
+    } else {
+      apr_status_t rv = 0;
 
-    TyErrorId utErrorId;          // Variable to store UIMACPP return codes
-    ErrorInfo errorInfo;          // Variable to stored detailed error info
-    /* Initialize engine with filename of config-file */
-    AnalysisEngine * pEngine =
-      Framework::createAnalysisEngine(cnfg, errorInfo);
-    tafCheckError(errorInfo);
-
-    /* Get a new CAS */
-    CAS* cas = pEngine->newCAS();
+      //APR pool
+      apr_pool_t *pool;
+      rv = apr_pool_create(&pool, NULL);
+      if (rv != APR_SUCCESS) {
+        cerr << "ERROR: apr_pool_create() failed. " << endl;
+        return -1;
+      }
 
-    /* process input */
-    util::DirectoryWalk dirwalker(in.c_str());
-    if (dirwalker.isValid()) {
-      cout << "runAECpp::processing all files in directory: " << in.c_str() << endl;
-      util::Filename infile(in.c_str(),"FilenamePlaceHolder");
-      while (dirwalker.isValid()) {
-        // Process all files or just the ones with matching suffix
-        if ( dirwalker.isFile() ) {
-          infile.setNewName(dirwalker.getNameWithoutPath());
-          std::string afile(infile.getAsCString());
+   
+      /*create as many AnalysisEngine instances as specified 
+       by numinstances. */
+      vector<AnalysisEngine *> analysisEngines;  
+
+      for (int i=0; i < numinstances; i++) {
+        AnalysisEngine * pEngine = Framework::createAnalysisEngine(cnfg, errorInfo);
+        if (errorInfo.getErrorId() != UIMA_ERR_NONE) {
+          cerr << "runAECpp:" << endl
+          << "  Error string  : "
+          << AnalysisEngine::getErrorIdAsCString(errorInfo.getErrorId()) << endl
+          << "  UIMACPP Error info:" << endl
+          << errorInfo << endl;
+          exit((int)errorInfo.getErrorId());
+        }
+        analysisEngines.push_back(pEngine);
+      }
+      cerr << "Initialized AnalysisEngine " << endl;
 
-          //process the cas
-          process(pEngine,cas,afile, out);
 
-          //reset the cas
-          cas->reset();
-        }
-        //get the next input file in the directory
-        dirwalker.setToNext();
+      /* create and start the processing threads */
+      apr_threadattr_t * thd_attr=0;
+      rv = apr_threadattr_create(&thd_attr, pool);
+      assert(rv == APR_SUCCESS);
+
+      vector<apr_thread_t *> processingThreads;
+      for (int i=0; i < numinstances; i++) {
+        apr_thread_t *thread=0;
+        rv = apr_thread_create(&thread, thd_attr, process, analysisEngines.at(i), pool);
+        assert(rv == APR_SUCCESS);
+        processingThreads.push_back(thread);
+        apr_sleep(10000); //required so that time function to distinctly seed randomizer.
       }
-    } else {
-      //process the cas
-      process(pEngine,cas, in, out);
-    }
-    /* call collectionProcessComplete */
-    utErrorId = pEngine->collectionProcessComplete();
 
-    /* Free annotator */
-    utErrorId = pEngine->destroy();
+      cerr << "Wait for processing threads to finish " << endl;
+      /* wait for threads to end */
+      for (size_t i=0; i < processingThreads.size(); i++) {
+        //cout << "runAECpp: wait for thread " << i << " to end " << endl;
+        apr_thread_join(&rv, processingThreads.at(i));
+      }
 
-    delete cas;
-    delete pEngine;
+      if (pool) {
+        apr_pool_destroy(pool);
+        pool=0;
+      }
+    }
   } catch (Exception e) {
-    cout << "runAECpp " << e << endl;
+    cout << "runAECpp: " << e << endl;
   }
   /* If we got this far everything went OK */
   cout << "runAECpp: processing finished sucessfully! " << endl;
@@ -253,7 +352,7 @@
 }
 
 void process (AnalysisEngine * pEngine, CAS * cas, std::string in, std::string outfn) {
-  cout << endl << "runAECpp::processing " << in << endl;
+  cout << endl << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing " << in << " " << out << endl;
   try {
     if (xcasInput != textFormat) {
       /* initialize from an xcas or xmicas */
@@ -262,7 +361,7 @@
 	  LocalFileInputSource fileIS(native);
 	  XMLString::release(&native);
 	  if (xcasInput == xcasFormat) {
-		XCASDeserializer::deserialize(fileIS, *cas);
+	    XCASDeserializer::deserialize(fileIS, *cas);
 	  }
 	  else {
 		XmiDeserializer::deserialize(fileIS, *cas, lenient);
@@ -326,7 +425,7 @@
         //release the CAS
         pEngine->getAnnotatorContext().releaseCAS(outCas);
 
-        cout << "new Cas " << i << endl;
+        cout << "runAECpp::processing new Cas " << i << endl;
       }
 
     } else {
@@ -345,7 +444,7 @@
         //release CAS
         pEngine->getAnnotatorContext().releaseCAS(outCas);
 
-        cout << "new Cas " << i << endl;
+        cout << "runAECpp::processing new Cas " << i << endl;
       }
 
     }
@@ -365,7 +464,7 @@
       }
 
       //serialize the input cas
-      cout << "runAECpp: write out xmi " << outfn << endl;
+      cout << "runAECpp::processing write out xmi " << outfn << endl;
       XmiWriter writer(*cas, true);
       writer.write(file);
       file.close();
@@ -399,12 +498,130 @@
   }
 
   //serialize the cas
-  cout << "write out xmi " << ofn << endl;
+  cout << "runAECpp::processing write out xmi " << ofn << endl;
   XmiWriter writer(outCas, true);
   writer.write(file);
   file.close();
 }
 
+void processInputFiles(AnalysisEngine * pEngine) {
+    TyErrorId utErrorId;          // Variable to store UIMACPP return codes (assigned below, never checked)
+    ErrorInfo errorInfo;          // Variable to store detailed error info (not used in this function)
+    // Runs numruns passes over the global input `in` using pEngine, then destroys and deletes pEngine.
+    int count = 0;
+
+    stringstream str;
+    str << endl << "ThreadId: " << apr_os_thread_current();
+    str << " runAECpp: Processing started. Number of runs " << numruns 
+                << " rdelay " << rdelay << " millis. " ;
+    if (randomize)
+        str << " Inputs processed in random order. ";
+    
+    cout << str.str() << endl;
+    //uima::ResourceManager::getInstance().getLogger().logMessage(str.str() + " started: " );
+
+    /* Get a new CAS from the engine; the whole process exits if none is returned */
+    CAS* cas = pEngine->newCAS();
+    if (cas == NULL) {
+      cerr << "runAECpp: pEngine->newCAS() failed." << endl;
+      exit (1);
+    }
+
+    /* seed rand() from time(NULL) plus apr_time_now() % 10000 */
+    srand( time(NULL) + (apr_time_now() % 10000) );
+
+    for (int i=0; i < numruns; i++) {
+      stringstream str;
+      cout << endl << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing start iteration: " << i << endl;
+      //uima::ResourceManager::getInstance().getLogger().logMessage(str.str() );
+      /* process input: if the DirectoryWalk on `in` is valid, process the files it yields; otherwise process `in` itself */
+      util::DirectoryWalk dirwalker(in.c_str());
+      if (dirwalker.isValid()) {
+        cout << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing all files in directory: " << in.c_str() << endl;
+        util::Filename infile(in.c_str(),"FilenamePlaceHolder");
+        if (!randomize) { 
+          while (dirwalker.isValid()) {
+          // Process only entries for which isFile() is true; no suffix filtering is done here
+            if ( dirwalker.isFile() ) {
+              infile.setNewName(dirwalker.getNameWithoutPath());
+              std::string afile(infile.getAsCString());
+            
+              stringstream str;
+              if (count % 100 == 0 && count > 0)  {
+                str << apr_time_now() << " ThreadId: " << apr_os_thread_current() <<  " numProcessed=" << count;
+                cerr << str.str() << endl;  
+                uima::ResourceManager::getInstance().getLogger().logMessage(str.str() );
+              }
+              //process the cas
+              process(pEngine,cas,afile, out);
+
+              //reset the cas
+              cas->reset();
+              if (rdelay > 0) {
+                int howlong = rand() % rdelay;
+                cout << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing sleep for " << howlong << " millis " << endl;
+                apr_sleep(howlong*1000);
+              }
+              count++;
+            }
+            //get the next input file in the directory
+            dirwalker.setToNext();
+          }
+        } else {
+          //construct a list of the input files.
+          vector<std::string> filenames;
+          while (dirwalker.isValid()) {
+             // Collect only entries for which isFile() is true; no suffix filtering is done here
+             if ( dirwalker.isFile() ) {
+              infile.setNewName(dirwalker.getNameWithoutPath());
+              filenames.push_back(infile.getAsCString());
+             }
+             //get the next input file in the directory
+             dirwalker.setToNext();
+          } 
+         
+          //how many to process in this run: one pick per file, drawn with replacement (files may repeat or be skipped).
+          int num = filenames.size();   
+          for (int i=0; i < num; i++) {
+            //select next file to be processed.   
+            int index =   rand() % filenames.size();  //index between 0 and number of files - 1
+            
+            stringstream str;
+            if (count % 100 == 0 && count > 0)  {
+              str << apr_time_now() << " ThreadId: " << apr_os_thread_current() <<  " runAECpp::processing numProcessed=" << count;
+              cerr << str.str() << endl;  
+              uima::ResourceManager::getInstance().getLogger().logMessage(str.str() );
+            }
+
+            string afile = filenames.at(index);
+            //cout << "ThreadId: " << apr_os_thread_current() << "runAECpp::processing file " << index << " " << afile  << endl;
+            //process 
+            process(pEngine, cas, afile, out);
+            cas->reset();
+
+            //sleep for a random time in [0, rdelay) millis; howlong*1000 converts to the microseconds apr_sleep takes
+            if (rdelay > 0) {
+              int howlong = rand() % rdelay;
+              cout << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing sleep for " << howlong << " millis " << endl;
+              apr_sleep(howlong*1000);
+            }
+            count++;
+          }
+        }
+      } else {
+        //DirectoryWalk is not valid for `in`: process `in` itself as the single input
+        process(pEngine,cas, in, out);
+      }
+      /* call collectionProcessComplete once per iteration; the return code is stored but not checked */
+      utErrorId = pEngine->collectionProcessComplete();
+    }
+    /* Free annotator: destroy and delete the engine passed in by the caller */
+    utErrorId = pEngine->destroy();
+
+    delete cas;
+    delete pEngine;
+    cout << "ThreadId: " << apr_os_thread_current()  << " runAECpp finished processing." << endl;
+}
 /* <EOF> */