You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2015/04/15 14:22:47 UTC
svn commit: r1673740 [1/2] - in /uima/sandbox/uima-ducc/trunk:
uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/
uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/
uima-ducc-container/src/main/java/org/apache...
Author: degenaro
Date: Wed Apr 15 12:22:46 2015
New Revision: 1673740
URL: http://svn.apache.org/r1673740
Log:
UIMA-4347 DUCC Job Driver (JD) should tell JP when an Ack arrives late
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test0.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test1.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test2.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestBase.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/classloading/
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/classloading/TestClassLoading.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler01.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler02a.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler02b.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler03.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler04.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler05a.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler05b.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler06.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/wi/
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/wi/fsm/
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/wi/fsm/TestWiFsm.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/wi/statefile/
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/wi/statefile/TestWiStateFile.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/wi/tracker/
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/wi/tracker/TestWiTracker.java (with props)
Removed:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/ATest.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestClassLoading.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestMessageHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestWiFsm.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestWiStateFile.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestWiTracker.java
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriver.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionAckTimeout.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEndTimeout.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGetRedux.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/RunningWorkItemStatistics.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WiTracker.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestSuite.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/helper/ThreadInfoFactory.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriver.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriver.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriver.java Wed Apr 15 12:22:46 2015
@@ -51,29 +51,27 @@ public class JobDriver {
private static JobDriver instance = null;
- public static JobDriver getInstance() {
- return instance;
- }
-
- public static JobDriver createInstance() throws JobDriverException {
- synchronized(JobDriver.class) {
- if(instance == null) {
+ public synchronized static JobDriver getInstance() {
+ String location = "getInstance";
+ if(instance == null) {
+ try {
instance = new JobDriver();
+ } catch (JobDriverException e) {
+ logger.error(location, ILogger.null_id, e);
}
}
- return getInstance();
+ return instance;
}
- public static void destroyInstance() {
- synchronized(JobDriver.class) {
- instance = null;
- }
+ public synchronized static JobDriver getNewInstance() {
+ instance = null;
+ return getInstance();
}
private String jobId = null;
private String logDir = null;
private long workItemTimeoutMillis = 24*60*60*1000;
- private ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> remoteThreadMap = null;
+ private ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> remoteWorkerThreadMap = null;
private ConcurrentHashMap<IRemotePid, IProcessStatistics> remoteProcessMap = null;
private IWorkItemStatistics wis = null;
private CasManager cm = null;
@@ -102,7 +100,7 @@ public class JobDriver {
jobId = feh.getJobId();
logDir = feh.getLogDirectory();
setWorkItemTimeout();
- remoteThreadMap = new ConcurrentHashMap<IRemoteWorkerThread, IWorkItem>();
+ remoteWorkerThreadMap = new ConcurrentHashMap<IRemoteWorkerThread, IWorkItem>();
remoteProcessMap = new ConcurrentHashMap<IRemotePid, IProcessStatistics>();
wis = new WorkItemStatistics();
wisk = new WorkItemStateKeeper(IComponent.Id.JD.name(), logDir);
@@ -145,8 +143,8 @@ public class JobDriver {
return jobId;
}
- public ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> getRemoteThreadMap() {
- return remoteThreadMap;
+ public ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> getRemoteWorkerThreadMap() {
+ return remoteWorkerThreadMap;
}
public ConcurrentHashMap<IRemotePid, IProcessStatistics> getRemoteProcessMap() {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java Wed Apr 15 12:22:46 2015
@@ -28,18 +28,15 @@ import org.apache.uima.ducc.container.co
import org.apache.uima.ducc.container.common.logger.IComponent;
import org.apache.uima.ducc.container.common.logger.ILogger;
import org.apache.uima.ducc.container.common.logger.Logger;
-import org.apache.uima.ducc.container.jd.mh.RemoteWorkerProcess;
import org.apache.uima.ducc.container.jd.mh.iface.IProcessInfo;
import org.apache.uima.ducc.container.jd.mh.iface.IWorkItemInfo;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemotePid;
-import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerThread;
import org.apache.uima.ducc.container.jd.mh.impl.ProcessInfo;
import org.apache.uima.ducc.container.jd.mh.impl.WorkItemInfo;
import org.apache.uima.ducc.container.jd.wi.IProcessStatistics;
import org.apache.uima.ducc.container.jd.wi.IWorkItem;
import org.apache.uima.ducc.container.jd.wi.ProcessStatistics;
-import org.apache.uima.ducc.container.net.iface.IMetaCas;
public class JobDriverHelper {
@@ -56,7 +53,7 @@ public class JobDriverHelper {
ArrayList<IWorkItemInfo> list = new ArrayList<IWorkItemInfo>();
try {
JobDriver jd = JobDriver.getInstance();
- ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = jd.getRemoteThreadMap();
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = jd.getRemoteWorkerThreadMap();
for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
IRemoteWorkerThread rwt = entry.getKey();
IWorkItem wi = entry.getValue();
@@ -144,47 +141,4 @@ public class JobDriverHelper {
return processStatistics;
}
- public boolean isEqual(String a, String b) {
- boolean retVal = false;
- if(a != null) {
- if(b != null) {
- return a.equals(b);
- }
- }
- return retVal;
- }
-
- public boolean isEqual(IMetaCas a, IMetaCas b) {
- boolean retVal = false;
- if(a != null) {
- if(b != null) {
- return isEqual(a.getSystemKey(), b.getSystemKey());
- }
- }
- return retVal;
- }
-
- public boolean isEqual(IWorkItem a, IWorkItem b) {
- boolean retVal = false;
- if(a != null) {
- if(b != null) {
- return isEqual(a.getMetaCas(), b.getMetaCas());
- }
- }
- return retVal;
- }
-
- public IRemoteWorkerProcess getRemoteWorkerProcess(IWorkItem wi) {
- IRemoteWorkerProcess rwp = null;
- JobDriver jd = JobDriver.getInstance();
- ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = jd.getRemoteThreadMap();
- for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
- if(isEqual(entry.getValue(), wi)) {
- IRemoteWorkerThread rwt = entry.getKey();
- rwp = new RemoteWorkerProcess(rwt.getNodeName(),rwt.getNodeAddress(),rwt.getPidName(),rwt.getPid());
- }
- }
- return rwp;
- }
-
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionAckTimeout.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionAckTimeout.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionAckTimeout.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionAckTimeout.java Wed Apr 15 12:22:46 2015
@@ -67,8 +67,9 @@ public class ActionAckTimeout extends Ac
IMetaCas metaCas = wi.getMetaCas();
JobDriver jd = JobDriver.getInstance();
CasManager cm = jd.getCasManager();
+ WiTracker tracker = WiTracker.getInstance();
+ IRemoteWorkerProcess rwp = tracker.getRemoteWorkerProcess(wi);
JobDriverHelper jdh = JobDriverHelper.getInstance();
- IRemoteWorkerProcess rwp = jdh.getRemoteWorkerProcess(wi);
if(rwp != null) {
if(metaCas != null) {
recallWorkItem(actionData, cm, metaCas, wi);
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java Wed Apr 15 12:22:46 2015
@@ -214,8 +214,13 @@ public class ActionEnd extends Action im
IProcessStatistics pStats = jdh.getProcessStatistics(rwp);
//
int seqNo = metaCasHelper.getSystemKey();
- String delimiter = Standardize.Label.seqNo.get()+seqNo+" ***** EXCEPTION *****";
- toJdErrLog(delimiter);
+ try {
+ String delimiter = Standardize.Label.seqNo.get()+seqNo+" ***** EXCEPTION *****";
+ toJdErrLog(delimiter);
+ }
+ catch(Exception e) {
+ logger.error(location, ILogger.null_id, e);
+ }
Object userException = metaCas.getUserSpaceException();
try {
String printableException = proxy.convert(userException);
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEndTimeout.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEndTimeout.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEndTimeout.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEndTimeout.java Wed Apr 15 12:22:46 2015
@@ -67,8 +67,9 @@ public class ActionEndTimeout extends Ac
IMetaCas metaCas = wi.getMetaCas();
JobDriver jd = JobDriver.getInstance();
CasManager cm = jd.getCasManager();
+ WiTracker tracker = WiTracker.getInstance();
+ IRemoteWorkerProcess rwp = tracker.getRemoteWorkerProcess(wi);
JobDriverHelper jdh = JobDriverHelper.getInstance();
- IRemoteWorkerProcess rwp = jdh.getRemoteWorkerProcess(wi);
if(rwp != null) {
if(metaCas != null) {
recallWorkItem(actionData, cm, metaCas, wi);
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java Wed Apr 15 12:22:46 2015
@@ -36,7 +36,6 @@ import org.apache.uima.ducc.container.jd
import org.apache.uima.ducc.container.jd.cas.CasManager;
import org.apache.uima.ducc.container.jd.log.LoggerHelper;
import org.apache.uima.ducc.container.jd.mh.RemoteWorkerProcess;
-import org.apache.uima.ducc.container.jd.mh.RemoteWorkerThread;
import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo.CompletionType;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerThread;
@@ -69,10 +68,11 @@ public class ActionGet implements IActio
IActionData actionData = (IActionData) objectData;
try {
if(actionData != null) {
- IWorkItem wi = actionData.getWorkItem();
+ IRemoteWorkerThread rwt = actionData.getRemoteWorkerThread();
+ WiTracker tracker = WiTracker.getInstance();
+ IWorkItem wi = tracker.find(rwt);
IFsm fsm = wi.getFsm();
IMetaCasTransaction trans = actionData.getMetaCasTransaction();
- IRemoteWorkerThread rwt = new RemoteWorkerThread(trans);
IRemoteWorkerProcess rwp = new RemoteWorkerProcess(trans);
//
JobDriver jd = JobDriver.getInstance();
@@ -115,7 +115,6 @@ public class ActionGet implements IActio
IEvent event = null;
//
if(metaCas != null) {
- WiTracker.getInstance().assign(wi, rwt);
int seqNo = metaCasHelper.getSystemKey();
String wiId = metaCas.getUserKey();
String node = rwt.getNodeAddress();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGetRedux.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGetRedux.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGetRedux.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGetRedux.java Wed Apr 15 12:22:46 2015
@@ -18,8 +18,6 @@
*/
package org.apache.uima.ducc.container.jd.fsm.wi;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.uima.ducc.container.common.MessageBuffer;
import org.apache.uima.ducc.container.common.fsm.iface.IAction;
import org.apache.uima.ducc.container.common.fsm.iface.IEvent;
@@ -27,10 +25,10 @@ import org.apache.uima.ducc.container.co
import org.apache.uima.ducc.container.common.logger.IComponent;
import org.apache.uima.ducc.container.common.logger.ILogger;
import org.apache.uima.ducc.container.common.logger.Logger;
-import org.apache.uima.ducc.container.jd.JobDriver;
import org.apache.uima.ducc.container.jd.log.LoggerHelper;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerThread;
import org.apache.uima.ducc.container.jd.wi.IWorkItem;
+import org.apache.uima.ducc.container.jd.wi.WiTracker;
import org.apache.uima.ducc.container.net.iface.IMetaCas;
public class ActionGetRedux implements IAction {
@@ -50,8 +48,8 @@ public class ActionGetRedux implements I
try {
if(actionData != null) {
IRemoteWorkerThread rwt = actionData.getRemoteWorkerThread();
- ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
- IWorkItem wi = map.get(rwt);
+ WiTracker tracker = WiTracker.getInstance();
+ IWorkItem wi = tracker.find(rwt);
IFsm fsm = wi.getFsm();
IEvent event = WiFsm.CAS_Unavailable;
if(wi != null) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java Wed Apr 15 12:22:46 2015
@@ -91,8 +91,9 @@ public class ActionProcessFailure extend
IMetaCas metaCas = wi.getMetaCas();
JobDriver jd = JobDriver.getInstance();
CasManager cm = jd.getCasManager();
+ WiTracker tracker = WiTracker.getInstance();
+ IRemoteWorkerProcess rwp = tracker.getRemoteWorkerProcess(wi);
JobDriverHelper jdh = JobDriverHelper.getInstance();
- IRemoteWorkerProcess rwp = jdh.getRemoteWorkerProcess(wi);
if(rwp != null) {
IProcessStatistics pStats = jdh.getProcessStatistics(rwp);
if(metaCas != null) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java Wed Apr 15 12:22:46 2015
@@ -67,8 +67,9 @@ public class ActionProcessPreempt extend
IMetaCas metaCas = wi.getMetaCas();
JobDriver jd = JobDriver.getInstance();
CasManager cm = jd.getCasManager();
+ WiTracker tracker = WiTracker.getInstance();
+ IRemoteWorkerProcess rwp = tracker.getRemoteWorkerProcess(wi);
JobDriverHelper jdh = JobDriverHelper.getInstance();
- IRemoteWorkerProcess rwp = jdh.getRemoteWorkerProcess(wi);
if(rwp != null) {
if(metaCas != null) {
recallWorkItem(actionData, cm, metaCas, wi);
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java Wed Apr 15 12:22:46 2015
@@ -67,8 +67,9 @@ public class ActionProcessVolunteered ex
IMetaCas metaCas = wi.getMetaCas();
JobDriver jd = JobDriver.getInstance();
CasManager cm = jd.getCasManager();
+ WiTracker tracker = WiTracker.getInstance();
+ IRemoteWorkerProcess rwp = tracker.getRemoteWorkerProcess(wi);
JobDriverHelper jdh = JobDriverHelper.getInstance();
- IRemoteWorkerProcess rwp = jdh.getRemoteWorkerProcess(wi);
if(rwp != null) {
if(metaCas != null) {
recallWorkItem(actionData, cm, metaCas, wi);
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java Wed Apr 15 12:22:46 2015
@@ -53,13 +53,15 @@ import org.apache.uima.ducc.container.jd
import org.apache.uima.ducc.container.jd.wi.IWorkItem;
import org.apache.uima.ducc.container.jd.wi.IWorkItemStatistics;
import org.apache.uima.ducc.container.jd.wi.RunningWorkItemStatistics;
-import org.apache.uima.ducc.container.jd.wi.WorkItem;
+import org.apache.uima.ducc.container.jd.wi.WiTracker;
import org.apache.uima.ducc.container.jd.wi.perf.IWorkItemPerformanceKeeper;
import org.apache.uima.ducc.container.net.iface.IMetaCas;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Hint;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.JdState;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type;
+import org.apache.uima.ducc.container.net.impl.MetaCas;
+import org.apache.uima.ducc.container.net.impl.TransactionHelper;
public class MessageHandler implements IMessageHandler {
@@ -235,8 +237,7 @@ public class MessageHandler implements I
mb.append(Standardize.Label.ip.get()+processInfo.getNodeAddress());
mb.append(Standardize.Label.pid.get()+processInfo.getPid());
logger.trace(location, ILogger.null_id, mb.toString());
- ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
-
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = WiTracker.getInstance().find(processInfo);
for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
IRemoteWorkerThread rwt = entry.getKey();
if(rwt.comprises(processInfo)) {
@@ -270,7 +271,7 @@ public class MessageHandler implements I
mb.append(Standardize.Label.ip.get()+processInfo.getNodeAddress());
mb.append(Standardize.Label.pid.get()+processInfo.getPid());
logger.trace(location, ILogger.null_id, mb.toString());
- ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = WiTracker.getInstance().find(processInfo);
for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
IRemoteWorkerThread rwt = entry.getKey();
if(rwt.comprises(processInfo)) {
@@ -304,7 +305,7 @@ public class MessageHandler implements I
mb.append(Standardize.Label.ip.get()+processInfo.getNodeAddress());
mb.append(Standardize.Label.pid.get()+processInfo.getPid());
logger.trace(location, ILogger.null_id, mb.toString());
- ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = WiTracker.getInstance().find(processInfo);
for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
IRemoteWorkerThread rwt = entry.getKey();
if(rwt.comprises(processInfo)) {
@@ -480,50 +481,6 @@ public class MessageHandler implements I
}
}
- private IWorkItem register(IRemoteWorkerThread rwi) {
- String location = "register";
- ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
- IWorkItem wi = map.get(rwi);
- while(wi == null) {
- IMetaCas metaCas = null;
- IFsm fsm = new WiFsm();
- map.putIfAbsent(rwi, new WorkItem(metaCas, fsm));
- wi = map.get(rwi);
- MessageBuffer mb = new MessageBuffer();
- mb.append(Standardize.Label.remote.get()+rwi.toString());
- logger.debug(location, ILogger.null_id, mb.toString());
- }
- return wi;
- }
-
- private IWorkItem find(IRemoteWorkerThread rwt) {
- String location = "find";
- ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
- IWorkItem wi = map.get(rwt);
- if(wi != null) {
- IMetaCas metaCas = wi.getMetaCas();
- if(metaCas != null) {
- MessageBuffer mb = new MessageBuffer();
- mb.append(Standardize.Label.remote.get()+rwt.toString());
- mb.append(Standardize.Label.seqNo.get()+metaCas.getSystemKey());
- logger.debug(location, ILogger.null_id, mb.toString());
- }
- else {
- MessageBuffer mb = new MessageBuffer();
- mb.append(Standardize.Label.remote.get()+rwt.toString());
- mb.append("has no work assigned presently");
- logger.debug(location, ILogger.null_id, mb.toString());
- }
- }
- else {
- MessageBuffer mb = new MessageBuffer();
- mb.append(Standardize.Label.remote.get()+rwt.toString());
- mb.append("has no work assigned presently");
- logger.debug(location, ILogger.null_id, mb.toString());
- }
- return wi;
- }
-
private void update(IWorkItem wi, IMetaCas metaCas) {
IMetaCas local = wi.getMetaCas();
IMetaCas remote = metaCas;
@@ -536,64 +493,63 @@ public class MessageHandler implements I
}
private void handleMetaCasTransationGet(IMetaCasTransaction trans, IRemoteWorkerThread rwt) {
- IWorkItem wi = register(rwt);
+ WiTracker tracker = WiTracker.getInstance();
+ IWorkItem wi = tracker.assign(rwt);
IFsm fsm = wi.getFsm();
IEvent event = WiFsm.Get_Request;
Object actionData = new ActionData(wi, rwt, trans);
fsm.transition(event, actionData);
}
-
+
private void handleMetaCasTransationAck(IMetaCasTransaction trans, IRemoteWorkerThread rwt) {
String location = "handleMetaCasTransationAck";
- IWorkItem wi = find(rwt);
- if(wi == null) {
- MessageBuffer mb = new MessageBuffer();
- mb.append(Standardize.Label.remote.get()+rwt.toString());
- mb.append("has no work assigned presently");
- logger.debug(location, ILogger.null_id, mb.toString());
- }
- else {
- update(wi, trans.getMetaCas());
+ WiTracker tracker = WiTracker.getInstance();
+ MetaCas metaCas = (MetaCas) trans.getMetaCas();
+ if(tracker.isRecognized(rwt, metaCas)) {
+ IWorkItem wi = tracker.find(rwt);
+ update(wi, metaCas);
IFsm fsm = wi.getFsm();
IEvent event = WiFsm.Ack_Request;
Object actionData = new ActionData(wi, rwt, trans);
fsm.transition(event, actionData);
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.AckMsecs.get()+(wi.getTodAck()-wi.getTodGet()));
+ logger.debug(location, ILogger.null_id, mb.toString());
+ }
+ else {
+ trans.setMetaCas(null);
+ TransactionHelper.addResponseHint(trans, Hint.Rejected);
}
}
private void handleMetaCasTransationEnd(IMetaCasTransaction trans, IRemoteWorkerThread rwt) {
String location = "handleMetaCasTransationEnd";
- IWorkItem wi = find(rwt);
- if(wi == null) {
- MessageBuffer mb = new MessageBuffer();
- mb.append(Standardize.Label.remote.get()+rwt.toString());
- mb.append("has no work assigned presently");
- logger.debug(location, ILogger.null_id, mb.toString());
- }
- else {
- update(wi, trans.getMetaCas());
+ WiTracker tracker = WiTracker.getInstance();
+ MetaCas metaCas = (MetaCas) trans.getMetaCas();
+ if(tracker.isRecognized(rwt, metaCas)) {
+ IWorkItem wi = tracker.find(rwt);
+ update(wi, metaCas);
IFsm fsm = wi.getFsm();
IEvent event = WiFsm.End_Request;
Object actionData = new ActionData(wi, rwt, trans);
fsm.transition(event, actionData);
MessageBuffer mb = new MessageBuffer();
- mb.append(Standardize.Label.AckMsecs.get()+(wi.getTodAck()-wi.getTodGet()));
mb.append(Standardize.Label.EndMsecs.get()+(wi.getTodEnd()-wi.getTodAck()));
logger.debug(location, ILogger.null_id, mb.toString());
}
+ else {
+ trans.setMetaCas(null);
+ TransactionHelper.addResponseHint(trans, Hint.Rejected);
+ }
}
private void handleMetaCasTransationInvestmentReset(IMetaCasTransaction trans, IRemoteWorkerThread rwt) {
String location = "handleMetaCasTransationInvestmentReset";
- IWorkItem wi = find(rwt);
- if(wi == null) {
- MessageBuffer mb = new MessageBuffer();
- mb.append(Standardize.Label.remote.get()+rwt.toString());
- mb.append("has no work assigned presently");
- logger.debug(location, ILogger.null_id, mb.toString());
- }
- else {
- update(wi, trans.getMetaCas());
+ WiTracker tracker = WiTracker.getInstance();
+ MetaCas metaCas = (MetaCas) trans.getMetaCas();
+ if(tracker.isRecognized(rwt, metaCas)) {
+ IWorkItem wi = tracker.find(rwt);
+ update(wi, metaCas);
IFsm fsm = wi.getFsm();
IEvent event = WiFsm.Investment_Reset;
Object actionData = new ActionData(wi, rwt, trans);
@@ -603,6 +559,10 @@ public class MessageHandler implements I
mb.append("investment reset");
logger.debug(location, ILogger.null_id, mb.toString());
}
+ else {
+ trans.setMetaCas(null);
+ TransactionHelper.addResponseHint(trans, Hint.Rejected);
+ }
}
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/RunningWorkItemStatistics.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/RunningWorkItemStatistics.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/RunningWorkItemStatistics.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/RunningWorkItemStatistics.java Wed Apr 15 12:22:46 2015
@@ -35,7 +35,7 @@ public class RunningWorkItemStatistics i
long min = Long.MAX_VALUE;
long max = 0;
long todMrs = 0;
- ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteWorkerThreadMap();
for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
IWorkItem wi = entry.getValue();
long time = wi.getMillisOperating();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WiTracker.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WiTracker.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WiTracker.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WiTracker.java Wed Apr 15 12:22:46 2015
@@ -23,54 +23,174 @@ import java.util.concurrent.ConcurrentHa
import org.apache.uima.ducc.container.common.MessageBuffer;
import org.apache.uima.ducc.container.common.Standardize;
+import org.apache.uima.ducc.container.common.fsm.iface.IFsm;
import org.apache.uima.ducc.container.common.logger.IComponent;
import org.apache.uima.ducc.container.common.logger.ILogger;
import org.apache.uima.ducc.container.common.logger.Logger;
+import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.fsm.wi.WiFsm;
import org.apache.uima.ducc.container.jd.log.LoggerHelper;
+import org.apache.uima.ducc.container.jd.mh.RemoteWorkerProcess;
+import org.apache.uima.ducc.container.jd.mh.iface.IProcessInfo;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerThread;
+import org.apache.uima.ducc.container.net.iface.IMetaCas;
+import org.apache.uima.ducc.container.net.impl.MetaCas;
public class WiTracker {
-
- private ConcurrentHashMap<IWorkItem, IRemoteWorkerThread> map = new ConcurrentHashMap<IWorkItem, IRemoteWorkerThread>();
private static Logger logger = Logger.getLogger(WiTracker.class, IComponent.Id.JD.name());
private static WiTracker instance = new WiTracker();
+ private WiTracker() {
+ }
+
public static WiTracker getInstance() {
return instance;
}
- public void assign(IWorkItem wi, IRemoteWorkerThread rwt) {
+ private ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> getMap() {
+ return JobDriver.getInstance().getRemoteWorkerThreadMap();
+ }
+
+ public IWorkItem assign(IRemoteWorkerThread rwt) {
String location = "assign";
- try {
- map.put(wi, rwt);
- report();
- }
- catch(Exception e) {
- logger.error(location, ILogger.null_id, e);
- }
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = getMap();
+ IWorkItem wi = null;
+ if(rwt != null) {
+ wi = find(rwt);
+ if(wi == null) {
+ IMetaCas metaCas = null;
+ IFsm fsm = new WiFsm();
+ wi = new WorkItem(metaCas, fsm);
+ map.put(rwt, wi);
+ }
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.remote.get()+rwt.toString());
+ logger.debug(location, ILogger.null_id, mb.toString());
+ }
+ report();
+ return wi;
}
public void unassign(IWorkItem wi) {
+ String location = "unassign";
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = getMap();
+ IRemoteWorkerThread rwt = find(wi);
+ if(rwt != null) {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.remote.get()+rwt.toString());
+ logger.debug(location, ILogger.null_id, mb.toString());
+ map.remove(rwt);
+ }
+ report();
+ }
+
+ public IWorkItem find(IRemoteWorkerThread rwt) {
+ String location = "find";
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = getMap();
+ IWorkItem wi = null;
+ if(rwt != null) {
+ wi = map.get(rwt);
+ }
+ if(wi != null) {
+ IMetaCas metaCas = wi.getMetaCas();
+ if(metaCas != null) {
+
+ }
+ else {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.remote.get()+rwt.toString());
+ mb.append("has no work assigned presently");
+ logger.debug(location, ILogger.null_id, mb.toString());
+ }
+ }
+ return wi;
+ }
+
+ public IRemoteWorkerThread find(IWorkItem wi) {
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = getMap();
+ IRemoteWorkerThread rwt = null;
if(wi != null) {
- map.remove(wi);
- report();
+ for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
+ if(wi.getSeqNo() == entry.getValue().getSeqNo()) {
+ rwt = entry.getKey();
+ break;
+ }
+ }
}
+ return rwt;
}
+ public IRemoteWorkerProcess getRemoteWorkerProcess(IWorkItem wi) {
+ IRemoteWorkerProcess rwp = null;
+ IRemoteWorkerThread rwt = find(wi);
+ if(wi != null) {
+ rwp = new RemoteWorkerProcess(rwt.getNodeName(),rwt.getNodeAddress(),rwt.getPidName(),rwt.getPid());
+ }
+ return rwp;
+ }
+
+ public ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> find(IProcessInfo processInfo) {
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = getMap();
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> submap = new ConcurrentHashMap<IRemoteWorkerThread, IWorkItem>();
+ if(processInfo != null) {
+ for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
+ IRemoteWorkerThread rwt = entry.getKey();
+ if(rwt.comprises(processInfo)) {
+ IWorkItem wi = entry.getValue();
+ submap.put(rwt, wi);
+ }
+ }
+ }
+ return submap;
+ }
+
+ public boolean isRecognized(IRemoteWorkerThread rwt, MetaCas metaCas) {
+ String location = "isRecognized";
+ boolean retVal = true;
+ IWorkItem wi = find(rwt);
+ if(wi == null) {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.remote.get()+rwt.toString());
+ mb.append("has no work assigned presently");
+ logger.debug(location, ILogger.null_id, mb.toString());
+ retVal = false;
+ }
+ else if(metaCas == null) {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.remote.get()+rwt.toString());
+ mb.append("meta-cas not present");
+ logger.debug(location, ILogger.null_id, mb.toString());
+ retVal = false;
+ }
+ else if(wi.getSeqNo() != metaCas.getSeqNo()) {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.remote.get()+rwt.toString());
+ mb.append(Standardize.Label.seqNo.get()+metaCas.getSeqNo());
+ mb.append(Standardize.Label.seqNo.get()+wi.getSeqNo());
+ mb.append("remote/local sequence number mis-match");
+ logger.debug(location, ILogger.null_id, mb.toString());
+ retVal = false;
+ }
+ return retVal;
+ }
+
public int getSize() {
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = getMap();
return map.size();
}
private void report() {
String location = "report";
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = getMap();
MessageBuffer mb = new MessageBuffer();
mb.append(Standardize.Label.size.get()+map.size());
logger.trace(location, ILogger.null_id, mb.toString());
- for(Entry<IWorkItem, IRemoteWorkerThread> entry : map.entrySet()) {
- IWorkItem wi = entry.getKey();
- IRemoteWorkerThread rwt = entry.getValue();
+ for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
+ IWorkItem wi = entry.getValue();
+ IRemoteWorkerThread rwt = entry.getKey();
MessageBuffer mb1 = LoggerHelper.getMessageBuffer(wi, rwt);
logger.trace(location, ILogger.null_id, mb1.toString());
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java Wed Apr 15 12:22:46 2015
@@ -34,6 +34,7 @@ public interface IMetaCasTransaction ext
Blacklisted, // the requesting JP has been blacklisted (no workitems will ever be assigned)
Killed, // the JD has been killed
Exhausted, // the workitems have all been processed (successfully or otherwise)
+ Rejected, // the request has been rejected
};
public List<Hint> getResponseHints();
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test0.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test0.java?rev=1673740&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test0.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test0.java Wed Apr 15 12:22:46 2015
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.test;
+
+import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.JobDriverException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class Test0 extends TestBase {
+
+ protected JobDriver jd;
+
+ @Before
+ public void setUp() throws JobDriverException {
+ initialize();
+ jd = JobDriver.getInstance();
+ }
+
+ @Test
+ public void test_00() throws JobDriverException {
+ jd.getJdState();
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test0.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test0.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test1.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test1.java?rev=1673740&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test1.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test1.java Wed Apr 15 12:22:46 2015
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.test;
+
+import java.io.File;
+
+import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.JobDriverException;
+import org.apache.uima.ducc.container.jd.test.helper.Utilities;
+import org.junit.Before;
+import org.junit.Test;
+
+public class Test1 extends TestSuite {
+
+ protected JobDriver jd;
+
+ @Before
+ public void setUp() throws JobDriverException {
+ initialize();
+ jd = JobDriver.getInstance();
+ }
+
+ @Test
+ public void test_11() {
+ testIncludeAll();
+ }
+
+ @Test
+ public void test_12() {
+ testExcludeOne(2);
+ String userClasspath = Utilities.getInstance().getUserCP();
+ String[] cpParts = userClasspath.split(File.pathSeparator);
+ for(int i=0; i<cpParts.length; i++) {
+ testExcludeOne(i);
+ }
+ }
+
+ //@Test
+ public void test_13() {
+ testNoXml();
+ }
+
+ @Test
+ public void test_14() {
+ getTotal();
+ }
+
+ @Test
+ public void test_15() {
+ getMetaCas();
+ }
+
+ @Test
+ public void test_16() {
+ getMetaCases(0);
+ }
+
+ @Test
+ public void test_17() {
+ getMetaCases(10);
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test1.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test1.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test2.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test2.java?rev=1673740&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test2.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test2.java Wed Apr 15 12:22:46 2015
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+
+import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.JobDriverException;
+import org.apache.uima.ducc.container.jd.cas.CasManager;
+import org.apache.uima.ducc.container.jd.cas.CasManagerStats.RetryReason;
+import org.apache.uima.ducc.container.jd.mh.RemoteWorkerThread;
+import org.apache.uima.ducc.container.jd.mh.iface.IWorkItemInfo;
+import org.apache.uima.ducc.container.jd.mh.impl.OperatingInfo;
+import org.apache.uima.ducc.container.jd.mh.impl.WorkItemInfo;
+import org.apache.uima.ducc.container.net.iface.IMetaCas;
+import org.junit.Before;
+import org.junit.Test;
+
+public class Test2 extends TestSuite {
+
+ protected JobDriver jd;
+
+ @Before
+ public void setUp() throws JobDriverException {
+ initialize();
+ jd = JobDriver.getInstance();
+ }
+
+ @Test
+ public void test_20() {
+ try {
+ config();
+ CasManager cm = new CasManager();
+ int total = cm.getCasManagerStats().getCrTotal();
+ assertTrue(total == 100);
+ IMetaCas metaCas = cm.getMetaCas();
+ int retrys = 3;
+ while(metaCas != null) {
+ if(cm.getCasManagerStats().getRetryQueuePuts() < retrys) {
+ cm.putMetaCas(metaCas, RetryReason.ProcessPreempt);
+ }
+ metaCas = cm.getMetaCas();
+ }
+ int crGets = cm.getCasManagerStats().getCrGets();
+ debug("crGets:"+crGets);
+ assertTrue(crGets == total);
+ int rqPuts = cm.getCasManagerStats().getRetryQueuePuts();
+ debug("rqPuts:"+rqPuts);
+ int rqGets = cm.getCasManagerStats().getRetryQueueGets();
+ debug("rqGets:"+rqGets);
+ assertTrue(rqPuts == retrys);
+ assertTrue(rqGets == rqPuts);
+ asExpected("puts == "+rqPuts);
+ asExpected("gets == "+rqGets);
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail("Exception");
+ }
+ }
+ @Test
+ public void test_21() {
+ String n01 = "node01";
+ int p10 = 10;
+ int t20 = 20;
+ RemoteWorkerThread rwt01A = new RemoteWorkerThread(n01,null,p10+"",p10,t20);
+ RemoteWorkerThread rwt01B = new RemoteWorkerThread(n01,null,p10+"",p10,t20);
+ assertTrue(rwt01A.equals(rwt01A));
+ assertTrue(rwt01B.equals(rwt01B));
+ assertTrue(rwt01A.equals(rwt01B));
+ assertTrue(rwt01A.compareTo(rwt01A) == 0);
+ assertTrue(rwt01B.compareTo(rwt01B) == 0);
+ assertTrue(rwt01A.compareTo(rwt01B) == 0);
+ String n02 = "node02";
+ int p30 = 30;
+ int t40 = 40;
+ RemoteWorkerThread rwt02A = new RemoteWorkerThread(n02,null,p10+"",p10,t20);
+ RemoteWorkerThread rwt01C = new RemoteWorkerThread(n01,null,p30+"",p30,t20);
+ RemoteWorkerThread rwt01D = new RemoteWorkerThread(n01,null,p10+"",p10,t40);
+ assertTrue(!rwt01A.equals(rwt02A));
+ assertTrue(!rwt01A.equals(rwt01C));
+ assertTrue(!rwt01A.equals(rwt01D));
+ assertTrue(rwt01A.compareTo(rwt02A) != 0);
+ assertTrue(rwt01A.compareTo(rwt01C) != 0);
+ assertTrue(rwt01A.compareTo(rwt01D) != 0);
+ assertTrue(rwt01A.getNodeName().equals(n01));
+ assertTrue(rwt01A.getPid() == p10);
+ assertTrue(rwt01A.getTid() == t20);
+ }
+
+ @Test
+ public void test_22() {
+ OperatingInfo oi = new OperatingInfo();
+ oi.setWorkItemCrTotal(100);
+ assertTrue(oi.getWorkItemCrTotal() == 100);
+ oi.setWorkItemCrFetches(50);
+ assertTrue(oi.getWorkItemCrFetches() == 50);
+ oi.setWorkItemEndFailures(55);
+ assertTrue(oi.getWorkItemEndFailures() == 55);
+ oi.setWorkItemEndSuccesses(60);
+ assertTrue(oi.getWorkItemEndSuccesses() == 60);
+ oi.setWorkItemJpAcks(65);
+ assertTrue(oi.getWorkItemJpAcks() == 65);
+ oi.setWorkItemJpGets(70);
+ assertTrue(oi.getWorkItemJpGets() == 70);
+ oi.setWorkItemUserProcessingErrorRetries(75);
+ assertTrue(oi.getWorkItemUserProcessingErrorRetries() == 75);
+ oi.setWorkItemUserProcessingTimeouts(80);
+ assertTrue(oi.getWorkItemUserProcessingTimeouts() == 80);
+ oi.setWorkItemFinishedMillisMin(1000);
+ assertTrue(oi.getWorkItemFinishedMillisMin() == 1000);
+ oi.setWorkItemFinishedMillisMax(2000);
+ assertTrue(oi.getWorkItemFinishedMillisMax() == 2000);
+ oi.setWorkItemFinishedMillisAvg(1500);
+ assertTrue(oi.getWorkItemFinishedMillisAvg() == 1500);
+ oi.setWorkItemRunningMillisMin(1001);
+ assertTrue(oi.getWorkItemRunningMillisMin() == 1001);
+ oi.setWorkItemRunningMillisMax(2001);
+ assertTrue(oi.getWorkItemRunningMillisMax() == 2001);
+ ArrayList<String> pids01 = new ArrayList<String>();
+ pids01.add("011");
+ pids01.add("012");
+ ArrayList<IWorkItemInfo> list = new ArrayList<IWorkItemInfo>();
+ IWorkItemInfo wii = new WorkItemInfo();
+ wii.setNodeName("node01");
+ wii.setPid(1);
+ wii.setTid(1);
+ wii.setOperatingMillis(9991);
+ list.add(wii);
+ wii = new WorkItemInfo();
+ wii.setNodeName("node02");
+ wii.setPid(2);
+ wii.setTid(2);
+ wii.setOperatingMillis(9992);
+ list.add(wii);
+ oi.setActiveWorkItemInfo(list);
+ list = oi.getActiveWorkItemInfo();
+ assertTrue(list.size() == 2);
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test2.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/Test2.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestBase.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestBase.java?rev=1673740&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestBase.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestBase.java Wed Apr 15 12:22:46 2015
@@ -0,0 +1,129 @@
+package org.apache.uima.ducc.container.jd.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.net.URL;
+
+import org.apache.uima.ducc.common.container.FlagsHelper;
+import org.apache.uima.ducc.container.jd.JobDriverException;
+import org.apache.uima.ducc.container.jd.test.helper.Utilities;
+
+public class TestBase {
+
+ private boolean debug = false;
+
+ private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+ private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
+
+ public void initialize() throws JobDriverException {
+ if(!debug) {
+ System.setOut(new PrintStream(outContent));
+ System.setErr(new PrintStream(errContent));
+ }
+ ducc_home();
+ jd_setup();
+ }
+
+ public String getResource(String name) {
+ String retVal = null;
+ URL urlXml = null;
+ File file = null;
+ String path = null;
+ //
+ urlXml = this.getClass().getResource(name);
+ file = new File(urlXml.getFile());
+ path = file.getAbsolutePath();
+ retVal = path;
+ return retVal;
+ }
+
+ public void ducc_home() {
+ String folder = "/ducc_runtime";
+ String file = "/resources/log4j.xml";
+ String path = getResource(folder+file);
+ String value = path.replace(file, "");
+ String key = "DUCC_HOME";
+ System.setProperty(key, value);
+ }
+
+ public void jd_setup() throws JobDriverException {
+ File working = mkWorkingDir();
+ String directory = working.getAbsolutePath();
+ System.setProperty(FlagsHelper.Name.JobDirectory.pname(), directory);
+ //
+ URL urlXml = null;
+ File file = null;
+ String path = null;
+ //
+ urlXml = this.getClass().getResource("/CR100.xml");
+ file = new File(urlXml.getFile());
+ path = file.getAbsolutePath();
+ System.setProperty(FlagsHelper.Name.CollectionReaderXml.pname(), path);
+ //
+ urlXml = this.getClass().getResource("/DDSleepDescriptor.xml");
+ file = new File(urlXml.getFile());
+ path = file.getAbsolutePath();
+ System.setProperty(FlagsHelper.Name.JpDd.pname(), path);
+ //
+ String userClasspath = Utilities.getInstance().getUserCP();
+ System.setProperty(FlagsHelper.Name.UserClasspath.pname(), userClasspath);
+ }
+
+ public boolean isDebug() {
+ return debug;
+ }
+
+ protected void out_println(String message) {
+ System.out.println(message);
+ }
+
+ protected void debug(String message) {
+ if(isDebug()) {
+ out_println(message);
+ }
+ }
+
+ protected void asExpected(String text) {
+ if(isDebug()) {
+ String message = "as expected: "+text;
+ out_println(message);
+ }
+ }
+
+ protected void asExpected(Exception e) {
+ if(isDebug()) {
+ String message = "as expected: "+e.getMessage();
+ out_println(message);
+ }
+ }
+
+ protected void delete(File directory) {
+ try {
+ for(File file : directory.listFiles()) {
+ debug("delete: "+file.getName());
+ file.delete();
+ }
+ debug("delete: "+directory.getName());
+ directory.delete();
+ }
+ catch(Exception e) {
+ //e.printStackTrace();
+ }
+ }
+
+ protected File mkWorkingDir() {
+ URL url = this.getClass().getResource("/");
+ File root = new File(url.getFile());
+ String name = root.getAbsolutePath();
+ debug(name);
+ assertTrue(root.isDirectory());
+ String nameWorking = name+File.separator+"working";
+ File working = new File(nameWorking);
+ delete(working);
+ working.mkdir();
+ return working;
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestBase.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestBase.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestSuite.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestSuite.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestSuite.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestSuite.java Wed Apr 15 12:22:46 2015
@@ -23,23 +23,14 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.net.URL;
-import java.util.ArrayList;
import org.apache.uima.ducc.common.container.FlagsHelper;
import org.apache.uima.ducc.container.common.classloader.ProxyException;
-import org.apache.uima.ducc.container.jd.cas.CasManager;
-import org.apache.uima.ducc.container.jd.cas.CasManagerStats.RetryReason;
import org.apache.uima.ducc.container.jd.classload.ProxyJobDriverCollectionReader;
-import org.apache.uima.ducc.container.jd.mh.RemoteWorkerThread;
-import org.apache.uima.ducc.container.jd.mh.iface.IWorkItemInfo;
-import org.apache.uima.ducc.container.jd.mh.impl.OperatingInfo;
-import org.apache.uima.ducc.container.jd.mh.impl.WorkItemInfo;
import org.apache.uima.ducc.container.jd.test.helper.Utilities;
-import org.apache.uima.ducc.container.net.iface.IMetaCas;
import org.apache.uima.ducc.container.net.impl.MetaCas;
-import org.junit.Test;
-public class TestSuite extends ATest {
+public class TestSuite extends TestBase {
String prefix1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><xmi:XMI xmlns:tcas=\"http:///uima/tcas.ecore\"";
String prefix0 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><xmi:XMI xmlns:cas=\"http:///uima/cas.ecore\"";
@@ -47,7 +38,8 @@ public class TestSuite extends ATest {
private void checkCas(String cas) {
assertTrue(cas.startsWith(prefix0) || cas.startsWith(prefix1));
}
- private void config() {
+
+ protected void config() {
URL urlXml = this.getClass().getResource("/CR100.xml");
File file = new File(urlXml.getFile());
String crXml = file.getAbsolutePath();
@@ -56,7 +48,7 @@ public class TestSuite extends ATest {
System.setProperty(FlagsHelper.Name.UserClasspath.pname(), userClasspath);
}
- private void testIncludeAll() {
+ protected void testIncludeAll() {
try {
config();
new ProxyJobDriverCollectionReader();
@@ -67,15 +59,7 @@ public class TestSuite extends ATest {
}
}
- @Test
- public void test_01() {
- if(isDisabled(this.getClass().getName())) {
- return;
- }
- testIncludeAll();
- }
-
- private void testExcludeOne(int skip) {
+ protected void testExcludeOne(int skip) {
try {
URL urlXml = this.getClass().getResource("/CR100.xml");
File file = new File(urlXml.getFile());
@@ -111,20 +95,7 @@ public class TestSuite extends ATest {
}
}
- @Test
- public void test_02() {
- if(isDisabled(this.getClass().getName())) {
- return;
- }
- testExcludeOne(2);
- String userClasspath = Utilities.getInstance().getUserCP();
- String[] cpParts = userClasspath.split(File.pathSeparator);
- for(int i=0; i<cpParts.length; i++) {
- testExcludeOne(i);
- }
- }
-
- private void testNoXml() {
+ protected void testNoXml() {
try {
String userClasspath = Utilities.getInstance().getUserCP();
System.setProperty(FlagsHelper.Name.UserClasspath.pname(), userClasspath);
@@ -140,15 +111,7 @@ public class TestSuite extends ATest {
}
}
- @Test
- public void test_03() {
- if(isDisabled(this.getClass().getName())) {
- return;
- }
- testNoXml();
- }
-
- private void getTotal() {
+ protected void getTotal() {
try {
config();
ProxyJobDriverCollectionReader pjdcr = new ProxyJobDriverCollectionReader();
@@ -161,17 +124,8 @@ public class TestSuite extends ATest {
fail("Exception");
}
}
-
- @Test
- public void test_04() {
- if(isDisabled(this.getClass().getName())) {
- return;
- }
- getTotal();
- }
-
- private void getMetaCas() {
+ protected void getMetaCas() {
try {
config();
ProxyJobDriverCollectionReader pjdcr = new ProxyJobDriverCollectionReader();
@@ -191,14 +145,6 @@ public class TestSuite extends ATest {
fail("Exception");
}
}
-
- @Test
- public void test_05() {
- if(isDisabled(this.getClass().getName())) {
- return;
- }
- getMetaCas();
- }
private void getMetaCases(ProxyJobDriverCollectionReader pjdcr, int total) throws ProxyException {
for(int c=1; c <= total; c++) {
@@ -215,7 +161,7 @@ public class TestSuite extends ATest {
}
}
- private void getMetaCases(int extra) {
+ protected void getMetaCases(int extra) {
try {
config();
ProxyJobDriverCollectionReader pjdcr = new ProxyJobDriverCollectionReader();
@@ -233,143 +179,5 @@ public class TestSuite extends ATest {
fail("Exception");
}
}
-
- @Test
- public void test_06() {
- if(isDisabled(this.getClass().getName())) {
- return;
- }
- getMetaCases(0);
- }
-
- @Test
- public void test_07() {
- if(isDisabled(this.getClass().getName())) {
- return;
- }
- getMetaCases(10);
- }
-
- @Test
- public void test_10() {
- if(isDisabled(this.getClass().getName())) {
- return;
- }
- try {
- config();
- CasManager cm = new CasManager();
- int total = cm.getCasManagerStats().getCrTotal();
- assertTrue(total == 100);
- IMetaCas metaCas = cm.getMetaCas();
- int retrys = 3;
- while(metaCas != null) {
- if(cm.getCasManagerStats().getRetryQueuePuts() < retrys) {
- cm.putMetaCas(metaCas, RetryReason.ProcessPreempt);
- }
- metaCas = cm.getMetaCas();
- }
- int crGets = cm.getCasManagerStats().getCrGets();
- debug("crGets:"+crGets);
- assertTrue(crGets == total);
- int rqPuts = cm.getCasManagerStats().getRetryQueuePuts();
- debug("rqPuts:"+rqPuts);
- int rqGets = cm.getCasManagerStats().getRetryQueueGets();
- debug("rqGets:"+rqGets);
- assertTrue(rqPuts == retrys);
- assertTrue(rqGets == rqPuts);
- asExpected("puts == "+rqPuts);
- asExpected("gets == "+rqGets);
- }
- catch(Exception e) {
- e.printStackTrace();
- fail("Exception");
- }
- }
-
- @Test
- public void test_20() {
- if(isDisabled(this.getClass().getName())) {
- return;
- }
- String n01 = "node01";
- int p10 = 10;
- int t20 = 20;
- RemoteWorkerThread rwt01A = new RemoteWorkerThread(n01,null,p10+"",p10,t20);
- RemoteWorkerThread rwt01B = new RemoteWorkerThread(n01,null,p10+"",p10,t20);
- assertTrue(rwt01A.equals(rwt01A));
- assertTrue(rwt01B.equals(rwt01B));
- assertTrue(rwt01A.equals(rwt01B));
- assertTrue(rwt01A.compareTo(rwt01A) == 0);
- assertTrue(rwt01B.compareTo(rwt01B) == 0);
- assertTrue(rwt01A.compareTo(rwt01B) == 0);
- String n02 = "node02";
- int p30 = 30;
- int t40 = 40;
- RemoteWorkerThread rwt02A = new RemoteWorkerThread(n02,null,p10+"",p10,t20);
- RemoteWorkerThread rwt01C = new RemoteWorkerThread(n01,null,p30+"",p30,t20);
- RemoteWorkerThread rwt01D = new RemoteWorkerThread(n01,null,p10+"",p10,t40);
- assertTrue(!rwt01A.equals(rwt02A));
- assertTrue(!rwt01A.equals(rwt01C));
- assertTrue(!rwt01A.equals(rwt01D));
- assertTrue(rwt01A.compareTo(rwt02A) != 0);
- assertTrue(rwt01A.compareTo(rwt01C) != 0);
- assertTrue(rwt01A.compareTo(rwt01D) != 0);
- assertTrue(rwt01A.getNodeName().equals(n01));
- assertTrue(rwt01A.getPid() == p10);
- assertTrue(rwt01A.getTid() == t20);
- }
-
- @Test
- public void test_30() {
- if(isDisabled(this.getClass().getName())) {
- return;
- }
- OperatingInfo oi = new OperatingInfo();
- oi.setWorkItemCrTotal(100);
- assertTrue(oi.getWorkItemCrTotal() == 100);
- oi.setWorkItemCrFetches(50);
- assertTrue(oi.getWorkItemCrFetches() == 50);
- oi.setWorkItemEndFailures(55);
- assertTrue(oi.getWorkItemEndFailures() == 55);
- oi.setWorkItemEndSuccesses(60);
- assertTrue(oi.getWorkItemEndSuccesses() == 60);
- oi.setWorkItemJpAcks(65);
- assertTrue(oi.getWorkItemJpAcks() == 65);
- oi.setWorkItemJpGets(70);
- assertTrue(oi.getWorkItemJpGets() == 70);
- oi.setWorkItemUserProcessingErrorRetries(75);
- assertTrue(oi.getWorkItemUserProcessingErrorRetries() == 75);
- oi.setWorkItemUserProcessingTimeouts(80);
- assertTrue(oi.getWorkItemUserProcessingTimeouts() == 80);
- oi.setWorkItemFinishedMillisMin(1000);
- assertTrue(oi.getWorkItemFinishedMillisMin() == 1000);
- oi.setWorkItemFinishedMillisMax(2000);
- assertTrue(oi.getWorkItemFinishedMillisMax() == 2000);
- oi.setWorkItemFinishedMillisAvg(1500);
- assertTrue(oi.getWorkItemFinishedMillisAvg() == 1500);
- oi.setWorkItemRunningMillisMin(1001);
- assertTrue(oi.getWorkItemRunningMillisMin() == 1001);
- oi.setWorkItemRunningMillisMax(2001);
- assertTrue(oi.getWorkItemRunningMillisMax() == 2001);
- ArrayList<String> pids01 = new ArrayList<String>();
- pids01.add("011");
- pids01.add("012");
- ArrayList<IWorkItemInfo> list = new ArrayList<IWorkItemInfo>();
- IWorkItemInfo wii = new WorkItemInfo();
- wii.setNodeName("node01");
- wii.setPid(1);
- wii.setTid(1);
- wii.setOperatingMillis(9991);
- list.add(wii);
- wii = new WorkItemInfo();
- wii.setNodeName("node02");
- wii.setPid(2);
- wii.setTid(2);
- wii.setOperatingMillis(9992);
- list.add(wii);
- oi.setActiveWorkItemInfo(list);
- list = oi.getActiveWorkItemInfo();
- assertTrue(list.size() == 2);
- }
}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/classloading/TestClassLoading.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/classloading/TestClassLoading.java?rev=1673740&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/classloading/TestClassLoading.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/classloading/TestClassLoading.java Wed Apr 15 12:22:46 2015
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.test.classloading;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+
+import org.apache.uima.ducc.common.container.FlagsHelper;
+import org.apache.uima.ducc.container.common.classloader.PrivateClassLoader;
+import org.apache.uima.ducc.container.common.classloader.ProxyException;
+import org.apache.uima.ducc.container.dgen.classload.ProxyDeployableGeneration;
+import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.JobDriverException;
+import org.apache.uima.ducc.container.jd.classload.ProxyJobDriverCollectionReader;
+import org.apache.uima.ducc.container.jd.classload.ProxyJobDriverDirective;
+import org.apache.uima.ducc.container.jd.classload.ProxyJobDriverErrorHandler;
+import org.apache.uima.ducc.container.jd.test.TestBase;
+import org.apache.uima.ducc.container.jd.test.helper.Utilities;
+import org.apache.uima.ducc.container.jd.user.error.classload.ProxyUserErrorStringify;
+import org.apache.uima.ducc.container.net.impl.MetaCas;
+import org.apache.uima.ducc.user.error.iface.Transformer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestClassLoading extends TestBase {
+
+ protected JobDriver jd;
+
+ @Before
+ public void setUp() throws JobDriverException {
+ initialize();
+ jd = JobDriver.getInstance();
+ }
+
+ @Test
+ public void test_01() {
+ try {
+ String userClasspath = Utilities.getInstance().getUserCP();
+ System.setProperty(FlagsHelper.Name.UserClasspath.pname(), userClasspath);
+ ProxyJobDriverErrorHandler pjdeh = new ProxyJobDriverErrorHandler();
+ String serializedCAS = null;
+ String serializedException = null;
+ pjdeh.handle(serializedCAS, serializedException);
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail("Exception " + e);
+ }
+ }
+
+ @Test
+ public void test_02() {
+ try {
+ String userClasspath = Utilities.getInstance().getUserCP();
+ System.setProperty(FlagsHelper.Name.UserClasspath.pname(), userClasspath);
+ URL urlXml = this.getClass().getResource("/CR100.xml");
+ File file = new File(urlXml.getFile());
+ String crXml = file.getAbsolutePath();
+ System.setProperty(FlagsHelper.Name.CollectionReaderXml.pname(), crXml);
+ ProxyJobDriverCollectionReader pjdcr = new ProxyJobDriverCollectionReader();
+ MetaCas mc = pjdcr.getMetaCas();
+ String serializedCAS = mc.getSerializedCas();
+ String serializedException = null;
+ ProxyJobDriverErrorHandler pjdeh = new ProxyJobDriverErrorHandler();
+ pjdeh.handle(serializedCAS, serializedException);
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail("Exception");
+ }
+ }
+
+ @Test
+ public void test_03() {
+ try {
+ String userClasspath = Utilities.getInstance().getUserCP();
+ System.setProperty(FlagsHelper.Name.UserClasspath.pname(), userClasspath);
+ URL urlXml = this.getClass().getResource("/CR100.xml");
+ File file = new File(urlXml.getFile());
+ String crXml = file.getAbsolutePath();
+ System.setProperty(FlagsHelper.Name.CollectionReaderXml.pname(), crXml);
+ ProxyJobDriverCollectionReader pjdcr = new ProxyJobDriverCollectionReader();
+ MetaCas mc = pjdcr.getMetaCas();
+ String serializedCAS = mc.getSerializedCas();
+ String serializedException = null;
+ String className = "org.apache.uima.ducc.user.jd.test.helper.TestJdContainerErrorHandler";
+ System.setProperty(FlagsHelper.Name.UserErrorHandlerClassname.pname(), className);
+ ProxyJobDriverErrorHandler pjdeh = new ProxyJobDriverErrorHandler();
+ ProxyJobDriverDirective directive = pjdeh.handle(serializedCAS, serializedException);
+ assertTrue(directive.isKillJob() == true);
+ assertTrue(directive.isKillProcess() == true);
+ assertTrue(directive.isKillWorkItem() == false);
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail("Exception");
+ }
+ }
+
+ @Test
+ public void test_04() {
+ try {
+ //TODO fix & keep this test?
+ boolean disabled = true;
+ if(disabled) {
+ return;
+ }
+ String userClasspath = Utilities.getInstance().getUserCP();
+ System.setProperty(FlagsHelper.Name.UserClasspath.pname(), userClasspath);
+ ProxyDeployableGeneration proxy = new ProxyDeployableGeneration();
+ //
+ URL url = this.getClass().getResource("/");
+ File root = new File(url.getFile());
+ String name = root.getAbsolutePath();
+ debug(name);
+ assertTrue(root.isDirectory());
+ String nameWorking = name+File.separator+"working";
+ File working = new File(nameWorking);
+ delete(working);
+ working.mkdir();
+ //
+ String directory = working.getAbsolutePath();
+ String id = "12345";
+ String dgenName = "name";
+ String dgenDescription = "description";
+ Integer dgenThreadCount = new Integer(1);
+ String dgenBrokerURL = "brokerURL";
+ String dgenEndpoint = "endpoint";
+ String dgenFlowController = "flowController";
+ String cmDescriptor = null;
+ List<String> cmOverrides = null;
+ //String aeDescriptor = "org.apache.uima.ducc.test.randomsleep.FixedSleepAE";
+ String aeDescriptor = "FixedSleepAE";
+ List<String> aeOverrides = null;
+ String ccDescriptor = null;
+ List<String> ccOverrides = null;
+ String dgen = proxy.generate(
+ directory,
+ id,
+ dgenName,
+ dgenDescription,
+ dgenThreadCount,
+ dgenBrokerURL,
+ dgenEndpoint,
+ dgenFlowController,
+ cmDescriptor,
+ cmOverrides,
+ aeDescriptor,
+ aeOverrides,
+ ccDescriptor,
+ ccOverrides
+ );
+ debug(dgen);
+ //
+ delete(working);
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail("Exception");
+ }
+ }
+
+ //@Test
+ public void test_05() {
+ try {
+ String userClasspath = Utilities.getInstance().getUserCP();
+ System.setProperty(FlagsHelper.Name.UserClasspath.pname(), userClasspath);
+ ProxyUserErrorStringify pues = new ProxyUserErrorStringify();
+ Exception e = new RuntimeException("error test #05");
+ Object serializedException = Transformer.serialize(e);
+ String stringifiedException = pues.convert(serializedException);
+ String prefix = "java.lang.RuntimeException: error test #05";
+ assertTrue(stringifiedException.startsWith(prefix));
+ //System.out.println(stringifiedException);
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail("Exception");
+ }
+ }
+
+ //@Test
+ public void test_06() {
+ try {
+ String userClasspath = Utilities.getInstance().getUserCP();
+ System.setProperty(FlagsHelper.Name.UserClasspath.pname(), userClasspath);
+ URL urlXml = this.getClass().getResource("/CrInitException.xml");
+ File file = new File(urlXml.getFile());
+ String crXml = file.getAbsolutePath();
+ System.setProperty(FlagsHelper.Name.CollectionReaderXml.pname(), crXml);
+ new ProxyJobDriverCollectionReader();
+ fail("No Exception?");
+ }
+ catch(ProxyException e) {
+ // as expected
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail("Exception");
+ }
+ }
+
+ //@Test
+ public void test_loader() throws URISyntaxException, IOException {
+
+ // First set up a private class-loaded and verify that its resources exist
+ // and are not also in the system class-loader.
+ String privateCP = "src/test/java/";
+ String privateResource = "org/apache/uima/ducc/container/jd/test/TestClassLoading.java";
+ String publicClass = "org.apache.uima.ducc.container.common.Util";
+
+ URLClassLoader scl = (URLClassLoader) ClassLoader.getSystemClassLoader();
+ URLClassLoader pcl = PrivateClassLoader.create(privateCP);
+ for (URL u : pcl.getURLs()) {
+ if (!(new File(u.getPath())).exists()) {
+ fail("Missing test classpath resource: " + u);
+ }
+ URI ur = u.toURI().normalize();
+ for (URL uu : scl.getURLs()) {
+ if (ur.equals(uu.toURI().normalize())) {
+ fail("Test classpath resource: " + u + " is also in the system class-loader");
+ }
+ }
+ }
+
+ // Check that a private resource can only be loaded from the private class-loader
+ // i.e. no leakage from private to system
+ // (Can't use a class as all in this project are in the public classpath)
+ if (scl.findResource(privateResource) != null) {
+ fail("Found private resource in system class-loader");
+ }
+ if (pcl.findResource(privateResource) == null) {
+ PrivateClassLoader.dump(pcl, 1);
+ fail("Cannot load private resource");
+ }
+
+ // Check that a public class can only be loaded from the system class-loader
+ // i.e. no leakage from public to private
+ try {
+ pcl.loadClass(publicClass);
+ fail("Found public class in private class-loader");
+ } catch (ClassNotFoundException e) {
+ }
+ try {
+ scl.loadClass(publicClass);
+ } catch (ClassNotFoundException e) {
+ fail("Cannot load public class");
+ }
+ // pcl.close(); // Requires Java 1.7
+ }
+
+ @Test
+ public void test_invalid_cp() throws URISyntaxException, IOException {
+
+ // Set up a private class-loader with a couple of missing
+ // or invalid entries that should be quietly ignored
+ String[] privateCP = {"target/classes",
+ "pom.xml*",
+ "unknown-file.jar",
+ "unknown-wildcard/*"};
+
+ URLClassLoader pcl = null;
+ try {
+ pcl = PrivateClassLoader.create(privateCP);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Failed to create private class loader");
+ }
+ URL[] urls = pcl.getURLs();
+
+ if (urls.length != 1) {
+ fail("Should have only one entry in the classpath, not " + urls.length);
+ }
+ File f = new File(privateCP[0]);
+ URL u = f.toURI().toURL();
+ if (!u.equals(urls[0])) {
+ fail("C;asspath should have only " + u);
+ }
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/classloading/TestClassLoading.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/classloading/TestClassLoading.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/helper/ThreadInfoFactory.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/helper/ThreadInfoFactory.java?rev=1673740&r1=1673739&r2=1673740&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/helper/ThreadInfoFactory.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/helper/ThreadInfoFactory.java Wed Apr 15 12:22:46 2015
@@ -49,4 +49,13 @@ public class ThreadInfoFactory {
}
return ti;
}
+
+ public ThreadInfo getUnique() {
+ ThreadInfo ti = null;
+ if(list.size() > 0) {
+ ti = list.get(0);
+ list.remove(0);
+ }
+ return ti;
+ }
}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler.java?rev=1673740&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler.java Wed Apr 15 12:22:46 2015
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.test.messagehandler;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.uima.ducc.container.jd.mh.MessageHandler;
+import org.apache.uima.ducc.container.jd.test.TestBase;
+import org.apache.uima.ducc.container.net.iface.IMetaCas;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type;
+import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
+import org.apache.uima.ducc.container.net.impl.TransactionId;
+
+public class TestMessageHandler extends TestBase {
+
+ protected boolean enforce = true;
+ protected boolean skipAll = true;
+
+ private MetaCasTransaction create(String node, int pid, int tid, Type type) {
+ MetaCasTransaction mct = new MetaCasTransaction();
+ mct.setRequesterNodeName(node);
+ mct.setRequesterProcessName(pid+"");
+ mct.setRequesterProcessId(pid);
+ mct.setRequesterThreadId(tid);
+ mct.setType(type);
+ return mct;
+ }
+
+ private IMetaCas transCommon(MessageHandler messageHandler, MetaCasTransaction trans, int reqNo) {
+ messageHandler.handleMetaCasTransation(trans);
+ IMetaCas metaCas = trans.getMetaCas();
+ if(metaCas != null) {
+ if(reqNo > 0) {
+ String seqNo = ""+reqNo;
+ debug("system key:"+metaCas.getSystemKey());
+ if(enforce) {
+ assertTrue(metaCas.getSystemKey().equals(seqNo));
+ }
+ asExpected("system key == "+seqNo);
+ debug("user key:"+metaCas.getUserKey());
+ if(enforce) {
+ assertTrue(metaCas.getUserKey().equals(seqNo));
+ }
+ asExpected("user key == "+seqNo);
+ }
+ }
+ else {
+ debug("metaCas is null");
+ }
+ return metaCas;
+ }
+
+ protected MetaCasTransaction transGet(MessageHandler messageHandler, String node, int pid, int tid, int reqNo) {
+ debug("Get");
+ MetaCasTransaction trans = create(node, pid, tid, Type.Get);
+ trans.setTransactionId(new TransactionId(reqNo,0));
+ transCommon(messageHandler, trans, reqNo);
+ return trans;
+ }
+
+ protected void transAck(MessageHandler messageHandler, MetaCasTransaction trans, int reqNo) {
+ debug("Ack");
+ trans.setType(Type.Ack);
+ trans.setTransactionId(new TransactionId(reqNo,1));
+ transCommon(messageHandler, trans, reqNo);
+ }
+
+ protected void transEnd(MessageHandler messageHandler, MetaCasTransaction trans, int reqNo) {
+ debug("End");
+ trans.setType(Type.End);
+ trans.setTransactionId(new TransactionId(reqNo,2));
+ transCommon(messageHandler, trans, reqNo);
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/messagehandler/TestMessageHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain