You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2018/10/05 11:36:32 UTC

svn commit: r1842893 - in /uima/uima-ducc/trunk: uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/ uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ uima-ducc-orchestrator/src/main/java/org/apache/uima/d...

Author: degenaro
Date: Fri Oct  5 11:36:32 2018
New Revision: 1842893

URL: http://svn.apache.org/viewvc?rev=1842893&view=rev
Log:
UIMA-5883 DUCC JobDriver (JD) may cause job to never process all work items if JobProcess (JP) is preempted

Modified:
    uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java
    uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java
    uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java

Modified: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java?rev=1842893&r1=1842892&r2=1842893&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java Fri Oct  5 11:36:32 2018
@@ -122,6 +122,7 @@ public class Standardize {
 		skewCount,
 		runMax,
 		runMin,
+		userKey,
 		;
 		
 		Label() {

Modified: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java?rev=1842893&r1=1842892&r2=1842893&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java Fri Oct  5 11:36:32 2018
@@ -37,6 +37,7 @@ import org.apache.uima.ducc.container.jd
 import org.apache.uima.ducc.container.jd.JobDriverHelper;
 import org.apache.uima.ducc.container.jd.blacklist.JobProcessBlacklist;
 import org.apache.uima.ducc.container.jd.cas.CasManager;
+import org.apache.uima.ducc.container.jd.cas.CasManagerStats.RetryReason;
 import org.apache.uima.ducc.container.jd.log.LoggerHelper;
 import org.apache.uima.ducc.container.jd.mh.RemoteWorkerProcess;
 import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo.CompletionType;
@@ -161,6 +162,25 @@ public class ActionGet implements IActio
 		return mmc;
 	}
 	
+	/** If mmc holds a meta-CAS, logs a warning (node, pid, user key), clears it from mmc and hands it to CasManager.putMetaCas with reason rr; otherwise does nothing. */
+	private synchronized void ungetMetaMetaCas(IActionData actionData, IRemoteWorkerProcess rwp, IMetaMetaCas mmc, RetryReason rr) throws JobDriverException {
+		String location = "ungetMetaMetaCas";
+		CasManager cm = JobDriver.getInstance().getCasManager();
+		if(mmc != null) {
+			IMetaTask metaCas = mmc.getMetaCas();
+			if(metaCas != null) {
+				String wiId = metaCas.getUserKey();
+				MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+				mb.append(Standardize.Label.node.get()+rwp.getNodeName());
+				mb.append(Standardize.Label.pid.get()+rwp.getPid());
+				mb.append(Standardize.Label.userKey.get()+wiId);
+				logger.warn(location, ILogger.null_id, mb.toString());
+				mmc.setMetaCas(null);
+				cm.putMetaCas(metaCas, rr);
+			}
+		}
+	}
+	
 	@Override
 	public void engage(Object objectData) {
 		String location = "engage";
@@ -183,7 +203,7 @@ public class ActionGet implements IActio
 				IMetaMetaCas mmc = getMetaMetaCas(actionData);
 				if(mmc.isExhausted()) {
 					Long time = warnedExhausted.putIfAbsent(rwp, new Long(System.currentTimeMillis()));
-					if(time != null) {
+					if(time == null) {
 						MessageBuffer mbx = LoggerHelper.getMessageBuffer(actionData);
 						mbx.append(Standardize.Label.node.get()+rwp.getNodeName());
 						mbx.append(Standardize.Label.pid.get()+rwp.getPid());
@@ -194,7 +214,7 @@ public class ActionGet implements IActio
 				}
 				if(mmc.isPremature()) {
 					Long time = warnedPremature.putIfAbsent(rwp, new Long(System.currentTimeMillis()));
-					if(time != null) {
+					if(time == null) {
 						String text = fewerWorkItemsAvailableThanExpected;
 						jd.killJob(CompletionType.Exception, text);
 						MessageBuffer mbx = LoggerHelper.getMessageBuffer(actionData);
@@ -207,7 +227,7 @@ public class ActionGet implements IActio
 				}
 				else if(mmc.isKillJob()) {
 					Long time = warnedJobDiscontinued.putIfAbsent(rwp, new Long(System.currentTimeMillis()));
-					if(time != null) {
+					if(time == null) {
 						MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
 						mb.append(Standardize.Label.node.get()+rwp.getNodeName());
 						mb.append(Standardize.Label.pid.get()+rwp.getPid());
@@ -215,10 +235,11 @@ public class ActionGet implements IActio
 						logger.warn(location, ILogger.null_id, mb.toString());
 					}
 					TransactionHelper.addResponseHint(trans, Hint.Killed);
+					ungetMetaMetaCas(actionData,rwp,mmc,RetryReason.ProcessVolunteered);
 				}
 				else if(jobProcessBlacklist.includes(rwp)) {
 					Long time = warnedProcessDiscontinued.put(rwp, new Long(System.currentTimeMillis()));
-					if(time != null) {
+					if(time == null) {
 						MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
 						mb.append(Standardize.Label.node.get()+rwp.getNodeName());
 						mb.append(Standardize.Label.pid.get()+rwp.getPid());
@@ -226,6 +247,7 @@ public class ActionGet implements IActio
 						logger.warn(location, ILogger.null_id, mb.toString());
 					}
 					TransactionHelper.addResponseHint(trans, Hint.Blacklisted);
+					ungetMetaMetaCas(actionData,rwp,mmc,RetryReason.ProcessDown);
 				}
 				else {
 					metaCas = mmc.getMetaCas();

Modified: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java?rev=1842893&r1=1842892&r2=1842893&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java Fri Oct  5 11:36:32 2018
@@ -618,6 +618,7 @@ public class StateManager {
 				copyProcessWorkItemsReport(duccWorkJob, jdStatusReport);
 				copyDriverWorkItemsReport(duccWorkJob, jdStatusReport);
 				//
+				logger.debug(methodName, duccId, duccWorkJob.getJobState(), "total="+jdStatusReport.getWorkItemsTotal(), "completed="+jdStatusReport.getWorkItemsProcessingCompleted(), "error="+jdStatusReport.getWorkItemsProcessingError());
 				switch(duccWorkJob.getJobState()) {
 				case Completed:
 					break;