You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/02/07 14:48:27 UTC

svn commit: r1565656 - /uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java

Author: degenaro
Date: Fri Feb  7 13:48:27 2014
New Revision: 1565656

URL: http://svn.apache.org/r1565656
Log:
UIMA-3600 DUCC Job Driver (JD) does not properly handle work items for down node (RM Purged)

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java?rev=1565656&r1=1565655&r2=1565656&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java Fri Feb  7 13:48:27 2014
@@ -65,6 +65,7 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
 import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
 import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
 import org.apache.uima.ducc.transport.event.common.IDuccUimaDeployableConfiguration;
 import org.apache.uima.ducc.transport.event.common.IDuccUimaDeploymentDescriptor;
 import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
@@ -996,6 +997,90 @@ public class JobDriver extends Thread im
 		return;
 	}
 	
+	// <UIMA-3600>
+	
+	/**
+	 * Looks up the process recorded for a work item in the job's process map.
+	 *
+	 * @param workItem the work item whose process id is used as the lookup key; may be null
+	 * @return whatever the job's process map returns for the work item's process id,
+	 *         or null when workItem itself is null
+	 */
+	private IDuccProcess getProcess(WorkItem workItem) {
+		// Nothing to look up without a work item.
+		if(workItem == null) {
+			return null;
+		}
+		DuccProcessMap processMap = (DuccProcessMap)getJob().getProcessMap();
+		return processMap.get(workItem.getProcessId());
+	}
+	
+	/**
+	 * Placeholder check for a user-caused process failure.
+	 * The current implementation ignores its argument and always answers false.
+	 *
+	 * @param process the process to inspect (currently unused)
+	 * @return always false
+	 */
+	private boolean isUserFailure(IDuccProcess process) {
+		return false;
+	}
+	
+	/**
+	 * Resolves the work item's process via getProcess and reports whether that
+	 * process counts as a node failure.
+	 *
+	 * @param workItem the work item to check; may be null
+	 * @return the result of isNodeFailure(IDuccProcess) for the resolved process
+	 */
+	private boolean isNodeFailure(WorkItem workItem) {
+		IDuccProcess owningProcess = getProcess(workItem);
+		return isNodeFailure(owningProcess);
+	}
+	
+	/**
+	 * Reports whether the given process was deallocated with type Purged,
+	 * which this class treats as a node failure.
+	 *
+	 * @param process the process to inspect; may be null
+	 * @return true only when process is non-null and its deallocation type is Purged
+	 */
+	private boolean isNodeFailure(IDuccProcess process) {
+		if(process == null) {
+			return false;
+		}
+		// A null deallocation type compares unequal to Purged, so it yields false.
+		ProcessDeallocationType reason = process.getProcessDeallocationType();
+		return reason == ProcessDeallocationType.Purged;
+	}
+	
+	private int ProcessStatusMaxWaitSeconds = 5 * 60;
+	private long MillisPerSecond = 1000;
+	
+	/**
+	 * Polls roughly once per second until the process associated with the given
+	 * work item reports a node failure or a user failure, or until
+	 * ProcessStatusMaxWaitSeconds have elapsed. Each outcome is logged at debug level.
+	 * Any exception raised along the way (for example from a null workItem) is
+	 * logged as an error and not propagated.
+	 *
+	 * @param jobDriver supplies the job whose id tags the log records
+	 * @param workItem the work item whose process is being watched
+	 */
+	private void waitForProcessStatus(IJobDriver jobDriver, WorkItem workItem) {
+		String location = "waitForProcessStatus";
+		try {
+			String casId = workItem.getCasId();
+			String seqNo = ""+workItem.getSeqNo();
+			IDuccWorkJob job = jobDriver.getJob();
+			DuccId jobDuccId = job.getDuccId();
+			DuccId processDuccId = null;
+			IDuccProcess process = getProcess(workItem);
+			// Common prefix identifying the work item in every log record below.
+			String prefix = "seqNo:"+seqNo+" "+"wiId:"+workItem.getCasDocumentText()+" "+"casId:"+casId;
+			if(process != null) {
+				processDuccId = process.getDuccId();
+				duccOut.debug(location, jobDuccId, processDuccId, prefix+" process status: pending");
+				long maxWaitMillis = ProcessStatusMaxWaitSeconds * MillisPerSecond;
+				long expiry = System.currentTimeMillis() + maxWaitMillis;
+				// Report the wait duration the label promises, not the absolute expiry timestamp.
+				duccOut.debug(location, jobDuccId, processDuccId, prefix+" max wait millis: "+maxWaitMillis);
+				// Tracks whether a failure status ended the wait, so "expired" is only logged on a real timeout.
+				boolean resolved = false;
+				while(!resolved && System.currentTimeMillis() < expiry) {
+					try {
+						Thread.sleep(1000);
+					}
+					catch(InterruptedException e) {
+						// NOTE(review): the interrupt is ignored and the interrupt flag is not
+						// restored, so polling simply continues - confirm this is intended.
+					}
+					// NOTE(review): the same process object is re-examined on every pass; this
+					// presumably relies on it being updated in place - verify against the process map.
+					if(isNodeFailure(process)) {
+						duccOut.debug(location, jobDuccId, processDuccId, prefix+" process status: node failure");
+						resolved = true;
+					}
+					else if(isUserFailure(process)) {
+						duccOut.debug(location, jobDuccId, processDuccId, prefix+" process status: user failure");
+						resolved = true;
+					}
+				}
+				if(!resolved) {
+					duccOut.debug(location, jobDuccId, processDuccId, prefix+" process status: expired");
+				}
+			}
+			else {
+				duccOut.debug(location, jobDuccId, processDuccId, prefix+" process status: not found");
+			}
+		}
+		catch(Exception e) {
+			duccOut.error(location, null, "process status error?", e);
+		}
+	}
+	
+	// </UIMA-3600>
+	
 	public void waitForLocation(IJobDriver jobDriver, WorkItem workItem) {
 		String location = "waitForLocation";
 		try {
@@ -1395,7 +1480,18 @@ public class JobDriver extends Thread im
 			}
 			else if(timeout) {
 				duccOut.debug(location, jobid, "action:timeout "+getThreadLocationInfo(workItem), e);
-				employPluginExceptionHandler(workItem, e);
+				// <UIMA-3600>
+				waitForProcessStatus(this, workItem);
+				if(isNodeFailure(workItem)) {
+					duccOut.debug(location, jobid, "action:timeout-node-failure-retry "+getThreadLocationInfo(workItem), e);
+					retry(workItem);
+					// node failed, work item interrupted - don't add another work item to queue
+				}
+				else {
+					duccOut.debug(location, jobid, "action:timeout-handler "+getThreadLocationInfo(workItem), e);
+					employPluginExceptionHandler(workItem, e);
+				}
+				// </UIMA-3600>
 			}
 			else if(isUnknownProcess(workItem)) {
 				duccOut.debug(location, jobid, "action:unknown-process "+getThreadLocationInfo(workItem), e);