You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/11/02 13:32:10 UTC
svn commit: r1636120 -
/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java
Author: degenaro
Date: Sun Nov 2 12:32:10 2014
New Revision: 1636120
URL: http://svn.apache.org/r1636120
Log:
UIMA-4069 Redesign of JD toward the main goal of classpath separation for container (system) code.
Update dispatcher to handle preemption events.
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java?rev=1636120&r1=1636119&r2=1636120&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java Sun Nov 2 12:32:10 2014
@@ -18,6 +18,7 @@
*/
package org.apache.uima.ducc.container.jd.dispatch;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.uima.ducc.container.common.ContainerLogger;
@@ -30,6 +31,8 @@ import org.apache.uima.ducc.container.co
import org.apache.uima.ducc.container.jd.CasManagerStats;
import org.apache.uima.ducc.container.jd.JobDriverCasManager;
import org.apache.uima.ducc.container.jd.JobDriverCommon;
+import org.apache.uima.ducc.container.jd.dispatch.iface.IRemoteWorkerIdentity;
+import org.apache.uima.ducc.container.jd.dispatch.iface.IWorkItem;
import org.apache.uima.ducc.container.jd.fsm.wi.ActionData;
import org.apache.uima.ducc.container.jd.fsm.wi.WiFsm;
import org.apache.uima.ducc.container.jd.mh.iface.INodeInfo;
@@ -48,25 +51,77 @@ public class Dispatcher {
}
public IOperatingInfo handleGetOperatingInfo() {
- IOperatingInfo retVal = new OperatingInfo();
- JobDriverCasManager jdcm = JobDriverCommon.getInstance().getCasManager();
- CasManagerStats cms = jdcm.getCasManagerStats();
- retVal.setWorkItemCrTotal(cms.getCrTotal());
- retVal.setWorkItemCrFetches(cms.getCrGets());
+ String location = "handleGetOperatingInfo";
+ IOperatingInfo retVal = null;
+ try {
+ retVal = new OperatingInfo();
+ JobDriverCasManager jdcm = JobDriverCommon.getInstance().getCasManager();
+ CasManagerStats cms = jdcm.getCasManagerStats();
+ retVal.setWorkItemCrTotal(cms.getCrTotal());
+ retVal.setWorkItemCrFetches(cms.getCrGets());
+ retVal.setWorkItemPreemptions(cms.getNumberOfPreemptions());
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.crTotal.get()+retVal.getWorkItemCrTotal());
+ mb.append(Standardize.Label.crFetches.get()+retVal.getWorkItemCrFetches());
+ mb.append(Standardize.Label.preemptions.get()+retVal.getWorkItemPreemptions());
+ logger.debug(location, IEntityId.null_id, mb.toString());
+ }
+ catch(Exception e) {
+ logger.error(location, IEntityId.null_id, e);
+ }
return retVal;
}
public void handleDownNode(INodeInfo nodeInfo) {
-
+ String location = "handleDownNode";
+ try {
+ ConcurrentHashMap<IRemoteWorkerIdentity, IWorkItem> map = JobDriverCommon.getInstance().getMap();
+ //TODO
+ }
+ catch(Exception e) {
+ logger.error(location, IEntityId.null_id, e);
+ }
}
public void handleDownProcess(IProcessInfo processInfo) {
-
+ String location = "handleDownProcess";
+ try {
+ ConcurrentHashMap<IRemoteWorkerIdentity, IWorkItem> map = JobDriverCommon.getInstance().getMap();
+ //TODO
+ }
+ catch(Exception e) {
+ logger.error(location, IEntityId.null_id, e);
+ }
}
-
public void handlePreemptProcess(IProcessInfo processInfo) {
-
+ String location = "handlePreemptProcess";
+ try {
+ ConcurrentHashMap<IRemoteWorkerIdentity, IWorkItem> map = JobDriverCommon.getInstance().getMap();
+ for(Entry<IRemoteWorkerIdentity, IWorkItem> entry : map.entrySet()) {
+ IRemoteWorkerIdentity rwi = entry.getKey();
+ if(rwi.comprises(processInfo)) {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.remote.get()+rwi.toString());
+ mb.append(Boolean.TRUE.toString());
+ logger.debug(location, IEntityId.null_id, mb.toString());
+ IWorkItem wi = entry.getValue();
+ IFsm fsm = wi.getFsm();
+ IEvent event = WiFsm.Process_Preempt;
+ Object actionData = new ActionData(wi, rwi, null);
+ fsm.transition(event, actionData);
+ }
+ else {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.remote.get()+rwi.toString());
+ mb.append(Boolean.FALSE.toString());
+ logger.trace(location, IEntityId.null_id, mb.toString());
+ }
+ }
+ }
+ catch(Exception e) {
+ logger.error(location, IEntityId.null_id, e);
+ }
}
public void handleMetaCasTransation(IMetaCasTransaction trans) {
@@ -132,7 +187,7 @@ public class Dispatcher {
return wi;
}
- public void handleMetaCasTransationGet(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
+ private void handleMetaCasTransationGet(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
IWorkItem wi = register(rwi);
IFsm fsm = wi.getFsm();
IEvent event = WiFsm.Get_Request;
@@ -140,7 +195,7 @@ public class Dispatcher {
fsm.transition(event, actionData);
}
- public void handleMetaCasTransationAck(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
+ private void handleMetaCasTransationAck(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
String location = "handleMetaCasTransationAck";
IWorkItem wi = find(rwi);
if(wi == null) {
@@ -158,7 +213,7 @@ public class Dispatcher {
}
}
- public void handleMetaCasTransationEnd(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
+ private void handleMetaCasTransationEnd(IMetaCasTransaction trans, IRemoteWorkerIdentity rwi) {
String location = "handleMetaCasTransationEnd";
IWorkItem wi = find(rwi);
if(wi == null) {