You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2013/07/29 20:18:12 UTC
svn commit: r1508161 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd:
JobDriver.java client/IWorkItemMonitor.java client/WorkItem.java
Author: degenaro
Date: Mon Jul 29 18:18:12 2013
New Revision: 1508161
URL: http://svn.apache.org/r1508161
Log:
UIMA-3125 DUCC job driver (JD) must gracefully handle Throwable from UIMA-AS client sendAndReceive
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/IWorkItemMonitor.java
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java?rev=1508161&r1=1508160&r2=1508161&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java Mon Jul 29 18:18:12 2013
@@ -1398,6 +1398,23 @@ public class JobDriver extends Thread im
return;
}
+ public void error(WorkItem workItem, Throwable t) {
+ String location = "error";
+ try {
+ duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
+ duccOut.debug(location, jobid, "action:error "+getThreadLocationInfo(workItem), t);
+ workItemStateManager.error(workItem.getSeqNo());
+ workItemError(workItem, t);
+ remove(workItem);
+ casSource.recycle(workItem.getCAS());
+ accountingWorkItemIsError(workItem.getProcessId());
+ queueCASes(1,queue,workItemFactory);
+ }
+ catch(Exception exception) {
+ duccOut.error(location, jobid, "processing error?", exception);
+ }
+ return;
+ }
private void workItemLost(WorkItem workItem) {
String location = "workItemLost";
@@ -1405,8 +1422,8 @@ public class JobDriver extends Thread im
duccOut.error(location, workItem.getJobId(), "seqNo:"+workItem.getSeqNo());
}
- private void workItemError(WorkItem workItem, Exception e) {
- workItemError(workItem, e, null);
+ private void workItemError(WorkItem workItem, Throwable t) {
+ workItemError(workItem, t, null);
}
/*
@@ -1415,7 +1432,7 @@ public class JobDriver extends Thread im
}
*/
- private void workItemError(WorkItem workItem, Exception e, Directive directive) {
+ private void workItemError(WorkItem workItem, Throwable t, Directive directive) {
String location = "workItemError";
driverStatusReport.countWorkItemsProcessingError();
String nodeId = "?";
@@ -1438,9 +1455,9 @@ public class JobDriver extends Thread im
duccOut.error(location, djid, dpid, message);
duccErr.error(location, djid, dpid, message);
- if(e != null) {
- duccOut.error(location, djid, dpid, e);
- duccErr.error(location, djid, dpid, e);
+ if(t != null) {
+ duccOut.error(location, djid, dpid, t);
+ duccErr.error(location, djid, dpid, t);
}
}
catch(Exception exception) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/IWorkItemMonitor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/IWorkItemMonitor.java?rev=1508161&r1=1508160&r2=1508161&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/IWorkItemMonitor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/IWorkItemMonitor.java Mon Jul 29 18:18:12 2013
@@ -27,5 +27,6 @@ public interface IWorkItemMonitor {
public void dequeued(WorkItem workitem, String node, String pid);
public void ended(WorkItem workitem);
public void exception(WorkItem workitem, Exception e);
+ public void error(WorkItem workitem, Throwable t);
public void lost(WorkItem workItem);
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java?rev=1508161&r1=1508160&r2=1508161&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java Mon Jul 29 18:18:12 2013
@@ -19,6 +19,7 @@
package org.apache.uima.ducc.jd.client;
import java.util.ArrayList;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
@@ -124,24 +125,43 @@ public class WorkItem implements Runnabl
return callbackState;
}
+ protected void injectRandomThrowable() throws Throwable {
+ // < *** TEST ONLY!!! *** >
+ final boolean test = false;
+ if(test) {
+ Random random = new Random();
+ if(random.nextBoolean()) {
+ throw new Throwable("just testing Throwable handler");
+ }
+ }
+ // </ *** TEST ONLY!!! *** >
+ }
+
public void run() {
String methodName = "run";
duccOut.debug(methodName, jobId, duccMsg.fetch("enter"));
try {
- start();
- CAS cas = this.casTuple.getCas();
- duccOut.debug(methodName, jobId, duccMsg.fetchLabel("CAS.size")+cas.size());
- callbackState.statePendingQueued();
- duccOut.debug(methodName, null, "seqNo:"+getSeqNo()+" "+callbackState.getState());
- client.sendAndReceiveCAS(cas, analysisEnginePerformanceMetricsList);
- if(!isLost.get()) {
- ended();
- }
- } catch(Exception e) {
- if(!isLost.get()) {
- exception(e);
+ try {
+ start();
+ CAS cas = this.casTuple.getCas();
+ duccOut.debug(methodName, jobId, duccMsg.fetchLabel("CAS.size")+cas.size());
+ callbackState.statePendingQueued();
+ duccOut.debug(methodName, null, "seqNo:"+getSeqNo()+" "+callbackState.getState());
+ client.sendAndReceiveCAS(cas, analysisEnginePerformanceMetricsList);
+ //injectRandomThrowable();
+ if(!isLost.get()) {
+ ended();
+ }
+ } catch(Exception e) {
+ if(!isLost.get()) {
+ exception(e);
+ }
}
}
+ catch(Throwable t) {
+ error(t);
+ }
+
duccOut.debug(methodName, jobId, duccMsg.fetch("exit"));
}
@@ -173,6 +193,17 @@ public class WorkItem implements Runnabl
}
}
+ private void error(Throwable t) {
+ String methodName = "error";
+ try {
+ duccOut.debug(methodName, getJobId(), getProcessId(), "seqNo:"+getSeqNo()+" "+"casId:"+getCAS().hashCode(), t);
+ workItemMonitor.error(this,t);
+ }
+ catch(Exception exception) {
+ duccOut.error(methodName, null, exception);
+ }
+ }
+
public void lost() {
String methodName = "lost";
try {