You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2015/02/23 21:14:04 UTC

svn commit: r1661761 - /uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java

Author: degenaro
Date: Mon Feb 23 20:14:03 2015
New Revision: 1661761

URL: http://svn.apache.org/r1661761
Log:
UIMA-4069 DUCC Job Driver (JD) system classpath

Add process deallocate reason to JD logging (e.g. Purged, Forced. etc.)

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java?rev=1661761&r1=1661760&r2=1661761&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java Mon Feb 23 20:14:03 2015
@@ -46,6 +46,7 @@ import org.apache.uima.ducc.container.jd
 import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo;
 import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo.CompletionType;
 import org.apache.uima.ducc.container.jd.mh.iface.IProcessInfo;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
 import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerThread;
 import org.apache.uima.ducc.container.jd.mh.impl.OperatingInfo;
 import org.apache.uima.ducc.container.jd.wi.IRunningWorkItemStatistics;
@@ -71,6 +72,8 @@ public class MessageHandler implements I
 	
 	private ConcurrentHashMap<IRemoteWorkerThread,IRemoteWorkerThread> wipMap = new ConcurrentHashMap<IRemoteWorkerThread,IRemoteWorkerThread>();
 	
+	private JobProcessBlacklist jobProcessBlacklist = JobProcessBlacklist.getInstance();
+	
 	public MessageHandler() {
 	}
 	
@@ -193,6 +196,27 @@ public class MessageHandler implements I
 		*/
 	}
 	
+	private void processBlacklist(IProcessInfo processInfo, IRemoteWorkerProcess rwp) {
+		String location = "processBlacklist";
+		if(jobProcessBlacklist.includes(rwp)) {
+			MessageBuffer mb1 = new MessageBuffer();
+			mb1.append(Standardize.Label.remote.get()+rwp.toString());
+			mb1.append(Standardize.Label.status.get()+"already kaput");
+			logger.trace(location, ILogger.null_id, mb1.toString());
+		}
+		else {
+			jobProcessBlacklist.add(rwp);
+			MessageBuffer mb1 = new MessageBuffer();
+			mb1.append(Standardize.Label.remote.get()+rwp.toString());
+			mb1.append(Standardize.Label.status.get()+"transition to down");
+			String reasonDeallocated = processInfo.getReasonDeallocated();
+			if(reasonDeallocated != null) {
+				mb1.append(Standardize.Label.deallocate.get()+reasonDeallocated);
+			}
+			logger.warn(location, ILogger.null_id, mb1.toString());
+		}
+	}
+	
 	@Override
 	public void handleProcessDown(IProcessInfo processInfo) {
 		String location = "handleProcessDown";
@@ -203,33 +227,17 @@ public class MessageHandler implements I
 			mb.append(Standardize.Label.pid.get()+processInfo.getPid());
 			logger.trace(location, ILogger.null_id, mb.toString());
 			ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
-			JobProcessBlacklist jobProcessBlacklist = JobProcessBlacklist.getInstance();
+			
 			for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
 				IRemoteWorkerThread rwt = entry.getKey();
 				if(rwt.comprises(processInfo)) {
 					RemoteWorkerProcess rwp = new RemoteWorkerProcess(rwt);
-					if(jobProcessBlacklist.includes(rwp)) {
-						MessageBuffer mb1 = new MessageBuffer();
-						mb1.append(Standardize.Label.remote.get()+rwt.toString());
-						mb1.append(Standardize.Label.status.get()+"already kaput");
-						logger.trace(location, ILogger.null_id, mb1.toString());
-					}
-					else {
-						jobProcessBlacklist.add(rwp);
-						MessageBuffer mb1 = new MessageBuffer();
-						mb1.append(Standardize.Label.remote.get()+rwt.toString());
-						mb1.append(Standardize.Label.status.get()+"transition to down");
-						String reasonDeallocated = processInfo.getReasonDeallocated();
-						if(reasonDeallocated != null) {
-							mb1.append(Standardize.Label.deallocate.get()+reasonDeallocated);
-						}
-						logger.warn(location, ILogger.null_id, mb1.toString());
-						IWorkItem wi = entry.getValue();
-						IFsm fsm = wi.getFsm();
-						IEvent event = WiFsm.Process_Failure;
-						Object actionData = new ActionData(wi, rwt, null);
-						fsm.transition(event, actionData);
-					}
+					processBlacklist(processInfo, rwp);
+					IWorkItem wi = entry.getValue();
+					IFsm fsm = wi.getFsm();
+					IEvent event = WiFsm.Process_Failure;
+					Object actionData = new ActionData(wi, rwt, null);
+					fsm.transition(event, actionData);
 				}
 				else {
 					MessageBuffer mb1 = new MessageBuffer();
@@ -254,30 +262,16 @@ public class MessageHandler implements I
 			mb.append(Standardize.Label.pid.get()+processInfo.getPid());
 			logger.trace(location, ILogger.null_id, mb.toString());
 			ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
-			JobProcessBlacklist jobProcessBlacklist = JobProcessBlacklist.getInstance();
 			for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
 				IRemoteWorkerThread rwt = entry.getKey();
 				if(rwt.comprises(processInfo)) {
 					RemoteWorkerProcess rwp = new RemoteWorkerProcess(rwt);
-					if(jobProcessBlacklist.includes(rwp)) {
-						MessageBuffer mb1 = new MessageBuffer();
-						mb1.append(Standardize.Label.remote.get()+rwt.toString());
-						mb1.append(Standardize.Label.status.get()+"already kaput");
-						logger.trace(location, ILogger.null_id, mb1.toString());
-					}
-					else {
-						jobProcessBlacklist.add(rwp);
-						MessageBuffer mb1 = new MessageBuffer();
-						mb1.append(Standardize.Label.remote.get()+rwt.toString());
-						mb1.append(Standardize.Label.status.get()+"transition to down");
-						logger.warn(location, ILogger.null_id, mb1.toString());
-						IWorkItem wi = entry.getValue();
-						IFsm fsm = wi.getFsm();
-						IEvent event = WiFsm.Process_Preempt;
-						Object actionData = new ActionData(wi, rwt, null);
-						fsm.transition(event, actionData);
-					}
-					
+					processBlacklist(processInfo, rwp);
+					IWorkItem wi = entry.getValue();
+					IFsm fsm = wi.getFsm();
+					IEvent event = WiFsm.Process_Preempt;
+					Object actionData = new ActionData(wi, rwt, null);
+					fsm.transition(event, actionData);
 				}
 				else {
 					MessageBuffer mb1 = new MessageBuffer();