You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2015/02/14 15:17:06 UTC
svn commit: r1659797 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src:
main/java/org/apache/uima/ducc/container/common/
main/java/org/apache/uima/ducc/container/jd/
main/java/org/apache/uima/ducc/container/jd/mh/
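main/java/org/apache/uima/ducc/container/jd/mh/iface/
main/java/org/apache/uima/ducc/container/jd/mh/impl/
test/java/org/apache/uima/ducc/container/jd/test/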
Author: degenaro
Date: Sat Feb 14 14:17:06 2015
New Revision: 1659797
URL: http://svn.apache.org/r1659797
Log:
UIMA-4236 DUCC Job Driver (JD) should support configurable startup initialization error limit
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriver.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestMessageHandler.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java?rev=1659797&r1=1659796&r2=1659797&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java Sat Feb 14 14:17:06 2015
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.container.c
public class Standardize {
public enum Label {
+ limit,
classname,
exception,
instance,
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriver.java?rev=1659797&r1=1659796&r2=1659797&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriver.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriver.java Sat Feb 14 14:17:06 2015
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.uima.ducc.common.jd.files.workitem.IWorkItemStateKeeper;
import org.apache.uima.ducc.common.jd.files.workitem.WorkItemStateKeeper;
+import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
import org.apache.uima.ducc.container.common.FlagsExtendedHelper;
import org.apache.uima.ducc.container.common.MessageBuffer;
import org.apache.uima.ducc.container.common.Standardize;
@@ -83,6 +84,9 @@ public class JobDriver {
private boolean killJob = false;
private CompletionType completionType = CompletionType.Normal;
+ private String completionText = null;
+
+ private int startup_initialization_error_limit = DuccPropertiesResolver.get(DuccPropertiesResolver.ducc_jd_startup_initialization_error_limit, 1);
private JobDriver() throws JobDriverException {
initialize();
@@ -216,8 +220,15 @@ public class JobDriver {
}
public void killJob(CompletionType value) {
- killJob = true;
- completionType = value;
+ killJob(value, null);
+ }
+
+ public void killJob(CompletionType value, String text) {
+ if(!killJob) {
+ killJob = true;
+ completionType = value;
+ completionText = text;
+ }
}
public boolean isKillJob() {
@@ -227,4 +238,12 @@ public class JobDriver {
public CompletionType getCompletionType() {
return completionType;
}
+
+ public String getCompletionText() {
+ return completionText;
+ }
+
+ public int getStartupInitializationErrorLimit() {
+ return startup_initialization_error_limit;
+ }
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java?rev=1659797&r1=1659796&r2=1659797&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java Sat Feb 14 14:17:06 2015
@@ -26,9 +26,13 @@ import org.apache.uima.ducc.container.ne
public interface IMessageHandler {
public IOperatingInfo handleGetOperatingInfo();
- public void handleDownNode(INodeInfo nodeInfo);
- public void handleDownProcess(IProcessInfo processInfo);
- public void handlePreemptProcess(IProcessInfo processInfo);
+
+ public void handleNodeDown(INodeInfo nodeInfo);
+
+ public void handleProcessDown(IProcessInfo processInfo);
+ public void handleProcessPreempt(IProcessInfo processInfo);
+ public void handleProcessFailedInitialization(IProcessInfo processInfo);
+
public void handleMetaCasTransation(IMetaCasTransaction trans);
public void incGets();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java?rev=1659797&r1=1659796&r2=1659797&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java Sat Feb 14 14:17:06 2015
@@ -18,6 +18,7 @@
*/
package org.apache.uima.ducc.container.jd.mh;
+import java.io.File;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -43,6 +44,7 @@ import org.apache.uima.ducc.container.jd
import org.apache.uima.ducc.container.jd.mh.iface.INodeInfo;
import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo;
import org.apache.uima.ducc.container.jd.mh.iface.IProcessInfo;
+import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo.CompletionType;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerThread;
import org.apache.uima.ducc.container.jd.mh.impl.OperatingInfo;
import org.apache.uima.ducc.container.jd.wi.IRunningWorkItemStatistics;
@@ -65,6 +67,8 @@ public class MessageHandler implements I
private ConcurrentLinkedQueue<IRemoteWorkerThread> legacyList = new ConcurrentLinkedQueue<IRemoteWorkerThread>();
+ private ConcurrentHashMap<String,String> failedInitializationMap = new ConcurrentHashMap<String,String>();
+
private ConcurrentHashMap<IRemoteWorkerThread,IRemoteWorkerThread> wipMap = new ConcurrentHashMap<IRemoteWorkerThread,IRemoteWorkerThread>();
public MessageHandler() {
@@ -136,6 +140,7 @@ public class MessageHandler implements I
if(jd.isKillJob()) {
oi.setKillJob();
oi.setCompletionType(jd.getCompletionType());
+ oi.setCompletionText(jd.getCompletionText());
}
oi.setWorkItemDispatcheds(cms.getDispatched());
oi.setWorkItemRetrys(cms.getNumberOfRetrys());
@@ -175,7 +180,7 @@ public class MessageHandler implements I
}
@Override
- public void handleDownNode(INodeInfo nodeInfo) {
+ public void handleNodeDown(INodeInfo nodeInfo) {
//TODO
/*
String location = "handleDownNode";
@@ -189,8 +194,8 @@ public class MessageHandler implements I
}
@Override
- public void handleDownProcess(IProcessInfo processInfo) {
- String location = "handleDownProcess";
+ public void handleProcessDown(IProcessInfo processInfo) {
+ String location = "handleProcessDown";
try {
MessageBuffer mb = new MessageBuffer();
mb.append(Standardize.Label.node.get()+processInfo.getNodeName());
@@ -234,8 +239,8 @@ public class MessageHandler implements I
}
@Override
- public void handlePreemptProcess(IProcessInfo processInfo) {
- String location = "handlePreemptProcess";
+ public void handleProcessPreempt(IProcessInfo processInfo) {
+ String location = "handleProcessPreempt";
try {
MessageBuffer mb = new MessageBuffer();
mb.append(Standardize.Label.node.get()+processInfo.getNodeName());
@@ -311,6 +316,73 @@ public class MessageHandler implements I
}
}
+ @Override
+ public void handleProcessFailedInitialization(IProcessInfo processInfo) { // counts distinct failed processes; requests job kill once the configured limit is reached
+ String location = "handleProcessFailedInitialization";
+ try {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.node.get()+processInfo.getNodeName());
+ mb.append(Standardize.Label.ip.get()+processInfo.getNodeAddress());
+ mb.append(Standardize.Label.pid.get()+processInfo.getPid());
+ logger.trace(location, ILogger.null_id, mb.toString());
+
+ String nodeName = processInfo.getNodeName();
+ String nodeAddress = processInfo.getNodeAddress();
+ int pid = processInfo.getPid();
+ // Map key for this process: node name, address and pid, each separated so that address and pid digits cannot run together.
+ StringBuffer sb = new StringBuffer();
+ sb.append(nodeName);
+ sb.append(File.pathSeparator);
+ sb.append(nodeAddress).append(File.pathSeparator);
+ sb.append(pid);
+
+ String jp = sb.toString();
+
+ JobDriver jd = JobDriver.getInstance();
+ boolean isKillJob = jd.isKillJob();
+ // putIfAbsent returns null only for a key not yet present, so a repeated report for the same process does not grow the map.
+ boolean added = failedInitializationMap.putIfAbsent(jp, jp) == null;
+ int failedCount = failedInitializationMap.size();
+ int failedLimit = jd.getStartupInitializationErrorLimit();
+
+ if(added) {
+ mb = new MessageBuffer();
+ mb.append(Standardize.Label.node.get()+nodeName);
+ mb.append(Standardize.Label.ip.get()+nodeAddress);
+ mb.append(Standardize.Label.pid.get()+pid);
+ mb.append(Standardize.Label.count.get()+failedCount);
+ mb.append(Standardize.Label.limit.get()+failedLimit); // report the configured limit next to the current count
+ mb.append(Standardize.Label.isKillJob.get()+isKillJob);
+ logger.info(location, ILogger.null_id, mb.toString());
+ if(!isKillJob) {
+ if(failedCount >= failedLimit) {
+ String text = "startup initialization error limit exceeded";
+ jd.killJob(CompletionType.Exception, text);
+ mb = new MessageBuffer();
+ mb.append(Standardize.Label.node.get()+nodeName);
+ mb.append(Standardize.Label.ip.get()+nodeAddress);
+ mb.append(Standardize.Label.pid.get()+pid);
+ mb.append(Standardize.Label.isKillJob.get()+jd.isKillJob());
+ mb.append(Standardize.Label.type.get()+jd.getCompletionType().toString());
+ mb.append(Standardize.Label.reason.get()+jd.getCompletionText());
+ logger.info(location, ILogger.null_id, mb.toString());
+ }
+ }
+ }
+ else {
+ mb = new MessageBuffer();
+ mb.append(Standardize.Label.node.get()+nodeName);
+ mb.append(Standardize.Label.ip.get()+nodeAddress);
+ mb.append(Standardize.Label.pid.get()+pid);
+ mb.append(Standardize.Label.count.get()+failedCount);
+ logger.trace(location, ILogger.null_id, mb.toString());
+ }
+ }
+ catch(Exception e) { // failures here are logged, not propagated to the caller
+ logger.error(location, ILogger.null_id, e);
+ }
+ }
+
@Override
public void handleMetaCasTransation(IMetaCasTransaction trans) {
String location = "handleMetaCasTransation";
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java?rev=1659797&r1=1659796&r2=1659797&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java Sat Feb 14 14:17:06 2015
@@ -31,6 +31,9 @@ public interface IOperatingInfo extends
public void setCompletionType(CompletionType value);
public CompletionType getCompletionType();
+ public void setCompletionText(String value);
+ public String getCompletionText();
+
public void setJobId(String value);
public String getJobId();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java?rev=1659797&r1=1659796&r2=1659797&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java Sat Feb 14 14:17:06 2015
@@ -29,6 +29,7 @@ public class OperatingInfo implements IO
private static final long serialVersionUID = 1L;
private CompletionType completionType = CompletionType.Normal;
+ private String completionText = null;
private String jobId = null;
@@ -77,6 +78,16 @@ public class OperatingInfo implements IO
}
@Override
+ public void setCompletionText(String value) {
+ completionText = value;
+ }
+
+ @Override
+ public String getCompletionText() {
+ return completionText;
+ }
+
+ @Override
public void setJobId(String value) {
jobId = value;
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestMessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestMessageHandler.java?rev=1659797&r1=1659796&r2=1659797&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestMessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestMessageHandler.java Sat Feb 14 14:17:06 2015
@@ -394,7 +394,7 @@ public class TestMessageHandler extends
int n = randomTest03.nextInt(100);
if(n < pctTest03) {
IProcessInfo processInfo = new ProcessInfo(ti.getNodeName(),null,""+ti.getPid(),ti.getPid());
- messageHandler.handlePreemptProcess(processInfo);
+ messageHandler.handleProcessPreempt(processInfo);
}
}