You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/12/18 20:59:12 UTC
svn commit: r1646524 - in /uima/sandbox/uima-ducc/trunk:
uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/
uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/
Author: cwiklik
Date: Thu Dec 18 19:59:12 2014
New Revision: 1646524
URL: http://svn.apache.org/r1646524
Log:
UIMA-4060 Added more logging. Added placeholder for error handler for process()
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1646524&r1=1646523&r2=1646524&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Thu Dec 18 19:59:12 2014
@@ -303,7 +303,7 @@ public class DuccHttpClient {
logger.error("execute", null, "Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
throw new RuntimeException("JP Http Client Unable to Communicate with JD - Error:"+postMethod.getStatusLine());
}
- logger.info("execute", null, "JD Reply Status:"+postMethod.getStatusLine());
+ logger.info("execute", null, "Thread:"+Thread.currentThread().getId()+" JD Reply Status:"+postMethod.getStatusLine());
Object o = XStreamUtils.unmarshall(responseData);
if ( o instanceof IMetaCasTransaction) {
reply = (MetaCasTransaction)o;
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1646524&r1=1646523&r2=1646524&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Thu Dec 18 19:59:12 2014
@@ -256,6 +256,8 @@ public class HttpWorkerThread implements
}
public void run() {
try {
+ logger.info("HttpWorkerThread.run()", null, "Starting JP Process Thread Id:"+Thread.currentThread().getId());
+
initialize(duccComponent.isUimaASJob());
// each thread needs its own PostMethod
PostMethod postMethod = new PostMethod(httpClient.getJdUrl());
@@ -264,6 +266,8 @@ public class HttpWorkerThread implements
// SMContext ctx = new SMContextImpl(httpClient, States.Start);
String command="";
// run forever (or until the process throws IllegalStateException)
+ logger.info("HttpWorkerThread.run()", null, "Processing Work Items - Thread Id:"+Thread.currentThread().getId());
+
while (duccComponent.isRunning()) { //service.running && ctx.state().process(ctx)) {
try {
@@ -276,7 +280,9 @@ public class HttpWorkerThread implements
command = Type.Get.name();
// transaction = httpClient.post(transaction);
transaction = httpClient.execute(transaction, postMethod);
- // Confirm receipt of the CAS.
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd WI:"+transaction.getMetaCas().getSystemKey());
+
+ // Confirm receipt of the CAS.
transaction.setType(Type.Ack);
command = Type.Ack.name();
/// httpClient.post(transaction); // Ready to process
@@ -306,26 +312,31 @@ public class HttpWorkerThread implements
}
}
} else {
- logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd New WI:"+transaction.getMetaCas().getSystemKey());
- System.out.println("Thread:"+Thread.currentThread().getId()+" Recv'd New WI:"+transaction.getMetaCas().getSystemKey());
+ //System.out.println("Thread:"+Thread.currentThread().getId()+" Recv'd New WI:"+transaction.getMetaCas().getSystemKey());
// process the CAS
- @SuppressWarnings("unchecked")
- List<Properties> metrics = (List<Properties>)
- uimaProcessor.process(transaction.getMetaCas().getUserSpaceCas());
- logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed");
-
- IPerformanceMetrics metricsWrapper =
- new PerformanceMetrics();
- metricsWrapper.set(metrics);
-
- transaction.getMetaCas().setPerformanceMetrics(metricsWrapper);
+ try {
+ @SuppressWarnings("unchecked")
+ List<Properties> metrics =
+ (List<Properties>)
+ uimaProcessor.process(transaction.getMetaCas().getUserSpaceCas());
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed");
+ IPerformanceMetrics metricsWrapper =
+ new PerformanceMetrics();
+ metricsWrapper.set(metrics);
+
+ transaction.getMetaCas().setPerformanceMetrics(metricsWrapper);
+
+ } catch( Exception ee) {
+ transaction.getMetaCas().setUserSpaceException("Exception");
+ logger.error("run", null, ee);
+ }
transaction.getMetaCas().setUserSpaceCas(null);
transaction.setType(Type.End);
command = Type.End.name();
// httpClient.post(transaction); // Work Item Processed - End
httpClient.execute(transaction, postMethod); // Work Item Processed - End
- logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" sent END");
+ logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" sent END for WI:"+transaction.getMetaCas().getSystemKey());
}
} catch( SocketTimeoutException e) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1646524&r1=1646523&r2=1646524&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Thu Dec 18 19:59:12 2014
@@ -115,6 +115,7 @@ public class JobProcessComponent extends
* the components and returns.
*/
public void start(DuccService service, String[] args) throws Exception {
+ getLogger().info("start", null,"Ducc UIMA-AS Version:"+UimaAsVersion.getFullVersionString());
super.start(service, args);
try {
@@ -145,21 +146,25 @@ public class JobProcessComponent extends
// existence of -DDucc.Job.Type
String jobType = System.getProperty(FlagsHelper.Name.JpType.pname());
containerClass = configuration.getUserContainerClassForJob(jobType);
- String[] uimaAsArgs;
+ String[] jpArgs;
if ( "uima-as".equals(jobType)) {
uimaASJob = true;
// dd - deployment descriptor. Will use UIMA-AS
- uimaAsArgs = new String[] { "-dd",args[0],"-saxonURL",saxonJarPath,
+ jpArgs = new String[] { "-dd",args[0],"-saxonURL",saxonJarPath,
"-xslt",dd2SpringXslPath};
- } else {
+ } else if ( "uima".equals(jobType)) {
String scaleout = System.getProperty(FlagsHelper.Name.JpThreadCount.pname());
if ( scaleout == null ) {
scaleout = "1";
}
// aed - analysis engine descriptor. Will use UIMA core only
- uimaAsArgs = new String[] { "-aed",args[0], "-t", scaleout};
+ jpArgs = new String[] { "-aed",args[0], "-t", scaleout};
+ } else if ( "user".equals(jobType)) {
+ jpArgs = args;
+ } else {
+ throw new RuntimeException("Unsupported JP deployment mode. Check a value provided for -D"+FlagsHelper.Name.JpType.pname()+". Supported modes: [uima-as|uima|user]");
}
- getLogger().info("start", null,"Ducc UIMA-AS Version:"+UimaAsVersion.getFullVersionString());
+ getLogger().info("start", null,"Ducc JP JobType="+jobType);
final DuccHttpClient client = new DuccHttpClient();
String jdURL="";
try {
@@ -185,10 +190,10 @@ public class JobProcessComponent extends
throw ee;
}
- // Deploy UIMA pipelines. This blocks until the pipelines initialize or
+ // Deploy UIMA pipelines. This blocks until the pipeline initializes or
// there is an exception. The IUimaProcessor is a wrapper around
// processing container where the analysis is being done.
- int scaleout = jobProcessManager.initialize(jps, uimaAsArgs, containerClass);
+ int scaleout = jobProcessManager.initialize(jps, jpArgs, containerClass);
// uimaProcessor = jobProcessManager.deploy(jps, uimaAsArgs, containerClass);
// Setup Thread Factory
@@ -210,7 +215,7 @@ public class JobProcessComponent extends
agent.notify(currentState, processJmxUrl);
System.out.println("JMX Connect String:"+ processJmxUrl);
// Create thread pool and begin processing
- getLogger().info("start", null, "Starting "+scaleout+" Process Threads");
+ getLogger().info("start", null, "Starting "+scaleout+" Process Threads - JMX Connect String:"+ processJmxUrl);
// Create and start worker threads that pull Work Items from the JD
Future<?>[] threadHandles = new Future<?>[scaleout];
@@ -219,6 +224,8 @@ public class JobProcessComponent extends
for (int j = 0; j < scaleout; j++) {
threadHandles[j] = tpe.submit(new HttpWorkerThread(this, client, jobProcessManager, workerThreadCount));
}
+ getLogger().info("start", null, "All Http Worker Threads Started - Waiting For All Threads to Exit");
+
for( Future<?> f : threadHandles ) {
if ( f != null ) {
f.get(); // wait for worker threads to exit
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java?rev=1646524&r1=1646523&r2=1646524&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java Thu Dec 18 19:59:12 2014
@@ -135,7 +135,7 @@ public class UimaProcessContainer implem
// core. The sharedInitSemaphore is a static and is shared by all
// instances
// of this class.
- System.out.println("Available Permits:"+sharedInitSemaphore.availablePermits());
+// System.out.println("Available Permits:"+sharedInitSemaphore.availablePermits());
sharedInitSemaphore.acquire();
// Parse the descriptor in the calling thread.
rSpecifier = UimaUtils.getResourceSpecifier(analysisEngineDescriptor);