You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/12/16 16:57:59 UTC
svn commit: r1645988 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd:
./ cas/ fsm/wi/ mh/ mh/iface/ mh/impl/ wi/
Author: degenaro
Date: Tue Dec 16 15:57:59 2014
New Revision: 1645988
URL: http://svn.apache.org/r1645988
Log:
UIMA-4069 DUCC Job Driver (JD) system classpath
Processes tab accounting (for JPs)
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/ProcessInfo.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/ProcessStatistics.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java?rev=1645988&r1=1645987&r2=1645988&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java Tue Dec 16 15:57:59 2014
@@ -27,15 +27,18 @@ import org.apache.uima.ducc.container.co
import org.apache.uima.ducc.container.common.logger.IComponent;
import org.apache.uima.ducc.container.common.logger.ILogger;
import org.apache.uima.ducc.container.common.logger.Logger;
+import org.apache.uima.ducc.container.jd.mh.RemoteWorkerProcess;
import org.apache.uima.ducc.container.jd.mh.iface.IProcessInfo;
import org.apache.uima.ducc.container.jd.mh.iface.IWorkItemInfo;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemotePid;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerThread;
import org.apache.uima.ducc.container.jd.mh.impl.ProcessInfo;
import org.apache.uima.ducc.container.jd.mh.impl.WorkItemInfo;
import org.apache.uima.ducc.container.jd.wi.IProcessStatistics;
import org.apache.uima.ducc.container.jd.wi.IWorkItem;
import org.apache.uima.ducc.container.jd.wi.ProcessStatistics;
+import org.apache.uima.ducc.container.net.iface.IMetaCas;
public class JobDriverHelper {
@@ -80,7 +83,7 @@ public class JobDriverHelper {
}
public ArrayList<IProcessInfo> getProcessInfo() {
- String location = "getActiveProcessInfo";
+ String location = "getProcessInfo";
ArrayList<IProcessInfo> list = new ArrayList<IProcessInfo>();
try {
JobDriver jd = JobDriver.getInstance();
@@ -92,12 +95,16 @@ public class JobDriverHelper {
list.add(processInfo);
MessageBuffer mb = new MessageBuffer();
mb.append(Standardize.Label.node.get()+processInfo.getNodeName());
+ mb.append(Standardize.Label.ip.get()+processInfo.getNodeAddress());
mb.append(Standardize.Label.pid.get()+processInfo.getPid());
mb.append(Standardize.Label.dispatch.get()+processInfo.getDispatch());
mb.append(Standardize.Label.done.get()+processInfo.getDone());
mb.append(Standardize.Label.error.get()+processInfo.getError());
mb.append(Standardize.Label.preempt.get()+processInfo.getPreempt());
mb.append(Standardize.Label.retry.get()+processInfo.getRetry());
+ mb.append(Standardize.Label.avg.get()+processInfo.getAvg());
+ mb.append(Standardize.Label.max.get()+processInfo.getMax());
+ mb.append(Standardize.Label.min.get()+processInfo.getMin());
logger.debug(location, ILogger.null_id, mb);
}
}
@@ -117,4 +124,47 @@ public class JobDriverHelper {
}
return processStatistics;
}
+
+ public boolean isEqual(String a, String b) {
+ boolean retVal = false;
+ if(a != null) {
+ if(b != null) {
+ return a.equals(b);
+ }
+ }
+ return retVal;
+ }
+
+ public boolean isEqual(IMetaCas a, IMetaCas b) {
+ boolean retVal = false;
+ if(a != null) {
+ if(b != null) {
+ return isEqual(a.getSystemKey(), b.getSystemKey());
+ }
+ }
+ return retVal;
+ }
+
+ public boolean isEqual(IWorkItem a, IWorkItem b) {
+ boolean retVal = false;
+ if(a != null) {
+ if(b != null) {
+ return isEqual(a.getMetaCas(), b.getMetaCas());
+ }
+ }
+ return retVal;
+ }
+
+ public IRemoteWorkerProcess getRemoteWorkerProcess(IWorkItem wi) {
+ IRemoteWorkerProcess rwp = null;
+ JobDriver jd = JobDriver.getInstance();
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = jd.getRemoteThreadMap();
+ for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
+ if(isEqual(entry.getValue(), wi)) {
+ IRemoteWorkerThread rwt = entry.getKey();
+ rwp = new RemoteWorkerProcess(rwt.getNodeName(),rwt.getNodeAddress(),rwt.getPid());
+ }
+ }
+ return rwp;
+ }
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java?rev=1645988&r1=1645987&r2=1645988&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java Tue Dec 16 15:57:59 2014
@@ -41,7 +41,19 @@ public class CasManagerStats {
private ConcurrentHashMap<String,AtomicInteger> retryReasonsMap = new ConcurrentHashMap<String,AtomicInteger>();
public int getUnfinishedWorkCount() {
- return crTotal.get() - (endSuccess.get() + endFailure.get());
+ return crTotal.get() - getEnded();
+ }
+
+ public int getPendingRetry() {
+ return retryQueuePuts.get() - retryQueueGets.get();
+ }
+
+ public int getEnded() {
+ return endSuccess.get() + endFailure.get();
+ }
+
+ public int getDispatched() {
+ return (crGets.get() - getEnded()) - getPendingRetry();
}
public void setCrTotal(int value) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java?rev=1645988&r1=1645987&r2=1645988&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java Tue Dec 16 15:57:59 2014
@@ -98,6 +98,18 @@ public class ActionEnd implements IActio
checkEnded(cm);
}
+ private void updateStatistics(IWorkItem wi) {
+ String location = "updateStatistics";
+ IWorkItemStatistics wis = JobDriver.getInstance().getWorkItemStatistics();
+ wis.ended(wi);
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.seqNo.get()+wi.getMetaCas().getSystemKey());
+ mb.append(Standardize.Label.avg.get()+wis.getMillisAvg());
+ mb.append(Standardize.Label.max.get()+wis.getMillisMax());
+ mb.append(Standardize.Label.min.get()+wis.getMillisMin());
+ logger.debug(location, ILogger.null_id, mb.toString());
+ }
+
private void checkEnded(CasManager cm) {
String location = "checkEnded";
int remainder = cm.getCasManagerStats().getUnfinishedWorkCount();
@@ -163,6 +175,16 @@ public class ActionEnd implements IActio
}
}
+ private void displayProcessStatistics(IWorkItem wi, IProcessStatistics pStats) {
+ String location = "displayProcessStatistics";
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.seqNo.get()+wi.getMetaCas().getSystemKey());
+ mb.append(Standardize.Label.avg.get()+pStats.getMillisAvg());
+ mb.append(Standardize.Label.max.get()+pStats.getMillisMax());
+ mb.append(Standardize.Label.min.get()+pStats.getMillisMin());
+ logger.debug(location, ILogger.null_id, mb.toString());
+ }
+
@Override
public void engage(Object objectData) {
String location = "engage";
@@ -190,8 +212,9 @@ public class ActionEnd implements IActio
}
else {
wisk.ended(seqNo);
- pStats.done(wi);
successWorkItem(cm, wi, trans, metaCas, rwt);
+ pStats.done(wi);
+ displayProcessStatistics(wi, pStats);
}
wi.resetTods();
}
@@ -205,16 +228,5 @@ public class ActionEnd implements IActio
logger.error(location, ILogger.null_id, e);
}
}
-
- private void updateStatistics(IWorkItem wi) {
- String location = "updateStatistics";
- IWorkItemStatistics wis = JobDriver.getInstance().getWorkItemStatistics();
- wis.ended(wi);
- MessageBuffer mb = new MessageBuffer();
- mb.append(Standardize.Label.seqNo.get()+wi.getMetaCas().getSystemKey());
- mb.append(Standardize.Label.avg.get()+wis.getMillisAvg());
- mb.append(Standardize.Label.min.get()+wis.getMillisAvg());
- mb.append(Standardize.Label.max.get()+wis.getMillisAvg());
- logger.info(location, ILogger.null_id, mb.toString());
- }
+
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java?rev=1645988&r1=1645987&r2=1645988&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java Tue Dec 16 15:57:59 2014
@@ -18,13 +18,20 @@
*/
package org.apache.uima.ducc.container.jd.fsm.wi;
+import org.apache.uima.ducc.common.jd.files.workitem.IWorkItemStateKeeper;
+import org.apache.uima.ducc.container.common.MessageBuffer;
+import org.apache.uima.ducc.container.common.MetaCasHelper;
+import org.apache.uima.ducc.container.common.Standardize;
import org.apache.uima.ducc.container.common.fsm.iface.IAction;
import org.apache.uima.ducc.container.common.logger.IComponent;
import org.apache.uima.ducc.container.common.logger.ILogger;
import org.apache.uima.ducc.container.common.logger.Logger;
import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.JobDriverHelper;
import org.apache.uima.ducc.container.jd.cas.CasManager;
import org.apache.uima.ducc.container.jd.cas.CasManagerStats.RetryReason;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
+import org.apache.uima.ducc.container.jd.wi.IProcessStatistics;
import org.apache.uima.ducc.container.jd.wi.IWorkItem;
import org.apache.uima.ducc.container.net.iface.IMetaCas;
@@ -36,7 +43,17 @@ public class ActionPreempt implements IA
public String getName() {
return ActionPreempt.class.getName();
}
-
+
+ private void preemptWorkItem(CasManager cm, IWorkItem wi, IMetaCas metaCas, IRemoteWorkerProcess rwp) {
+ String location = "preemptWorkItem";
+ cm.putMetaCas(metaCas, RetryReason.ProcessPreempt);
+ cm.getCasManagerStats().incEndRetry();
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.seqNo.get()+metaCas.getSystemKey());
+ mb.append(Standardize.Label.remote.get()+rwp.toString());
+ logger.info(location, ILogger.null_id, mb.toString());
+ }
+
@Override
public void engage(Object objectData) {
String location = "engage";
@@ -45,9 +62,32 @@ public class ActionPreempt implements IA
try {
IWorkItem wi = actionData.getWorkItem();
IMetaCas metaCas = wi.getMetaCas();
- //
- CasManager cm = JobDriver.getInstance().getCasManager();
- cm.putMetaCas(metaCas, RetryReason.ProcessPreempt);
+ JobDriver jd = JobDriver.getInstance();
+ CasManager cm = jd.getCasManager();
+ JobDriverHelper jdh = JobDriverHelper.getInstance();
+ IRemoteWorkerProcess rwp = jdh.getRemoteWorkerProcess(wi);
+ if(rwp != null) {
+ if(metaCas != null) {
+ preemptWorkItem(cm, wi, metaCas, rwp);
+ IWorkItemStateKeeper wisk = jd.getWorkItemStateKeeper();
+ MetaCasHelper metaCasHelper = new MetaCasHelper(metaCas);
+ IProcessStatistics pStats = jdh.getProcessStatistics(rwp);
+ int seqNo = metaCasHelper.getSystemKey();
+ wisk.preempt(seqNo);
+ pStats.preempt(wi);
+ wi.resetTods();
+ }
+ else {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append("No CAS found for processing");
+ logger.info(location, ILogger.null_id, mb.toString());
+ }
+ }
+ else {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append("No remote worker process entry found for processing");
+ logger.info(location, ILogger.null_id, mb.toString());
+ }
}
catch(Exception e) {
logger.error(location, ILogger.null_id, e);
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java?rev=1645988&r1=1645987&r2=1645988&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java Tue Dec 16 15:57:59 2014
@@ -97,6 +97,7 @@ public class MessageHandler implements I
if(cms.isKillJob()) {
oi.setKillJob();
}
+ oi.setWorkItemDispatcheds(cms.getDispatched());
oi.setWorkItemPreemptions(cms.getNumberOfPreemptions());
oi.setWorkItemFinishedMillisMin(wis.getMillisMin());
oi.setWorkItemFinishedMillisMax(wis.getMillisMax());
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java?rev=1645988&r1=1645987&r2=1645988&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java Tue Dec 16 15:57:59 2014
@@ -51,6 +51,9 @@ public interface IOperatingInfo extends
public void setWorkItemEndRetrys(int value);
public int getWorkItemEndRetrys();
+
+ public void setWorkItemDispatcheds(int value);
+ public int getWorkItemDispatcheds();
public void setWorkItemPreemptions(int value);
public int getWorkItemPreemptions();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java?rev=1645988&r1=1645987&r2=1645988&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java Tue Dec 16 15:57:59 2014
@@ -39,6 +39,7 @@ public class OperatingInfo implements IO
private int jpEndSuccesses = 0;
private int jpEndFailures = 0;
private int jpEndRetrys = 0;
+ private int jpDispatcheds = 0;
private int jpPreemptions = 0;
private int jpUserProcessingTimeouts = 0;
private int jpUserProcessingErrorRetries = 0;
@@ -148,6 +149,16 @@ public class OperatingInfo implements IO
}
@Override
+ public void setWorkItemDispatcheds(int value) {
+ jpDispatcheds = value;
+ }
+
+ @Override
+ public int getWorkItemDispatcheds() {
+ return jpDispatcheds;
+ }
+
+ @Override
public void setWorkItemPreemptions(int value) {
jpPreemptions = value;
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/ProcessInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/ProcessInfo.java?rev=1645988&r1=1645987&r2=1645988&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/ProcessInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/ProcessInfo.java Tue Dec 16 15:57:59 2014
@@ -46,6 +46,7 @@ public class ProcessInfo implements IPro
public ProcessInfo(String nodeName, String nodeAddress, int pid, IProcessStatistics pStats) {
setNodeName(nodeName);
+ setNodeAddress(nodeAddress);
setPid(pid);
setDispatch(pStats.getCountDispatch());
setDone(pStats.getCountDone());
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/ProcessStatistics.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/ProcessStatistics.java?rev=1645988&r1=1645987&r2=1645988&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/ProcessStatistics.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/ProcessStatistics.java Tue Dec 16 15:57:59 2014
@@ -20,8 +20,16 @@ package org.apache.uima.ducc.container.j
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.uima.ducc.container.common.MessageBuffer;
+import org.apache.uima.ducc.container.common.Standardize;
+import org.apache.uima.ducc.container.common.logger.IComponent;
+import org.apache.uima.ducc.container.common.logger.ILogger;
+import org.apache.uima.ducc.container.common.logger.Logger;
+
public class ProcessStatistics implements IProcessStatistics {
+ private static Logger logger = Logger.getLogger(ProcessStatistics.class, IComponent.Id.JD.name());
+
private AtomicLong dispatch = new AtomicLong(0);
private AtomicLong done = new AtomicLong(0);
private AtomicLong error = new AtomicLong(0);
@@ -37,9 +45,16 @@ public class ProcessStatistics implement
@Override
public void done(IWorkItem wi) {
+ String location = "done";
dispatch.decrementAndGet();
done.incrementAndGet();
wis.ended(wi);
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.seqNo.get()+wi.getMetaCas().getSystemKey());
+ mb.append(Standardize.Label.avg.get()+getMillisAvg());
+ mb.append(Standardize.Label.max.get()+getMillisMax());
+ mb.append(Standardize.Label.min.get()+getMillisMin());
+ logger.trace(location, ILogger.null_id, mb.toString());
}
@Override