You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2009/10/16 17:26:02 UTC
svn commit: r825936 - /incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp
Author: eae
Date: Fri Oct 16 15:26:01 2009
New Revision: 825936
URL: http://svn.apache.org/viewvc?rev=825936&view=rev
Log:
UIMA-1620 commit Bhavani's uimacpp-1620.patch
Modified:
incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp
Modified: incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp
URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp?rev=825936&r1=825935&r2=825936&view=diff
==============================================================================
--- incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp (original)
+++ incubator/uima/uimacpp/trunk/src/utils/runAECpp.cpp Fri Oct 16 15:26:01 2009
@@ -44,7 +44,8 @@
#include "uima/xmiwriter.hpp"
#include "uima/xcasdeserializer.hpp"
#include "uima/xmideserializer.hpp"
-
+#include <apr_portable.h>
+#include <apr_thread_proc.h>
using namespace uima;
@@ -58,9 +59,26 @@
const AnalysisEngine & crEngine);
static void tafCheckError(ErrorInfo const &);
+void processInputFiles(AnalysisEngine * pEngine);
+
+// input: a single file, or a directory whose files are all processed
+ std::string in;
+// output directory; main() rejects it in combination with multiple instances (-n)
+ std::string out;
+// AE descriptor filename; taken from the first positional command-line argument
+ const char* cnfg = NULL;
// sofa to use for creating a tcas
bool useSofa, lenient;
const char* sofaName;
+// logging level (usage: -l 0, 1 or 2); main() defaults it to -1, and only values >= 0 are applied
+int loglevel;
+// number of iterations over the same input data (usage: -r); main() defaults it to 1
+int numruns;
+// number of annotator instances, each running in a separate thread (usage: -n); main() defaults it to 1
+int numinstances;
+// randomize: pick the next input at random (-rand). rdelay: upper bound, in milliseconds, of the random pause between calls to process (-rdelay); 0 disables the pause.
+bool randomize;
+long rdelay;
// input data types
enum dataFormats { textFormat, xcasFormat, xmiFormat };
@@ -80,12 +98,30 @@
cerr << " -lenient For -xmi, ignore unknown types & features" << endl;
cerr << " -s Sofa Name of a Sofa to process (input must be an XCAS or XMI)" << endl;
cerr << " -l logLevel Set to 0, 1, or 2 for Message, Warning, or Error" << endl;
+ cerr << " -n numInstances number of annotator instances each running in a separate thread" << endl;
+ cerr << " -r numRuns number of iterations over the same input." << endl;
+ cerr << " -rand randomize the selection on next input to process." << endl;
+ cerr << " -rdelay Max add random delay between 0 and Max milliseconds between calls to process." << endl;
+
}
-int main(int argc, char * argv[]) {
- int loglevel = -1;
+//=================================================================
+//
+//Annotator instance threads that process input.
+//-----------------------------------------------------------------
+// APR thread entry point for one annotator instance.
+//   thd  - the APR thread this function runs on; passed to apr_thread_exit().
+//   data - the AnalysisEngine* handed to apr_thread_create() by main().
+// Runs processInputFiles() on the engine. An exception thrown by it is
+// reported on stderr here instead of escaping the thread function, and the
+// thread then exits with APR_EGENERAL rather than APR_SUCCESS.
+static void* APR_THREAD_FUNC process(apr_thread_t *thd, void *data) {
+  cout << endl << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing... starting thread." << endl;
+
+  apr_status_t status = APR_SUCCESS;
+  try {
+    processInputFiles(static_cast<AnalysisEngine *>(data));
+  } catch (Exception & e) {
+    // Same report format main() uses for the single-instance path.
+    cerr << "runAECpp: " << e << endl;
+    status = APR_EGENERAL;
+  } catch (...) {
+    cerr << "runAECpp: unknown exception in processing thread." << endl;
+    status = APR_EGENERAL;
+  }
+
+  // Kept outside the try block so the thread exit itself never passes
+  // through the handlers above.
+  apr_thread_exit(thd, status);
+  return NULL;  // apr_thread_exit() is expected not to return; satisfies the signature
+}
+
+
+int main(int argc, char * argv[]) {
+
+
try {
/* Access the command line arguments to get the name of the input text. */
@@ -95,14 +131,16 @@
return 1;
}
useSofa = false;
- lenient = false;
+ lenient = false;
xcasInput = textFormat;
- /* input/output dir arg */
- std::string in;
- std::string out;
- std::string sofa;
- std::string pattern("*");
- const char* cnfg = NULL;
+ //std::string sofa;
+ //std::string pattern("*");
+ cnfg = NULL;
+ loglevel = -1;
+ numinstances = 1;
+ numruns = 1;
+ randomize = false;
+ rdelay = 0;
int index = 0;
while (++index < argc) {
@@ -132,6 +170,37 @@
return 1;
}
}
+ } else if (0 == strcmp(arg, "-n")) {
+ if ( ++index < argc ) {
+ numinstances = atoi(argv[index]);
+ if (numinstances < 1) {
+ cerr << "NumInstances less than minimum value 1 "<< endl;
+ return 1;
+ }
+ if (out.length() > 0) {
+ cerr << "Output directory may not be specified when NumInstances is more than 1." << endl;
+ return 1;
+ }
+ }
+ } else if (0 == strcmp(arg, "-rand")) {
+ randomize = true;
+ } else if (0 == strcmp(arg, "-r")) {
+ if ( ++index < argc ) {
+ numruns = atoi(argv[index]);
+ if (numruns < 1) {
+ cerr << "Number of runs less than minimum value 1 "<< endl;
+ return 1;
+ }
+ }
+ } else if (0 == strcmp(arg, "-rdelay")) {
+ if ( ++index < argc ) {
+ rdelay = atol(argv[index]);
+ rdelay = rdelay; //convert to microsec
+ if (rdelay < 1) {
+ cerr << "Random delay in millis less than minimum value of 1 "<< endl;
+ return 1;
+ }
+ }
} else { //one of the standard params - whichever we haven't read yet
if (cnfg == NULL) {
cnfg = arg;
@@ -139,6 +208,10 @@
in.append(arg);
} else if (out.length() == 0) {
out.append(arg);
+ if (numinstances > 1) {
+ cerr << "Output directory may not be specified when NumInstances is more than 1." << endl;
+ return 1;
+ }
}
}
} //while
@@ -166,51 +239,77 @@
if (loglevel >= 0) {
ResourceManager::getInstance().setLoggingLevel((LogStream::EnEntryType)loglevel);
}
+ ErrorInfo errorInfo;
+ if (numinstances == 1) {
+ AnalysisEngine * pEngine = Framework::createAnalysisEngine(cnfg, errorInfo);
+ if (errorInfo.getErrorId() != UIMA_ERR_NONE) {
+ cerr << "runAECpp:" << endl
+ << " Error string : "
+ << AnalysisEngine::getErrorIdAsCString(errorInfo.getErrorId()) << endl
+ << " UIMACPP Error info:" << endl
+ << errorInfo << endl;
+ exit((int)errorInfo.getErrorId());
+ }
+ processInputFiles(pEngine);
+ } else {
+ apr_status_t rv = 0;
- TyErrorId utErrorId; // Variable to store UIMACPP return codes
- ErrorInfo errorInfo; // Variable to stored detailed error info
- /* Initialize engine with filename of config-file */
- AnalysisEngine * pEngine =
- Framework::createAnalysisEngine(cnfg, errorInfo);
- tafCheckError(errorInfo);
-
- /* Get a new CAS */
- CAS* cas = pEngine->newCAS();
+ //APR pool
+ apr_pool_t *pool;
+ rv = apr_pool_create(&pool, NULL);
+ if (rv != APR_SUCCESS) {
+ cerr << "ERROR: apr_pool_create() failed. " << endl;
+ return -1;
+ }
- /* process input */
- util::DirectoryWalk dirwalker(in.c_str());
- if (dirwalker.isValid()) {
- cout << "runAECpp::processing all files in directory: " << in.c_str() << endl;
- util::Filename infile(in.c_str(),"FilenamePlaceHolder");
- while (dirwalker.isValid()) {
- // Process all files or just the ones with matching suffix
- if ( dirwalker.isFile() ) {
- infile.setNewName(dirwalker.getNameWithoutPath());
- std::string afile(infile.getAsCString());
+
+ /*create as many AnalysisEngine instances as specified
+ by numinstances. */
+ vector<AnalysisEngine *> analysisEngines;
+
+ for (int i=0; i < numinstances; i++) {
+ AnalysisEngine * pEngine = Framework::createAnalysisEngine(cnfg, errorInfo);
+ if (errorInfo.getErrorId() != UIMA_ERR_NONE) {
+ cerr << "runAECpp:" << endl
+ << " Error string : "
+ << AnalysisEngine::getErrorIdAsCString(errorInfo.getErrorId()) << endl
+ << " UIMACPP Error info:" << endl
+ << errorInfo << endl;
+ exit((int)errorInfo.getErrorId());
+ }
+ analysisEngines.push_back(pEngine);
+ }
+ cerr << "Initialized AnalysisEngine " << endl;
- //process the cas
- process(pEngine,cas,afile, out);
- //reset the cas
- cas->reset();
- }
- //get the next input file in the directory
- dirwalker.setToNext();
+ /* create and start the processing threads */
+ apr_threadattr_t * thd_attr=0;
+ rv = apr_threadattr_create(&thd_attr, pool);
+ assert(rv == APR_SUCCESS);
+
+ vector<apr_thread_t *> processingThreads;
+ for (int i=0; i < numinstances; i++) {
+ apr_thread_t *thread=0;
+ rv = apr_thread_create(&thread, thd_attr, process, analysisEngines.at(i), pool);
+ assert(rv == APR_SUCCESS);
+ processingThreads.push_back(thread);
+ apr_sleep(10000); //required so that time function to distinctly seed randomizer.
}
- } else {
- //process the cas
- process(pEngine,cas, in, out);
- }
- /* call collectionProcessComplete */
- utErrorId = pEngine->collectionProcessComplete();
- /* Free annotator */
- utErrorId = pEngine->destroy();
+ cerr << "Wait for processing threads to finish " << endl;
+ /* wait for threads to end */
+ for (size_t i=0; i < processingThreads.size(); i++) {
+ //cout << "runAECpp: wait for thread " << i << " to end " << endl;
+ apr_thread_join(&rv, processingThreads.at(i));
+ }
- delete cas;
- delete pEngine;
+ if (pool) {
+ apr_pool_destroy(pool);
+ pool=0;
+ }
+ }
} catch (Exception e) {
- cout << "runAECpp " << e << endl;
+ cout << "runAECpp: " << e << endl;
}
/* If we got this far everything went OK */
cout << "runAECpp: processing finished sucessfully! " << endl;
@@ -253,7 +352,7 @@
}
void process (AnalysisEngine * pEngine, CAS * cas, std::string in, std::string outfn) {
- cout << endl << "runAECpp::processing " << in << endl;
+ cout << endl << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing " << in << " " << out << endl;
try {
if (xcasInput != textFormat) {
/* initialize from an xcas or xmicas */
@@ -262,7 +361,7 @@
LocalFileInputSource fileIS(native);
XMLString::release(&native);
if (xcasInput == xcasFormat) {
- XCASDeserializer::deserialize(fileIS, *cas);
+ XCASDeserializer::deserialize(fileIS, *cas);
}
else {
XmiDeserializer::deserialize(fileIS, *cas, lenient);
@@ -326,7 +425,7 @@
//release the CAS
pEngine->getAnnotatorContext().releaseCAS(outCas);
- cout << "new Cas " << i << endl;
+ cout << "runAECpp::processing new Cas " << i << endl;
}
} else {
@@ -345,7 +444,7 @@
//release CAS
pEngine->getAnnotatorContext().releaseCAS(outCas);
- cout << "new Cas " << i << endl;
+ cout << "runAECpp::processing new Cas " << i << endl;
}
}
@@ -365,7 +464,7 @@
}
//serialize the input cas
- cout << "runAECpp: write out xmi " << outfn << endl;
+ cout << "runAECpp::processing write out xmi " << outfn << endl;
XmiWriter writer(*cas, true);
writer.write(file);
file.close();
@@ -399,12 +498,130 @@
}
//serialize the cas
- cout << "write out xmi " << ofn << endl;
+ cout << "runAECpp::processing write out xmi " << ofn << endl;
XmiWriter writer(outCas, true);
writer.write(file);
file.close();
}
+// Pauses for a random number of milliseconds in [0, rdelay) when the global
+// rdelay is positive; does nothing otherwise.
+static void sleepRandomDelay() {
+  if (rdelay <= 0) {
+    return;
+  }
+  const long howlong = rand() % rdelay;
+  cout << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing sleep for " << howlong << " millis " << endl;
+  // apr_sleep() takes microseconds. Widen before scaling so that a large
+  // -rdelay value cannot overflow int arithmetic.
+  apr_sleep(static_cast<apr_interval_time_t>(howlong) * 1000);
+}
+
+// Writes a timestamped progress line to stderr and to the ResourceManager's
+// logger each time count is a positive multiple of 100.
+//   label - text inserted verbatim between the thread id and " numProcessed=".
+static void logProgress(int count, const char * label) {
+  if (count <= 0 || count % 100 != 0) {
+    return;
+  }
+  stringstream msg;
+  msg << apr_time_now() << " ThreadId: " << apr_os_thread_current() << label << " numProcessed=" << count;
+  cerr << msg.str() << endl;
+  uima::ResourceManager::getInstance().getLogger().logMessage(msg.str() );
+}
+
+// Handles one input file: progress report, process(), CAS reset, optional
+// random pause, then increments count.
+static void processOneInput(AnalysisEngine * pEngine, CAS * cas,
+                            const std::string & file, int & count,
+                            const char * progressLabel) {
+  logProgress(count, progressLabel);
+  process(pEngine, cas, file, out);
+  cas->reset();
+  sleepRandomDelay();
+  count++;
+}
+
+// Runs the configured input through one AnalysisEngine instance.
+//
+// Reads the file-level settings in, out, numruns, randomize and rdelay.
+// For each of the numruns iterations:
+//   - if in names a directory, every file in it is processed, either in
+//     directory order or (when randomize is set) by drawing as many random
+//     picks as there are files, so a file may be picked more than once;
+//   - otherwise in is processed as a single input.
+// collectionProcessComplete() is called at the end of every iteration.
+// Non-success return codes from collectionProcessComplete() and destroy()
+// are reported on stderr.
+//
+// pEngine is destroyed and deleted before returning, so the caller must not
+// use it afterwards. Exits the process if no CAS can be obtained.
+void processInputFiles(AnalysisEngine * pEngine) {
+  int count = 0;  // inputs processed so far, across all runs
+
+  stringstream banner;
+  banner << endl << "ThreadId: " << apr_os_thread_current();
+  banner << " runAECpp: Processing started. Number of runs " << numruns
+         << " rdelay " << rdelay << " millis. " ;
+  if (randomize)
+    banner << " Inputs processed in random order. ";
+  cout << banner.str() << endl;
+
+  /* Get a new CAS */
+  CAS* cas = pEngine->newCAS();
+  if (cas == NULL) {
+    cerr << "runAECpp: pEngine->newCAS() failed." << endl;
+    exit (1);
+  }
+
+  /* initialize random seed */
+  // NOTE(review): srand()/rand() presumably share one generator across all
+  // processing threads - confirm whether per-thread sequences matter here.
+  srand( time(NULL) + (apr_time_now() % 10000) );
+
+  for (int run = 0; run < numruns; run++) {
+    cout << endl << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing start iteration: " << run << endl;
+
+    /* process input */
+    util::DirectoryWalk dirwalker(in.c_str());
+    if (dirwalker.isValid()) {
+      cout << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing all files in directory: " << in.c_str() << endl;
+      util::Filename infile(in.c_str(),"FilenamePlaceHolder");
+      if (!randomize) {
+        // Sequential mode: handle each file as the walker reaches it.
+        while (dirwalker.isValid()) {
+          if ( dirwalker.isFile() ) {
+            infile.setNewName(dirwalker.getNameWithoutPath());
+            const std::string afile(infile.getAsCString());
+            processOneInput(pEngine, cas, afile, count, "");
+          }
+          //get the next input file in the directory
+          dirwalker.setToNext();
+        }
+      } else {
+        // Random mode: collect the file list first.
+        vector<std::string> filenames;
+        while (dirwalker.isValid()) {
+          if ( dirwalker.isFile() ) {
+            infile.setNewName(dirwalker.getNameWithoutPath());
+            filenames.push_back(infile.getAsCString());
+          }
+          //get the next input file in the directory
+          dirwalker.setToNext();
+        }
+
+        // Draw as many picks as there are files. The loop body never runs
+        // for an empty list, so the modulo below is never taken by zero.
+        const size_t total = filenames.size();
+        for (size_t n = 0; n < total; n++) {
+          const size_t pick = rand() % total;  // zero-based index in [0, total)
+          processOneInput(pEngine, cas, filenames.at(pick), count, " runAECpp::processing");
+        }
+      }
+    } else {
+      // in is not a directory: process it as a single input.
+      process(pEngine,cas, in, out);
+    }
+
+    /* call collectionProcessComplete */
+    const TyErrorId completeRc = pEngine->collectionProcessComplete();
+    if (completeRc != UIMA_ERR_NONE) {
+      cerr << "runAECpp: collectionProcessComplete() failed: "
+           << AnalysisEngine::getErrorIdAsCString(completeRc) << endl;
+    }
+  }
+
+  /* Free annotator */
+  const TyErrorId destroyRc = pEngine->destroy();
+  if (destroyRc != UIMA_ERR_NONE) {
+    cerr << "runAECpp: destroy() failed: "
+         << AnalysisEngine::getErrorIdAsCString(destroyRc) << endl;
+  }
+
+  delete cas;
+  delete pEngine;
+  cout << "ThreadId: " << apr_os_thread_current() << " runAECpp finished processing." << endl;
+}
/* <EOF> */