You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2015/02/23 21:14:04 UTC
svn commit: r1661761 -
/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
Author: degenaro
Date: Mon Feb 23 20:14:03 2015
New Revision: 1661761
URL: http://svn.apache.org/r1661761
Log:
UIMA-4069 DUCC Job Driver (JD) system classpath
Add process deallocate reason to JD logging (e.g. Purged, Forced, etc.)
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java?rev=1661761&r1=1661760&r2=1661761&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java Mon Feb 23 20:14:03 2015
@@ -46,6 +46,7 @@ import org.apache.uima.ducc.container.jd
import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo;
import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo.CompletionType;
import org.apache.uima.ducc.container.jd.mh.iface.IProcessInfo;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerThread;
import org.apache.uima.ducc.container.jd.mh.impl.OperatingInfo;
import org.apache.uima.ducc.container.jd.wi.IRunningWorkItemStatistics;
@@ -71,6 +72,8 @@ public class MessageHandler implements I
private ConcurrentHashMap<IRemoteWorkerThread,IRemoteWorkerThread> wipMap = new ConcurrentHashMap<IRemoteWorkerThread,IRemoteWorkerThread>();
+ private JobProcessBlacklist jobProcessBlacklist = JobProcessBlacklist.getInstance();
+
public MessageHandler() {
}
@@ -193,6 +196,27 @@ public class MessageHandler implements I
*/
}
+ private void processBlacklist(IProcessInfo processInfo, IRemoteWorkerProcess rwp) {
+ String location = "processBlacklist";
+ if(jobProcessBlacklist.includes(rwp)) {
+ MessageBuffer mb1 = new MessageBuffer();
+ mb1.append(Standardize.Label.remote.get()+rwp.toString());
+ mb1.append(Standardize.Label.status.get()+"already kaput");
+ logger.trace(location, ILogger.null_id, mb1.toString());
+ }
+ else {
+ jobProcessBlacklist.add(rwp);
+ MessageBuffer mb1 = new MessageBuffer();
+ mb1.append(Standardize.Label.remote.get()+rwp.toString());
+ mb1.append(Standardize.Label.status.get()+"transition to down");
+ String reasonDeallocated = processInfo.getReasonDeallocated();
+ if(reasonDeallocated != null) {
+ mb1.append(Standardize.Label.deallocate.get()+reasonDeallocated);
+ }
+ logger.warn(location, ILogger.null_id, mb1.toString());
+ }
+ }
+
@Override
public void handleProcessDown(IProcessInfo processInfo) {
String location = "handleProcessDown";
@@ -203,33 +227,17 @@ public class MessageHandler implements I
mb.append(Standardize.Label.pid.get()+processInfo.getPid());
logger.trace(location, ILogger.null_id, mb.toString());
ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
- JobProcessBlacklist jobProcessBlacklist = JobProcessBlacklist.getInstance();
+
for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
IRemoteWorkerThread rwt = entry.getKey();
if(rwt.comprises(processInfo)) {
RemoteWorkerProcess rwp = new RemoteWorkerProcess(rwt);
- if(jobProcessBlacklist.includes(rwp)) {
- MessageBuffer mb1 = new MessageBuffer();
- mb1.append(Standardize.Label.remote.get()+rwt.toString());
- mb1.append(Standardize.Label.status.get()+"already kaput");
- logger.trace(location, ILogger.null_id, mb1.toString());
- }
- else {
- jobProcessBlacklist.add(rwp);
- MessageBuffer mb1 = new MessageBuffer();
- mb1.append(Standardize.Label.remote.get()+rwt.toString());
- mb1.append(Standardize.Label.status.get()+"transition to down");
- String reasonDeallocated = processInfo.getReasonDeallocated();
- if(reasonDeallocated != null) {
- mb1.append(Standardize.Label.deallocate.get()+reasonDeallocated);
- }
- logger.warn(location, ILogger.null_id, mb1.toString());
- IWorkItem wi = entry.getValue();
- IFsm fsm = wi.getFsm();
- IEvent event = WiFsm.Process_Failure;
- Object actionData = new ActionData(wi, rwt, null);
- fsm.transition(event, actionData);
- }
+ processBlacklist(processInfo, rwp);
+ IWorkItem wi = entry.getValue();
+ IFsm fsm = wi.getFsm();
+ IEvent event = WiFsm.Process_Failure;
+ Object actionData = new ActionData(wi, rwt, null);
+ fsm.transition(event, actionData);
}
else {
MessageBuffer mb1 = new MessageBuffer();
@@ -254,30 +262,16 @@ public class MessageHandler implements I
mb.append(Standardize.Label.pid.get()+processInfo.getPid());
logger.trace(location, ILogger.null_id, mb.toString());
ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
- JobProcessBlacklist jobProcessBlacklist = JobProcessBlacklist.getInstance();
for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
IRemoteWorkerThread rwt = entry.getKey();
if(rwt.comprises(processInfo)) {
RemoteWorkerProcess rwp = new RemoteWorkerProcess(rwt);
- if(jobProcessBlacklist.includes(rwp)) {
- MessageBuffer mb1 = new MessageBuffer();
- mb1.append(Standardize.Label.remote.get()+rwt.toString());
- mb1.append(Standardize.Label.status.get()+"already kaput");
- logger.trace(location, ILogger.null_id, mb1.toString());
- }
- else {
- jobProcessBlacklist.add(rwp);
- MessageBuffer mb1 = new MessageBuffer();
- mb1.append(Standardize.Label.remote.get()+rwt.toString());
- mb1.append(Standardize.Label.status.get()+"transition to down");
- logger.warn(location, ILogger.null_id, mb1.toString());
- IWorkItem wi = entry.getValue();
- IFsm fsm = wi.getFsm();
- IEvent event = WiFsm.Process_Preempt;
- Object actionData = new ActionData(wi, rwt, null);
- fsm.transition(event, actionData);
- }
-
+ processBlacklist(processInfo, rwp);
+ IWorkItem wi = entry.getValue();
+ IFsm fsm = wi.getFsm();
+ IEvent event = WiFsm.Process_Preempt;
+ Object actionData = new ActionData(wi, rwt, null);
+ fsm.transition(event, actionData);
}
else {
MessageBuffer mb1 = new MessageBuffer();