You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/02/18 14:05:21 UTC
svn commit: r1569302 - in /uima/sandbox/uima-ducc/trunk:
uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/
uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/
uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/
uima-ducc-transport/...
Author: degenaro
Date: Tue Feb 18 13:05:21 2014
New Revision: 1569302
URL: http://svn.apache.org/r1569302
Log:
UIMA-577 DUCC Job Driver (JD) should auto-retry lost work items
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java (with props)
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CallbackState.java
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasLimbo.java
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasSource.java
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasTuple.java
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java
uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItemListener.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java Tue Feb 18 13:05:21 2014
@@ -65,11 +65,11 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
-import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
import org.apache.uima.ducc.transport.event.common.IDuccUimaDeployableConfiguration;
import org.apache.uima.ducc.transport.event.common.IDuccUimaDeploymentDescriptor;
import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
import org.apache.uima.ducc.transport.event.common.IRationale;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
import org.apache.uima.ducc.transport.event.common.Rationale;
import org.apache.uima.ducc.transport.event.jd.DriverStatusReport;
import org.apache.uima.ducc.transport.event.jd.DuccProcessWorkItemsMap;
@@ -155,9 +155,7 @@ public class JobDriver extends Thread im
workItemStateManager = new WorkItemStateManager(logsjobdir);
synchronizedStats = new SynchronizedStats();
// Prepare UIMA-AS client instance and multiple threads
- ClientThreadFactory factory = new ClientThreadFactory("UimaASClientThread");
- queue = new LinkedBlockingQueue<Runnable>();
- executor = new DynamicThreadPoolExecutor(1, 1, 10, TimeUnit.MICROSECONDS, queue, factory, job.getDuccId());
+ initThreadPool();
client = new BaseUIMAAsynchronousEngine_impl();
workItemListener = new WorkItemListener(this);
client.addStatusCallbackListener(workItemListener);
@@ -206,6 +204,15 @@ public class JobDriver extends Thread im
}
}
+ private void initThreadPool() {
+ if(executor != null) {
+ executor.shutdown();
+ }
+ ClientThreadFactory factory = new ClientThreadFactory("UimaASClientThread");
+ queue = new LinkedBlockingQueue<Runnable>();
+ executor = new DynamicThreadPoolExecutor(1, 1, 10, TimeUnit.MICROSECONDS, queue, factory, job.getDuccId());
+ }
+
public String summarize(Exception e) {
return ExceptionHelper.summarize(e);
}
@@ -248,7 +255,7 @@ public class JobDriver extends Thread im
private void missingCallbackReaper() {
String location = "missingCallbackReaper";
try {
- if(casSource.isEmpty()) {
+ if(casSource.isExhaustedReader()) {
IDuccWorkJob job = getJob();
DriverStatusReport driverStatusReport = getDriverStatusReportLive();
long todo = driverStatusReport.getWorkItemsToDo();
@@ -262,13 +269,14 @@ public class JobDriver extends Thread im
WorkItem workItem = workItems.nextElement();
int seqNo = workItem.getSeqNo();
String casId = workItem.getCasId();
+ duccOut.debug(location, jobid, "seqNo:"+seqNo+" "+"state:"+workItem.getCallbackState().getState().name());
if(workItem.getCallbackState().isPendingCallback()) {
long sTime = workItem.getTimeWindow().getStartLong();
long cTime = System.currentTimeMillis();
long mTime = 1000*60*lostTimeout;
long tdiff = cTime - sTime;
if(tdiff > mTime) {
- duccOut.warn(location, null, "reaping (no callback) seqNo:"+seqNo+" "+"casId:"+casId+" "+"tdiff:"+tdiff);
+ duccOut.debug(location, jobid, "reaping (no callback) seqNo:"+seqNo+" "+"casId:"+casId+" "+"tdiff:"+tdiff);
registerLostCas(workItem.getCasId(), getCasDispatchMap().get(casId));
workItem.lost();
}
@@ -285,6 +293,67 @@ public class JobDriver extends Thread im
}
}
+ private void requeue() throws JobDriverTerminateException {
+ String location = "requeue";
+ if(casSource.hasDelayed()) {
+ if(!job.isProcessReady()) {
+ initThreadPool();
+ ArrayList<CasTuple> list = casSource.releaseLimbo();
+ int released = list.size();
+ if(released > 0) {
+ duccOut.debug(location, jobid, "released:"+released);
+ driverStatusReport.decrementWorkItemsLost(released);
+ driverStatusReport.incrementWorkItemsRetry(released);
+ for(CasTuple casTuple : list) {
+ int seqNo = casTuple.getSeqno();
+ duccOut.info(location, jobid, "seqNo:"+seqNo);
+ }
+ }
+ waitForEligibility();
+ }
+ }
+ }
+
+ private void queue() throws JobDriverTerminateException {
+ String location = "queue";
+ int threadCount = calculateThreadCount();
+ driverStatusReport.setThreadCount(threadCount);
+ executor.changeCorePoolSize(threadCount);
+ if(threadCount > 0) {
+ int poolSize = executor.getCorePoolSize();
+ duccOut.debug(location, jobid, "pool size:"+poolSize);
+ int queueCount = 0;
+ while(isQueueDeficit(threadCount)) {
+ if(!casSource.isExhaustedReader()) {
+ duccOut.debug(location, jobid, "not exhausted reader");
+ queueCASes(1,queue,workItemFactory);
+ queueCount++;
+ continue;
+ }
+ else {
+ duccOut.trace(location, jobid, "exhausted reader");
+ duccOut.debug(location, jobid, "exhausted reader");
+ }
+ if(!casSource.isLimboEmpty()) {
+ if(casSource.hasLimboAvailable()) {
+ duccOut.debug(location, jobid, "limbo available size:"+casSource.getLimboSize());
+ queueCASes(1,queue,workItemFactory);
+ queueCount++;
+ continue;
+ }
+ else {
+ duccOut.debug(location, jobid, "limbo unavailable size:"+casSource.getLimboSize());
+ }
+ }
+ else {
+ duccOut.debug(location, jobid, "limbo empty size:"+casSource.getLimboSize());
+ }
+ break;
+ }
+ duccOut.debug(location, jobid, "newly queued:"+queueCount);
+ }
+ }
+
private void process() throws JobDriverTerminateException {
String location = "process";
try {
@@ -292,7 +361,7 @@ public class JobDriver extends Thread im
if(getJob().isRunnable()) {
uimaAsClientInitialize();
duccOut.info(location, jobid, "jd.step:"+location);
- executor.prestartAllCoreThreads();
+ //executor.prestartAllCoreThreads();
workItemFactory = new WorkItemFactory(client, jobid, this);
queueCASes(1, queue, workItemFactory);
boolean run = true;
@@ -318,44 +387,15 @@ public class JobDriver extends Thread im
}
logState(getJob());
interrupter();
- int threadCount = calculateThreadCount();
- driverStatusReport.setThreadCount(threadCount);
- if(threadCount > 0) {
- executor.changeCorePoolSize(threadCount);
- }
- int poolSize = executor.getCorePoolSize();
- duccOut.debug(location, jobid, "pool size:"+poolSize);
- while(isQueueDeficit(threadCount)) {
- if(!casSource.isExhaustedReader()) {
- duccOut.debug(location, jobid, "not exhausted reader");
- queueCASes(1,queue,workItemFactory);
- continue;
- }
- else {
- duccOut.trace(location, jobid, "exhausted reader");
- }
- if(!casSource.isLimboEmpty()) {
- if(casSource.hasLimboAvailable()) {
- duccOut.debug(location, jobid, "limbo available size:"+casSource.getLimboSize());
- queueCASes(1,queue,workItemFactory);
- continue;
- }
- else {
- duccOut.debug(location, jobid, "limbo unavailable size:"+casSource.getLimboSize());
- }
- }
- else {
- duccOut.debug(location, jobid, "limbo empty size:"+casSource.getLimboSize());
- }
- break;
- }
+ requeue();
+ queue();
+ missingCallbackReaper();
value = driverStatusReport.isComplete();
if(value) {
duccOut.info(location, jobid, "DriverComplete:"+value+" "+"DriverState:"+driverStatusReport.getDriverState());
run = false;
continue;
}
- missingCallbackReaper();
try {
Thread.sleep(10000);
}
@@ -484,8 +524,9 @@ public class JobDriver extends Thread im
CasTuple casTuple = casSource.pop();
driverStatusReport.setWorkItemsFetched(casSource.getSeqNo());
if(casTuple == null) {
- if(casSource.isEmpty()) {
+ if(casSource.isExhaustedReader()) {
driverStatusReport.resetWorkItemsPending();
+ duccOut.debug(location, jobid, "resetWorkItemsPending");
}
break;
}
@@ -537,17 +578,19 @@ public class JobDriver extends Thread im
private void interrupter() {
String location = "interrupter";
CasDispatchMap casDispatchMap = getCasDispatchMap();
- IDuccProcessMap processMap = (IDuccProcessMap) getJob().getProcessMap().deepCopy();
- Iterator<DuccId> iterator = processMap.keySet().iterator();
- while(iterator.hasNext()) {
- DuccId duccId = iterator.next();
- IDuccProcess duccProcess = processMap.get(duccId);
- boolean statusComplete = duccProcess.isComplete();
- boolean statusDeallocated = duccProcess.isDeallocated();
- boolean statusProcessFailed = duccProcess.isFailed();
- if(statusComplete || statusDeallocated || statusProcessFailed) {
- duccOut.debug(location, jobid, duccProcess.getDuccId(), "isComplete:"+statusComplete+" "+"isDeallocated:"+statusDeallocated+" "+"isProcessFailed:"+statusProcessFailed);
- casDispatchMap.interrupt(getJob(), duccProcess);
+ if(casDispatchMap.size() > 0) {
+ IDuccProcessMap processMap = (IDuccProcessMap) getJob().getProcessMap().deepCopy();
+ Iterator<DuccId> iterator = processMap.keySet().iterator();
+ while(iterator.hasNext()) {
+ DuccId duccId = iterator.next();
+ IDuccProcess duccProcess = processMap.get(duccId);
+ boolean statusComplete = duccProcess.isComplete();
+ boolean statusDeallocated = duccProcess.isDeallocated();
+ boolean statusProcessFailed = duccProcess.isFailed();
+ if(statusComplete || statusDeallocated || statusProcessFailed) {
+ duccOut.debug(location, jobid, duccProcess.getDuccId(), "isComplete:"+statusComplete+" "+"isDeallocated:"+statusDeallocated+" "+"isProcessFailed:"+statusProcessFailed);
+ casDispatchMap.interrupt(getJob(), duccProcess);
+ }
}
}
}
@@ -702,6 +745,21 @@ public class JobDriver extends Thread im
casWorkItemMap.remove(workItem.getCasId());
}
+ private void delayedRetry(WorkItem workItem) {
+ String location = "delayedRetry";
+ duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
+ duccOut.debug(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText()+" "+"casId:"+workItem.getCAS().hashCode());
+ remove(workItem);
+ duccOut.debug(location, workItem.getJobId(), workItem.getProcessId(), "size:"+casDispatchMap.size());
+ CasTuple casTuple = workItem.getCasTuple();
+ casTuple.setDelayedRetry();
+ casTuple.setDuccId(new DuccId(-1));
+ casSource.push(casTuple);
+ workItemStateManager.retry(workItem.getSeqNo());
+ workItemInactive();
+ return;
+ }
+
private void retry(WorkItem workItem) {
String location = "retry";
duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
@@ -716,6 +774,7 @@ public class JobDriver extends Thread im
workItemStateManager.retry(workItem.getSeqNo());
return;
}
+
// presume retry if registration info not found
private boolean isRetry(WorkItem workItem) {
@@ -1329,49 +1388,54 @@ public class JobDriver extends Thread im
public void ended(WorkItem workItem) {
String location = "ended";
try {
- waitForLocation(this, workItem);
- workItemInactive();
- duccOut.debug(location, jobid, "action:ended "+getThreadLocationInfo(workItem));
- driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
- driverStatusReport.workItemOperatingEnd(workItem.getCasId());
- if(driverStatusReport.isKillJob()) {
- duccOut.debug(location, jobid, "action:kill-job "+getThreadLocationInfo(workItem));
- // killing job - don't add bother with retry
- }
- else if(driverStatusReport.isKillProcess(workItem.getProcessId())) {
- duccOut.debug(location, jobid, "action:kill-process "+getThreadLocationInfo(workItem));
- retry(workItem);
- // killing process - don't add another work item to queue
- }
- else if(isRetry(workItem)) {
- duccOut.debug(location, jobid, "action:shrink "+getThreadLocationInfo(workItem));
- retry(workItem);
- // must be shrinking - don't add another work item to queue
+ if(!casDispatchMap.containsKey(workItem.getCasId())) {
+ duccOut.warn(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"not dispatched");
}
else {
- duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
- duccOut.debug(location, jobid, "action:completed "+getThreadLocationInfo(workItem));
- workItemStateManager.ended(workItem.getSeqNo());
- driverStatusReport.countWorkItemsProcessingCompleted();
- workItem.getTimeWindow().setEnd(TimeStamp.getCurrentMillis());
- long time = workItem.getTimeWindow().getElapsedMillis();
- synchronizedStats.addValue(time);
- DuccPerWorkItemStatistics perWorkItemStatistics = new DuccPerWorkItemStatistics(
- synchronizedStats.getMax(),
- synchronizedStats.getMin(),
- synchronizedStats.getMean(),
- synchronizedStats.getStandardDeviation()
- );
- driverStatusReport.setPerWorkItemStatistics(perWorkItemStatistics);
- performanceSummaryWriter.getSummaryMap().update(duccOut, workItem.getAnalysisEnginePerformanceMetricsList());
- int casCount = performanceSummaryWriter.getSummaryMap().casCount();
- int endCount = driverStatusReport.getWorkItemsProcessingCompleted();
- String message = "casCount:"+casCount+" "+"endCount:"+endCount;
- duccOut.debug(location, jobid, message);
- remove(workItem);
- recycleCAS(workItem);
- accountingWorkItemIsDone(workItem.getProcessId(),time);
- queueCASes(1,queue,workItemFactory);
+ waitForLocation(this, workItem);
+ workItemInactive();
+ duccOut.debug(location, jobid, "action:ended "+getThreadLocationInfo(workItem));
+ driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
+ driverStatusReport.workItemOperatingEnd(workItem.getCasId());
+ if(driverStatusReport.isKillJob()) {
+ duccOut.debug(location, jobid, "action:kill-job "+getThreadLocationInfo(workItem));
+ // killing job - don't add bother with retry
+ }
+ else if(driverStatusReport.isKillProcess(workItem.getProcessId())) {
+ duccOut.debug(location, jobid, "action:kill-process "+getThreadLocationInfo(workItem));
+ retry(workItem);
+ // killing process - don't add another work item to queue
+ }
+ else if(isRetry(workItem)) {
+ duccOut.debug(location, jobid, "action:shrink "+getThreadLocationInfo(workItem));
+ retry(workItem);
+ // must be shrinking - don't add another work item to queue
+ }
+ else {
+ duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
+ duccOut.debug(location, jobid, "action:completed "+getThreadLocationInfo(workItem));
+ workItemStateManager.ended(workItem.getSeqNo());
+ driverStatusReport.countWorkItemsProcessingCompleted();
+ workItem.getTimeWindow().setEnd(TimeStamp.getCurrentMillis());
+ long time = workItem.getTimeWindow().getElapsedMillis();
+ synchronizedStats.addValue(time);
+ DuccPerWorkItemStatistics perWorkItemStatistics = new DuccPerWorkItemStatistics(
+ synchronizedStats.getMax(),
+ synchronizedStats.getMin(),
+ synchronizedStats.getMean(),
+ synchronizedStats.getStandardDeviation()
+ );
+ driverStatusReport.setPerWorkItemStatistics(perWorkItemStatistics);
+ performanceSummaryWriter.getSummaryMap().update(duccOut, workItem.getAnalysisEnginePerformanceMetricsList());
+ int casCount = performanceSummaryWriter.getSummaryMap().casCount();
+ int endCount = driverStatusReport.getWorkItemsProcessingCompleted();
+ String message = "casCount:"+casCount+" "+"endCount:"+endCount;
+ duccOut.debug(location, jobid, message);
+ remove(workItem);
+ recycleCAS(workItem);
+ accountingWorkItemIsDone(workItem.getProcessId(),time);
+ queueCASes(1,queue,workItemFactory);
+ }
}
}
catch(Exception e) {
@@ -1439,6 +1503,22 @@ public class JobDriver extends Thread im
String location = "lost";
try {
duccOut.info(location, workItem.getJobId(), "seqNo:"+workItem.getSeqNo());
+ driverStatusReport.workItemDequeued(workItem.getCasId());
+ driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
+ driverStatusReport.workItemOperatingEnd(workItem.getCasId());
+ driverStatusReport.incrementWorkItemsLost();
+ delayedRetry(workItem);
+ }
+ catch(Exception exception) {
+ duccOut.error(location, jobid, "processing error?", exception);
+ }
+ }
+
+ /*
+ public void lost(WorkItem workItem) {
+ String location = "lost";
+ try {
+ duccOut.info(location, workItem.getJobId(), "seqNo:"+workItem.getSeqNo());
workItemInactive();
driverStatusReport.workItemDequeued(workItem.getCasId());
driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
@@ -1454,86 +1534,92 @@ public class JobDriver extends Thread im
duccOut.error(location, jobid, "processing error?", exception);
}
}
+ */
public void exception(WorkItem workItem, Exception e) {
String location = "exception";
try {
- duccOut.debug(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
- duccOut.debug(location, jobid, "action:exception "+getThreadLocationInfo(workItem), e);
- boolean timeout = false;
- if(ExceptionClassifier.isTimeout(e)) {
- ArrayList<WorkItem> removalsList = new ArrayList<WorkItem>();
- removalsList.add(workItem);
- removeLocations(removalsList);
- timeout = true;
+ if(!casDispatchMap.containsKey(workItem.getCasId())) {
+ duccOut.warn(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"not dispatched");
}
else {
- duccOut.debug(location, jobid, "action:location-wait "+getThreadLocationInfo(workItem), e);
- waitForLocation(this, workItem);
- }
- workItemInactive();
- driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
- driverStatusReport.workItemOperatingEnd(workItem.getCasId());
- if(driverStatusReport.isKillJob()) {
- duccOut.debug(location, jobid, "action:kill-job "+getThreadLocationInfo(workItem), e);
- // killing job - don't add bother with retry
- }
- else if(timeout) {
- duccOut.debug(location, jobid, "action:timeout "+getThreadLocationInfo(workItem), e);
- // <UIMA-3600>
- waitForProcessStatus(this, workItem);
- if(isNodeFailure(workItem)) {
- duccOut.debug(location, jobid, "action:timeout-node-failure-retry "+getThreadLocationInfo(workItem), e);
- retry(workItem);
- // node failed, work item interrupted - don't add another work item to queue
+ duccOut.debug(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
+ duccOut.debug(location, jobid, "action:exception "+getThreadLocationInfo(workItem), e);
+ boolean timeout = false;
+ if(ExceptionClassifier.isTimeout(e)) {
+ ArrayList<WorkItem> removalsList = new ArrayList<WorkItem>();
+ removalsList.add(workItem);
+ removeLocations(removalsList);
+ timeout = true;
}
else {
- duccOut.debug(location, jobid, "action:timeout-handler "+getThreadLocationInfo(workItem), e);
- employPluginExceptionHandler(workItem, e);
+ duccOut.debug(location, jobid, "action:location-wait "+getThreadLocationInfo(workItem), e);
+ waitForLocation(this, workItem);
}
- // </UIMA-3600>
- }
- else if(isUnknownProcess(workItem)) {
- duccOut.debug(location, jobid, "action:unknown-process "+getThreadLocationInfo(workItem), e);
- retry(workItem);
- // unknown process (no callbacks) - don't add another work item to queue
- }
- else if(driverStatusReport.isKillProcess(workItem.getProcessId())) {
- duccOut.debug(location, jobid, "action:kill-process "+getThreadLocationInfo(workItem), e);
- retry(workItem);
- // killing process - don't add another work item to queue
- }
- else if(isFailedProcess(workItem)) {
- if(ExceptionClassifier.isInterrupted(e)) {
- duccOut.debug(location, jobid, "action:fail-process-retry "+getThreadLocationInfo(workItem), e);
+ workItemInactive();
+ driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
+ driverStatusReport.workItemOperatingEnd(workItem.getCasId());
+ if(driverStatusReport.isKillJob()) {
+ duccOut.debug(location, jobid, "action:kill-job "+getThreadLocationInfo(workItem), e);
+ // killing job - don't add bother with retry
+ }
+ else if(timeout) {
+ duccOut.debug(location, jobid, "action:timeout "+getThreadLocationInfo(workItem), e);
+ // <UIMA-3600>
+ waitForProcessStatus(this, workItem);
+ if(isNodeFailure(workItem)) {
+ duccOut.debug(location, jobid, "action:timeout-node-failure-retry "+getThreadLocationInfo(workItem), e);
+ retry(workItem);
+ // node failed, work item interrupted - don't add another work item to queue
+ }
+ else {
+ duccOut.debug(location, jobid, "action:timeout-handler "+getThreadLocationInfo(workItem), e);
+ employPluginExceptionHandler(workItem, e);
+ }
+ // </UIMA-3600>
+ }
+ else if(isUnknownProcess(workItem)) {
+ duccOut.debug(location, jobid, "action:unknown-process "+getThreadLocationInfo(workItem), e);
+ retry(workItem);
+ // unknown process (no callbacks) - don't add another work item to queue
+ }
+ else if(driverStatusReport.isKillProcess(workItem.getProcessId())) {
+ duccOut.debug(location, jobid, "action:kill-process "+getThreadLocationInfo(workItem), e);
+ retry(workItem);
+ // killing process - don't add another work item to queue
+ }
+ else if(isFailedProcess(workItem)) {
+ if(ExceptionClassifier.isInterrupted(e)) {
+ duccOut.debug(location, jobid, "action:fail-process-retry "+getThreadLocationInfo(workItem), e);
+ retry(workItem);
+ // process failed, work item interrupted - don't add another work item to queue
+ }
+ else {
+ duccOut.debug(location, jobid, "action:fail-process-handler "+getThreadLocationInfo(workItem), e);
+ employPluginExceptionHandler(workItem, e);
+ }
+ }
+ else if(isRetry(workItem)) {
+ duccOut.debug(location, jobid, "action:shrink "+getThreadLocationInfo(workItem), e);
retry(workItem);
- // process failed, work item interrupted - don't add another work item to queue
+ // must be shrinking - don't add another work item to queue
+ }
+ else if(isError(workItem, e)) {
+ duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
+ duccOut.debug(location, jobid, "action:error "+getThreadLocationInfo(workItem), e);
+ workItemStateManager.error(workItem.getSeqNo());
+ workItemError(workItem, e);
+ remove(workItem);
+ recycleCAS(workItem);
+ accountingWorkItemIsError(workItem.getProcessId());
+ queueCASes(1,queue,workItemFactory);
}
else {
- duccOut.debug(location, jobid, "action:fail-process-handler "+getThreadLocationInfo(workItem), e);
- employPluginExceptionHandler(workItem, e);
+ duccOut.debug(location, jobid, "action:retry "+getThreadLocationInfo(workItem), e);
+ retry(workItem);
+ queueCASes(1,queue,workItemFactory);
}
}
- else if(isRetry(workItem)) {
- duccOut.debug(location, jobid, "action:shrink "+getThreadLocationInfo(workItem), e);
- retry(workItem);
- // must be shrinking - don't add another work item to queue
- }
- else if(isError(workItem, e)) {
- duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
- duccOut.debug(location, jobid, "action:error "+getThreadLocationInfo(workItem), e);
- workItemStateManager.error(workItem.getSeqNo());
- workItemError(workItem, e);
- remove(workItem);
- recycleCAS(workItem);
- accountingWorkItemIsError(workItem.getProcessId());
- queueCASes(1,queue,workItemFactory);
- }
- else {
- duccOut.debug(location, jobid, "action:retry "+getThreadLocationInfo(workItem), e);
- retry(workItem);
- queueCASes(1,queue,workItemFactory);
- }
}
catch(Exception exception) {
duccOut.error(location, jobid, "processing error?", exception);
@@ -1559,11 +1645,13 @@ public class JobDriver extends Thread im
return;
}
+ /*
private void workItemLost(WorkItem workItem) {
String location = "workItemLost";
driverStatusReport.countWorkItemsLost();
duccOut.error(location, workItem.getJobId(), "seqNo:"+workItem.getSeqNo());
}
+ */
private void workItemError(WorkItem workItem, Throwable t) {
workItemError(workItem, t, null);
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CallbackState.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CallbackState.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CallbackState.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CallbackState.java Tue Feb 18 13:05:21 2014
@@ -22,7 +22,10 @@ public class CallbackState {
public static enum State { PendingQueued, PendingAssigned, NotPending };
- public State state = State.NotPending;
+ private State state = State.NotPending;
+
+ public CallbackState() {
+ }
private void setState(State value) {
synchronized(state) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasLimbo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasLimbo.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasLimbo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasLimbo.java Tue Feb 18 13:05:21 2014
@@ -18,6 +18,7 @@
*/
package org.apache.uima.ducc.jd.client;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -115,13 +116,13 @@ public class CasLimbo {
String location = "use";
DuccId pDuccId = casTuple.getDuccId();
if(pDuccId == null) {
- duccOut.warn(location, getJobId(), casTuple.getDuccId(), "process ID null");
+ duccOut.debug(location, getJobId(), casTuple.getDuccId(), "process ID null");
}
else {
IDuccProcessMap processMap = getProcessMap();
IDuccProcess process = processMap.getProcess(pDuccId);
if(process == null) {
- duccOut.warn(location, getJobId(), casTuple.getDuccId(), "process is null");
+ duccOut.debug(location, getJobId(), casTuple.getDuccId(), "process is null");
}
else {
if(process.isPreempted()) {
@@ -142,16 +143,19 @@ public class CasLimbo {
String location = "isAvailable";
boolean retVal = false;
DuccId pDuccId = casTuple.getDuccId();
- if(pDuccId == null) {
+ if(casTuple.isDelayedRetry()) {
+ //retVal = false;
+ }
+ else if(pDuccId == null) {
retVal = true;
- duccOut.warn(location, getJobId(), casTuple.getDuccId(), "process ID is null");
+ duccOut.debug(location, getJobId(), casTuple.getDuccId(), "process ID is null");
}
else {
IDuccProcessMap processMap = getProcessMap();
IDuccProcess process = processMap.getProcess(pDuccId);
if(process == null) {
retVal = true;
- duccOut.warn(location, getJobId(), casTuple.getDuccId(), "process is null");
+ duccOut.debug(location, getJobId(), casTuple.getDuccId(), "process is null");
}
else {
if(process.isDefunct()) {
@@ -175,6 +179,34 @@ public class CasLimbo {
return retVal;
}
+ public ArrayList<CasTuple> release() {
+ String location = "release";
+ ArrayList<CasTuple> retVal = new ArrayList<CasTuple>();
+ Iterator<CasTuple> iterator = tupleQueue.iterator();
+ while(iterator.hasNext()) {
+ CasTuple casTuple = iterator.next();
+ boolean released = casTuple.undelay();
+ if(released) {
+ duccOut.debug(location, getJobId(), "seqNo:"+casTuple.getSeqno());
+ casTuple.setRetry();
+ retVal.add(casTuple);
+ }
+ }
+ return retVal;
+ }
+
+ public int delayedSize() {
+ Iterator<CasTuple> iterator = tupleQueue.iterator();
+ int count = 0;
+ while(iterator.hasNext()) {
+ CasTuple casTuple = iterator.next();
+ if(casTuple.isDelayedRetry()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
public int size() {
String location = "size";
int retVal = tupleQueue.size();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasSource.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasSource.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasSource.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasSource.java Tue Feb 18 13:05:21 2014
@@ -47,6 +47,7 @@ import org.apache.uima.resource.metadata
import org.apache.uima.resource.metadata.FsIndexDescription;
import org.apache.uima.resource.metadata.TypePriorities;
import org.apache.uima.resource.metadata.TypeSystemDescription;
+import org.apache.uima.util.CasCopier;
import org.apache.uima.util.CasCreationUtils;
import org.apache.uima.util.InvalidXMLException;
import org.apache.uima.util.Progress;
@@ -146,6 +147,14 @@ public class CasSource {
initTotal();
}
+ private CAS clone(CAS casOriginal, int seqNo) throws ResourceInitializationException {
+ String location = "clone";
+ CAS casClone = getEmptyCas(seqNo);
+ CasCopier.copyCas(casOriginal, casClone, true);
+ duccOut.debug(location, null, "seqNo:"+seqNo+" "+"casId:"+casOriginal.hashCode()+" "+"casId:"+casClone.hashCode());
+ return casClone;
+ }
+
private CAS getEmptyCas(int seqNo) throws ResourceInitializationException {
String location = "getEmptyCas";
CAS cas = getRecycledCas();
@@ -220,6 +229,14 @@ public class CasSource {
return casLimbo.hasAvailable();
}
+ public ArrayList<CasTuple> releaseLimbo() {
+ return casLimbo.release();
+ }
+
+ public boolean hasDelayed() {
+ return casLimbo.delayedSize() > 0;
+ }
+
public boolean isEmpty() {
boolean retVal = false;
if(isExhaustedReader()) {
@@ -233,7 +250,13 @@ public class CasSource {
public CasTuple pop() throws Exception {
String location = "pop";
CasTuple casTuple = casLimbo.get();
- if(casTuple == null) {
+ if(casTuple != null) {
+ CAS casOriginal = casTuple.getCas();
+ int seqNo = casTuple.getSeqno();
+ CAS casClone = clone(casOriginal, seqNo);
+ casTuple.setCas(casClone);
+ }
+ else {
try {
synchronized(cr) {
if((total > 0) && (total == seqNo.get())) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasTuple.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasTuple.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasTuple.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasTuple.java Tue Feb 18 13:05:21 2014
@@ -26,6 +26,7 @@ public class CasTuple {
private CAS cas;
private int seqno;
private boolean retry = false;
+ private boolean delayedRetry = false;
private DuccId pDuccId= null;
public CasTuple(CAS cas, int seqno) {
@@ -37,6 +38,10 @@ public class CasTuple {
this.seqno = seqno;
}
+ public void setCas(CAS value) {
+ cas = value;
+ }
+
public CAS getCas() {
return cas;
}
@@ -57,6 +62,23 @@ public class CasTuple {
return retry;
}
+ public void setDelayedRetry() {
+ delayedRetry = true;
+ }
+
+ public boolean isDelayedRetry() {
+ return delayedRetry;
+ }
+
+ public boolean undelay() {
+ boolean retVal = delayedRetry;
+ if(delayedRetry) {
+ retry = true;
+ delayedRetry = false;
+ }
+ return retVal;
+ }
+
public DuccId getDuccId() {
return pDuccId;
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java Tue Feb 18 13:05:21 2014
@@ -19,7 +19,6 @@
package org.apache.uima.ducc.jd.client;
import java.util.ArrayList;
-import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
@@ -65,10 +64,6 @@ public class WorkItem implements Runnabl
private String keyUimaAsClientTracking = "UimaAsClientTracking";
private boolean uimaAsClientTracking = false;
- // <for testing only!!!>
- private final static boolean injectRandom = false;
- // </for testing only!!!>
-
public WorkItem(UimaAsynchronousEngine client, CasTuple casTuple, DuccId duccId, IWorkItemMonitor workItemMonitor) {
init(client, casTuple, duccId, workItemMonitor);
}
@@ -149,16 +144,8 @@ public class WorkItem implements Runnabl
}
client.sendAndReceiveCAS(cas, analysisEnginePerformanceMetricsList);
if(uimaAsClientTracking) {
- duccOut.info(methodName, null, "seqNo:"+getSeqNo()+" "+"send and receive returned");
- }
- // <for testing only!!!>
- if(injectRandom) {
- Random random = new Random();
- if(random.nextBoolean()) {
- throw new Throwable("just testing Throwable handler");
- }
+ duccOut.debug(methodName, null, "seqNo:"+getSeqNo()+" "+"send and receive returned");
}
- // </for testing only!!!>
if(!isLost.get()) {
ended();
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItemListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItemListener.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItemListener.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItemListener.java Tue Feb 18 13:05:21 2014
@@ -26,6 +26,9 @@ import org.apache.uima.ducc.common.utils
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.jd.IJobDriver;
+import org.apache.uima.ducc.jd.client.ThreadLocation;
+import org.apache.uima.ducc.jd.client.WorkItem;
+import org.apache.uima.ducc.jd.reliability.FaultInjector;
public class WorkItemListener extends UimaAsBaseCallbackListener {
@@ -41,12 +44,10 @@ public class WorkItemListener extends Ui
private String keyUimaAsClientTracking = "UimaAsClientTracking";
private boolean uimaAsClientTracking = false;
-
+
// <for testing only!!!>
private final static boolean asynchronous = false;
- private final static boolean injectLost1 = false;
- private final static boolean injectLost2 = false;
- private final static boolean injectDelay3 = false;
+ private final static FaultInjector faultInjector = FaultInjector.getInstance();
// </for testing only!!!>
public WorkItemListener(IJobDriver jobDriver) {
@@ -96,17 +97,8 @@ public class WorkItemListener extends Ui
String methodName = "OnBeforeMessageSendHandler";
try {
// <for testing only!!!>
- if(injectLost1) {
- String casId = null;
- casId = ""+status.getCAS().hashCode();
- WorkItem wi = jobDriver.getWorkItem(casId);
- wi.getCallbackState().statePendingAssigned();
- duccOut.warn(methodName, jobid, "seqNo:"+wi.getSeqNo()+" "+wi.getCallbackState().getState());
- int seqNo = wi.getSeqNo();
- if(seqNo == 1) {
- duccOut.warn(methodName, jobid, "callback #1 discarded seqNo:"+seqNo+" "+"casId:"+casId);
- return;
- }
+ if(faultInjector.isFaultCallBack1(status, jobDriver)) {
+ return;
}
// </for testing only!!!>
String casId = ""+status.getCAS().hashCode();
@@ -192,31 +184,8 @@ public class WorkItemListener extends Ui
String methodName = "OnBeforeProcessCASHandler";
try {
// <for testing only!!!>
- if(injectLost2) {
- String casId = null;
- casId = ""+status.getCAS().hashCode();
- WorkItem wi = jobDriver.getWorkItem(casId);
- wi.getCallbackState().statePendingAssigned();
- duccOut.warn(methodName, jobid, "seqNo:"+wi.getSeqNo()+" "+wi.getCallbackState().getState());
- int seqNo = wi.getSeqNo();
- if(seqNo == 2) {
- duccOut.warn(methodName, jobid, "callback #2 discarded seqNo:"+seqNo+" "+"casId:"+casId);
- return;
- }
- }
- if(injectDelay3) {
- String casId = null;
- casId = ""+status.getCAS().hashCode();
- WorkItem wi = jobDriver.getWorkItem(casId);
- int seqNo = wi.getSeqNo();
- if(seqNo == 3) {
- duccOut.warn(methodName, jobid, "callback delayed seqNo:"+seqNo+" "+"casId:"+casId);
- try {
- Thread.sleep(70*1000);
- }
- catch(Exception e) {
- }
- }
+ if(faultInjector.isFaultCallBack2(status, jobDriver)) {
+ return;
}
// </for testing only!!!>
String casId = ""+status.getCAS().hashCode();
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java?rev=1569302&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java Tue Feb 18 13:05:21 2014
@@ -0,0 +1,127 @@
+package org.apache.uima.ducc.jd.reliability;
+
+import java.util.Random;
+
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.jd.IJobDriver;
+import org.apache.uima.ducc.jd.client.WorkItem;
+
+public class FaultInjector {
+
+ private static DuccLogger duccOut = DuccLoggerComponents.getJdOut(FaultInjector.class.getName());
+
+ private static FaultInjector instance = new FaultInjector();
+
+ public static FaultInjector getInstance() {
+ return instance;
+ }
+
+ private Random random = new Random();
+
+ // Warning: setting enabled to true will inject faults by randomly ignoring UIMA-AS client callbacks
+
+ private boolean enabled = false;
+
+ private enum Type { CallBack1, CallBack2 };
+
+ private double pct1 = 0.1;
+ private double pct2 = 0.1;
+
+ private FaultInjector() {
+ try {
+ initialize();
+ }
+ catch(Exception e) {
+ }
+ }
+
+ private void initialize() {
+ DuccPropertiesResolver dpr = DuccPropertiesResolver.getInstance();
+ String cb1 = dpr.getProperty("ducc.jd.fault.injector.CallBack1");
+ double dv1 = initValue(cb1);
+ if(dv1 > 0.0) {
+ pct1 = dv1;
+ enabled = true;
+ }
+ String cb2 = dpr.getProperty("ducc.jd.fault.injector.CallBack2");
+ double dv2 = initValue(cb2);
+ if(dv2 > 0.0) {
+ pct2 = dv2;
+ enabled = true;
+ }
+ }
+
+ private double initValue(String value) {
+ double retVal = -1;
+ try {
+ if(value != null) {
+ double dVal = Double.parseDouble(value);
+ if(dVal > 0) {
+ if(dVal < 1) {
+ retVal = dVal;
+ }
+ }
+ }
+ }
+ catch(Exception e) {
+ }
+ return retVal;
+ }
+
+ private boolean isFault(DuccId jobid, int seqNo, Type type) {
+ String location = "isFault";
+ boolean retVal = false;
+ try {
+ if(enabled) {
+ double d1 = random.nextDouble();
+ double d0 = d1;
+ switch(type) {
+ case CallBack1:
+ d0 = pct1;
+ break;
+ case CallBack2:
+ d0 = pct2;
+ break;
+ }
+ if(d1 < d0) {
+ duccOut.info(location, jobid, "seqNo:"+seqNo+" "+"type:"+type.name());
+ retVal = true;
+ }
+ }
+ }
+ catch(Exception e) {
+ duccOut.error(location, jobid, e);
+ }
+ return retVal;
+ }
+
+ private boolean isFault(UimaASProcessStatus status, IJobDriver jobDriver, Type type) {
+ String location = "isFault";
+ DuccId jobid = null;
+ boolean retVal = false;
+ try {
+ jobid = jobDriver.getJob().getDuccId();
+ String casId = ""+status.getCAS().hashCode();
+ WorkItem wi = jobDriver.getWorkItem(casId);
+ int seqNo = wi.getSeqNo();
+ retVal = isFault(jobid, seqNo, type);
+ }
+ catch(Exception e) {
+ duccOut.error(location, jobid, e);
+ }
+ return retVal;
+ }
+
+ public boolean isFaultCallBack1(UimaASProcessStatus status, IJobDriver jobDriver) {
+ return isFault(status, jobDriver, Type.CallBack1);
+ }
+
+ public boolean isFaultCallBack2(UimaASProcessStatus status, IJobDriver jobDriver) {
+ return isFault(status, jobDriver, Type.CallBack2);
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java Tue Feb 18 13:05:21 2014
@@ -370,22 +370,46 @@ public class DriverStatusReport implemen
return workItemsProcessingError.get();
}
- public void countWorkItemsLost() {
+ public void incrementWorkItemsLost() {
workItemsLost.incrementAndGet();
calculateState();
logReport();
}
+ public void decrementWorkItemsLost() {
+ workItemsLost.decrementAndGet();
+ calculateState();
+ logReport();
+ }
+
+ public void decrementWorkItemsLost(int value) {
+ int delta = 0 - value;
+ workItemsLost.addAndGet(delta);
+ calculateState();
+ logReport();
+ }
+
public int getWorkItemsLost() {
return workItemsLost.get();
}
+
+ public boolean isWorkItemsLost() {
+ return workItemsLost.get() > 0;
+ }
+
public void countWorkItemsRetry() {
workItemsRetry.incrementAndGet();
calculateState();
logReport();
}
+ public void incrementWorkItemsRetry(int value) {
+ workItemsRetry.addAndGet(value);
+ calculateState();
+ logReport();
+ }
+
public int getWorkItemsRetry() {
return workItemsRetry.get();
}
@@ -622,6 +646,9 @@ public class DriverStatusReport implemen
else if(isPending()) {
setDriverState(DriverState.Idle);
}
+ else if(isWorkItemsLost()) {
+ setDriverState(DriverState.Idle);
+ }
else {
setDriverState(DriverState.Completing);
}
@@ -631,6 +658,9 @@ public class DriverStatusReport implemen
if(isPending()) {
setDriverState(DriverState.Idle);
}
+ else if(isWorkItemsLost()) {
+ setDriverState(DriverState.Idle);
+ }
else {
setDriverState(DriverState.Completing);
}