You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/11/02 13:32:10 UTC

svn commit: r1636120 - /uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java

Author: degenaro
Date: Sun Nov  2 12:32:10 2014
New Revision: 1636120

URL: http://svn.apache.org/r1636120
Log:
UIMA-4069 Redesign of the Job Driver (JD) toward the main goal of classpath separation for container (system) code.

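Update the dispatcher to handle preemption events: handlePreemptProcess now sends a Process_Preempt event to the FSM of every work item whose remote worker identity comprises the preempted process, and handleGetOperatingInfo now reports the number of preemptions.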

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java?rev=1636120&r1=1636119&r2=1636120&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java Sun Nov  2 12:32:10 2014
@@ -18,6 +18,7 @@
 */
 package org.apache.uima.ducc.container.jd.dispatch;
 
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.uima.ducc.container.common.ContainerLogger;
@@ -30,6 +31,8 @@ import org.apache.uima.ducc.container.co
 import org.apache.uima.ducc.container.jd.CasManagerStats;
 import org.apache.uima.ducc.container.jd.JobDriverCasManager;
 import org.apache.uima.ducc.container.jd.JobDriverCommon;
+import org.apache.uima.ducc.container.jd.dispatch.iface.IRemoteWorkerIdentity;
+import org.apache.uima.ducc.container.jd.dispatch.iface.IWorkItem;
 import org.apache.uima.ducc.container.jd.fsm.wi.ActionData;
 import org.apache.uima.ducc.container.jd.fsm.wi.WiFsm;
 import org.apache.uima.ducc.container.jd.mh.iface.INodeInfo;
@@ -48,25 +51,77 @@ public class Dispatcher {
 	}
 	
 	public IOperatingInfo handleGetOperatingInfo() {
-		IOperatingInfo retVal = new OperatingInfo();
-		JobDriverCasManager jdcm = JobDriverCommon.getInstance().getCasManager();
-		CasManagerStats cms = jdcm.getCasManagerStats();
-		retVal.setWorkItemCrTotal(cms.getCrTotal());
-		retVal.setWorkItemCrFetches(cms.getCrGets());
+		String location = "handleGetOperatingInfo";
+		IOperatingInfo retVal = null;
+		try {
+			retVal = new OperatingInfo();
+			JobDriverCasManager jdcm = JobDriverCommon.getInstance().getCasManager();
+			CasManagerStats cms = jdcm.getCasManagerStats();
+			retVal.setWorkItemCrTotal(cms.getCrTotal());
+			retVal.setWorkItemCrFetches(cms.getCrGets());
+			retVal.setWorkItemPreemptions(cms.getNumberOfPreemptions());
+			MessageBuffer mb = new MessageBuffer();
+			mb.append(Standardize.Label.crTotal.get()+retVal.getWorkItemCrTotal());
+			mb.append(Standardize.Label.crFetches.get()+retVal.getWorkItemCrFetches());
+			mb.append(Standardize.Label.preemptions.get()+retVal.getWorkItemPreemptions());
+			logger.debug(location, IEntityId.null_id, mb.toString());
+		}
+		catch(Exception e) {
+			logger.error(location, IEntityId.null_id, e);
+		}
 		return retVal;
 	}
 	
 	public void handleDownNode(INodeInfo nodeInfo) {
-		
+		String location = "handleDownNode";
+		try {
+			ConcurrentHashMap<IRemoteWorkerIdentity, IWorkItem> map = JobDriverCommon.getInstance().getMap();
+			//TODO
+		}
+		catch(Exception e) {
+			logger.error(location, IEntityId.null_id, e);
+		}
 	}
 	
 	public void handleDownProcess(IProcessInfo processInfo) {
-		
+		String location = "handleDownProcess";
+		try {
+			ConcurrentHashMap<IRemoteWorkerIdentity, IWorkItem> map = JobDriverCommon.getInstance().getMap();
+			//TODO
+		}
+		catch(Exception e) {
+			logger.error(location, IEntityId.null_id, e);
+		}
 	}
-
 	
 	public void handlePreemptProcess(IProcessInfo processInfo) {
-		
+		String location = "handlePreemptProcess";
+		try {
+			ConcurrentHashMap<IRemoteWorkerIdentity, IWorkItem> map = JobDriverCommon.getInstance().getMap();
+			for(Entry<IRemoteWorkerIdentity, IWorkItem> entry : map.entrySet()) {
+				IRemoteWorkerIdentity rwi = entry.getKey();
+				if(rwi.comprises(processInfo)) {
+					MessageBuffer mb = new MessageBuffer();
+					mb.append(Standardize.Label.remote.get()+rwi.toString());
+					mb.append(Boolean.TRUE.toString());
+					logger.debug(location, IEntityId.null_id, mb.toString());
+					IWorkItem wi = entry.getValue();
+					IFsm fsm = wi.getFsm();
+					IEvent event = WiFsm.Process_Preempt;
+					Object actionData = new ActionData(wi, rwi, null);
+					fsm.transition(event, actionData);
+				}
+				else {
+					MessageBuffer mb = new MessageBuffer();
+					mb.append(Standardize.Label.remote.get()+rwi.toString());
+					mb.append(Boolean.FALSE.toString());
+					logger.trace(location, IEntityId.null_id, mb.toString());
+				}
+			}
+		}
+		catch(Exception e) {
+			logger.error(location, IEntityId.null_id, e);
+		}
 	}
 	
 	public void handleMetaCasTransation(IMetaCasTransaction trans) {
@@ -132,7 +187,7 @@ public class Dispatcher {
 		return wi;
 	}
 	
-	public void handleMetaCasTransationGet(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
+	private void handleMetaCasTransationGet(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
 		IWorkItem wi = register(rwi);
 		IFsm fsm = wi.getFsm();
 		IEvent event = WiFsm.Get_Request;
@@ -140,7 +195,7 @@ public class Dispatcher {
 		fsm.transition(event, actionData);
 	}
 	
-	public void handleMetaCasTransationAck(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
+	private void handleMetaCasTransationAck(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
 		String location = "handleMetaCasTransationAck";
 		IWorkItem wi = find(rwi);
 		if(wi == null) {
@@ -158,7 +213,7 @@ public class Dispatcher {
 		}
 	}
 	
-	public void handleMetaCasTransationEnd(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
+	private void handleMetaCasTransationEnd(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
 		String location = "handleMetaCasTransationEnd";
 		IWorkItem wi = find(rwi);
 		if(wi == null) {