You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/02/07 14:48:27 UTC
svn commit: r1565656 -
/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
Author: degenaro
Date: Fri Feb 7 13:48:27 2014
New Revision: 1565656
URL: http://svn.apache.org/r1565656
Log:
UIMA-3600 DUCC Job Driver (JD) does not properly handle work items for down node (RM Purged)
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java?rev=1565656&r1=1565655&r2=1565656&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java Fri Feb 7 13:48:27 2014
@@ -65,6 +65,7 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
import org.apache.uima.ducc.transport.event.common.IDuccUimaDeployableConfiguration;
import org.apache.uima.ducc.transport.event.common.IDuccUimaDeploymentDescriptor;
import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
@@ -996,6 +997,90 @@ public class JobDriver extends Thread im
return;
}
+ // <UIMA-3600>
+
+ /**
+ * Look up the process recorded for a work item in the current job's process map.
+ * The map is read from getJob() on every call, so the result reflects whatever
+ * job the driver currently holds.
+ *
+ * @param workItem the work item whose process is wanted; may be null
+ * @return null when {@code workItem} is null, otherwise the map's entry for
+ * {@code workItem.getProcessId()} (presumably null when there is no entry)
+ */
+ private IDuccProcess getProcess(WorkItem workItem) {
+ if(workItem == null) {
+ return null;
+ }
+ DuccProcessMap processMap = (DuccProcessMap)getJob().getProcessMap();
+ return processMap.get(workItem.getProcessId());
+ }
+
+ /**
+ * Report whether a process failed because of the user's code.
+ * NOTE(review): placeholder - the argument is ignored and the answer is always false.
+ *
+ * @param process the process to inspect (currently unused)
+ * @return always false
+ */
+ private boolean isUserFailure(IDuccProcess process) {
+ return false;
+ }
+
+ /**
+ * Report whether the process that ran {@code workItem} was lost to a node failure.
+ * The process is resolved afresh from the job's process map on every call.
+ *
+ * @param workItem the work item to check; may be null
+ * @return true only if the resolved process was deallocated as Purged
+ */
+ private boolean isNodeFailure(WorkItem workItem) {
+ IDuccProcess owningProcess = getProcess(workItem);
+ return isNodeFailure(owningProcess);
+ }
+
+ /**
+ * Report whether a process was deallocated as {@code Purged}, which this change
+ * (UIMA-3600, "RM Purged") treats as a node failure.
+ *
+ * @param process the process to inspect; may be null
+ * @return false for a null process, a null deallocation type, or any type other than Purged
+ */
+ private boolean isNodeFailure(IDuccProcess process) {
+ if(process == null) {
+ return false;
+ }
+ // identity compare is null-safe, unlike switching on a possibly-null enum
+ return process.getProcessDeallocationType() == ProcessDeallocationType.Purged;
+ }
+
+ private int ProcessStatusMaxWaitSeconds = 5 * 60;
+ private long MillisPerSecond = 1000;
+
+ /**
+ * Poll, once per second, until the process that ran {@code workItem} reports a node
+ * failure or a user failure, or until ProcessStatusMaxWaitSeconds elapse. The timeout
+ * handler calls this before deciding between retry and the plugin exception handler.
+ * Exactly one final status line is logged: node failure, user failure, interrupted,
+ * or expired. If the thread is interrupted, the interrupt flag is restored and the
+ * wait ends early. Never throws: any other exception is logged as an error.
+ *
+ * @param jobDriver driver whose job supplies the job DuccId used in log records
+ * @param workItem the work item whose process status is awaited
+ */
+ private void waitForProcessStatus(IJobDriver jobDriver, WorkItem workItem) {
+ String location = "waitForProcessStatus";
+ try {
+ String casId = workItem.getCasId();
+ String seqNo = ""+workItem.getSeqNo();
+ IDuccWorkJob job = jobDriver.getJob();
+ DuccId jobDuccId = job.getDuccId();
+ DuccId processDuccId = null;
+ IDuccProcess process = getProcess(workItem);
+ String wiInfo = "seqNo:"+seqNo+" "+"wiId:"+workItem.getCasDocumentText()+" "+"casId:"+casId;
+ if(process != null) {
+ processDuccId = process.getDuccId();
+ duccOut.debug(location, jobDuccId, processDuccId, wiInfo+" process status: pending");
+ long maxWaitMillis = ProcessStatusMaxWaitSeconds * MillisPerSecond;
+ long expiry = System.currentTimeMillis() + maxWaitMillis;
+ duccOut.debug(location, jobDuccId, processDuccId, wiInfo+" max wait millis: "+maxWaitMillis);
+ String outcome = "expired";
+ while(System.currentTimeMillis() < expiry) {
+ try {
+ Thread.sleep(MillisPerSecond);
+ }
+ catch(InterruptedException e) {
+ // restore the interrupt so the owning thread can observe it, and stop waiting
+ Thread.currentThread().interrupt();
+ outcome = "interrupted";
+ break;
+ }
+ // re-resolve each poll so status comes from the job's current process map,
+ // the same view isNodeFailure(WorkItem) reads; keep the last known process if absent
+ IDuccProcess current = getProcess(workItem);
+ if(current != null) {
+ process = current;
+ }
+ if(isNodeFailure(process)) {
+ outcome = "node failure";
+ break;
+ }
+ if(isUserFailure(process)) {
+ outcome = "user failure";
+ break;
+ }
+ }
+ duccOut.debug(location, jobDuccId, processDuccId, wiInfo+" process status: "+outcome);
+ }
+ else {
+ duccOut.debug(location, jobDuccId, processDuccId, wiInfo+" process status: not found");
+ }
+ }
+ catch(Exception e) {
+ duccOut.error(location, null, "process status error?", e);
+ }
+ }
+
+ // </UIMA-3600>
+
public void waitForLocation(IJobDriver jobDriver, WorkItem workItem) {
String location = "waitForLocation";
try {
@@ -1395,7 +1480,18 @@ public class JobDriver extends Thread im
}
else if(timeout) {
duccOut.debug(location, jobid, "action:timeout "+getThreadLocationInfo(workItem), e);
- employPluginExceptionHandler(workItem, e);
+ // <UIMA-3600>
+ waitForProcessStatus(this, workItem);
+ if(isNodeFailure(workItem)) {
+ duccOut.debug(location, jobid, "action:timeout-node-failure-retry "+getThreadLocationInfo(workItem), e);
+ retry(workItem);
+ // node failed, work item interrupted - don't add another work item to queue
+ }
+ else {
+ duccOut.debug(location, jobid, "action:timeout-handler "+getThreadLocationInfo(workItem), e);
+ employPluginExceptionHandler(workItem, e);
+ }
+ // </UIMA-3600>
}
else if(isUnknownProcess(workItem)) {
duccOut.debug(location, jobid, "action:unknown-process "+getThreadLocationInfo(workItem), e);