You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2013/07/29 20:18:12 UTC

svn commit: r1508161 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd: JobDriver.java client/IWorkItemMonitor.java client/WorkItem.java

Author: degenaro
Date: Mon Jul 29 18:18:12 2013
New Revision: 1508161

URL: http://svn.apache.org/r1508161
Log:
UIMA-3125 DUCC job driver (JD) must gracefully handle Throwable from UIMA-AS client sendAndReceive

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/IWorkItemMonitor.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java?rev=1508161&r1=1508160&r2=1508161&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java Mon Jul 29 18:18:12 2013
@@ -1398,6 +1398,23 @@ public class JobDriver extends Thread im
 		return;
 	}
 	
+	public void error(WorkItem workItem, Throwable t) {
+		String location = "error";
+		try {
+			duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
+			duccOut.debug(location, jobid, "action:error "+getThreadLocationInfo(workItem), t);
+			workItemStateManager.error(workItem.getSeqNo());
+			workItemError(workItem, t);
+			remove(workItem);
+			casSource.recycle(workItem.getCAS());
+			accountingWorkItemIsError(workItem.getProcessId());
+			queueCASes(1,queue,workItemFactory);
+		}
+		catch(Exception exception) {
+			duccOut.error(location, jobid, "processing error?", exception);
+		}
+		return;
+	}
 	
 	private void workItemLost(WorkItem workItem) {
 		String location = "workItemLost";
@@ -1405,8 +1422,8 @@ public class JobDriver extends Thread im
 		duccOut.error(location, workItem.getJobId(), "seqNo:"+workItem.getSeqNo());
 	}
 	
-	private void workItemError(WorkItem workItem, Exception e) {
-		workItemError(workItem, e, null);
+	private void workItemError(WorkItem workItem, Throwable t) {
+		workItemError(workItem, t, null);
 	}
 	
 	/*
@@ -1415,7 +1432,7 @@ public class JobDriver extends Thread im
 	}
 	*/
 	
-	private void workItemError(WorkItem workItem, Exception e, Directive directive) {
+	private void workItemError(WorkItem workItem, Throwable t, Directive directive) {
 		String location = "workItemError";
 		driverStatusReport.countWorkItemsProcessingError();
 		String nodeId = "?";
@@ -1438,9 +1455,9 @@ public class JobDriver extends Thread im
 			
 			duccOut.error(location, djid, dpid, message);
 			duccErr.error(location, djid, dpid, message);
-			if(e != null) {
-				duccOut.error(location, djid, dpid, e);
-				duccErr.error(location, djid, dpid, e);
+			if(t != null) {
+				duccOut.error(location, djid, dpid, t);
+				duccErr.error(location, djid, dpid, t);
 			}
 		}
 		catch(Exception exception) {

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/IWorkItemMonitor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/IWorkItemMonitor.java?rev=1508161&r1=1508160&r2=1508161&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/IWorkItemMonitor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/IWorkItemMonitor.java Mon Jul 29 18:18:12 2013
@@ -27,5 +27,6 @@ public interface IWorkItemMonitor {
 	public void dequeued(WorkItem workitem, String node, String pid);
 	public void ended(WorkItem workitem);
 	public void exception(WorkItem workitem, Exception e);
+	public void error(WorkItem workitem, Throwable t);
 	public void lost(WorkItem workItem);
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java?rev=1508161&r1=1508160&r2=1508161&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java Mon Jul 29 18:18:12 2013
@@ -19,6 +19,7 @@
 package org.apache.uima.ducc.jd.client;
 
 import java.util.ArrayList;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
@@ -124,24 +125,43 @@ public class WorkItem implements Runnabl
 		return callbackState;
 	}
 	
+	protected void injectRandomThrowable() throws Throwable {
+		// < *** TEST ONLY!!! *** >
+		final boolean test = false;
+		if(test) {
+			Random random = new Random();
+			if(random.nextBoolean()) {
+				throw new Throwable("just testing Throwable handler");
+			}
+		}
+		// </ *** TEST ONLY!!! *** >
+	}
+	
 	public void run() {
 		String methodName = "run";
 		duccOut.debug(methodName, jobId, duccMsg.fetch("enter"));
 		try {
-			start();
-			CAS cas = this.casTuple.getCas();
-			duccOut.debug(methodName, jobId, duccMsg.fetchLabel("CAS.size")+cas.size());
-			callbackState.statePendingQueued();
-			duccOut.debug(methodName, null, "seqNo:"+getSeqNo()+" "+callbackState.getState());
-			client.sendAndReceiveCAS(cas, analysisEnginePerformanceMetricsList);
-			if(!isLost.get()) {
-				ended();
-			}
-		} catch(Exception e) {
-			if(!isLost.get()) {
-				exception(e);
+			try {
+				start();
+				CAS cas = this.casTuple.getCas();
+				duccOut.debug(methodName, jobId, duccMsg.fetchLabel("CAS.size")+cas.size());
+				callbackState.statePendingQueued();
+				duccOut.debug(methodName, null, "seqNo:"+getSeqNo()+" "+callbackState.getState());
+				client.sendAndReceiveCAS(cas, analysisEnginePerformanceMetricsList);
+				//injectRandomThrowable();
+				if(!isLost.get()) {
+					ended();
+				}
+			} catch(Exception e) {
+				if(!isLost.get()) {
+					exception(e);
+				}
 			}
 		}
+		catch(Throwable t) {
+			error(t);
+		}
+		
 		duccOut.debug(methodName, jobId, duccMsg.fetch("exit"));
 	}
 	
@@ -173,6 +193,17 @@ public class WorkItem implements Runnabl
 		}
 	}
 	
+	private void error(Throwable t) {
+		String methodName = "error";
+		try {
+			duccOut.debug(methodName, getJobId(), getProcessId(), "seqNo:"+getSeqNo()+" "+"casId:"+getCAS().hashCode(), t);
+			workItemMonitor.error(this,t);
+		}
+		catch(Exception exception) {
+			duccOut.error(methodName, null, exception);
+		}
+	}
+	
 	public void lost() {
 		String methodName = "lost";
 		try {