You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/02/18 14:05:21 UTC

svn commit: r1569302 - in /uima/sandbox/uima-ducc/trunk: uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/ uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/ uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/ uima-ducc-transport/...

Author: degenaro
Date: Tue Feb 18 13:05:21 2014
New Revision: 1569302

URL: http://svn.apache.org/r1569302
Log:
UIMA-577 DUCC Job Driver (JD) should auto-retry lost work items

Added:
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java   (with props)
Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CallbackState.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasLimbo.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasSource.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasTuple.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItemListener.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/JobDriver.java Tue Feb 18 13:05:21 2014
@@ -65,11 +65,11 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
 import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
 import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
-import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
 import org.apache.uima.ducc.transport.event.common.IDuccUimaDeployableConfiguration;
 import org.apache.uima.ducc.transport.event.common.IDuccUimaDeploymentDescriptor;
 import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
 import org.apache.uima.ducc.transport.event.common.IRationale;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ProcessDeallocationType;
 import org.apache.uima.ducc.transport.event.common.Rationale;
 import org.apache.uima.ducc.transport.event.jd.DriverStatusReport;
 import org.apache.uima.ducc.transport.event.jd.DuccProcessWorkItemsMap;
@@ -155,9 +155,7 @@ public class JobDriver extends Thread im
 			workItemStateManager = new WorkItemStateManager(logsjobdir);
 			synchronizedStats = new SynchronizedStats();
 			// Prepare UIMA-AS client instance and multiple threads
-			ClientThreadFactory factory = new ClientThreadFactory("UimaASClientThread");
-			queue = new LinkedBlockingQueue<Runnable>();
-			executor = new DynamicThreadPoolExecutor(1, 1, 10, TimeUnit.MICROSECONDS, queue, factory, job.getDuccId());
+			initThreadPool();
 			client = new BaseUIMAAsynchronousEngine_impl();
 			workItemListener = new WorkItemListener(this);
 			client.addStatusCallbackListener(workItemListener);
@@ -206,6 +204,15 @@ public class JobDriver extends Thread im
 		}
 	}
 	
+	// (Re)build the UIMA-AS client thread pool; an existing executor is shut down first.
+	private void initThreadPool() {
+		if(executor != null) {
+			executor.shutdown();
+		}
+		queue = new LinkedBlockingQueue<Runnable>();
+		executor = new DynamicThreadPoolExecutor(1, 1, 10, TimeUnit.MICROSECONDS, queue, new ClientThreadFactory("UimaASClientThread"), job.getDuccId());
+	}
+	
 	public String summarize(Exception e) {
 		return ExceptionHelper.summarize(e);
 	}
@@ -248,7 +255,7 @@ public class JobDriver extends Thread im
 	private void missingCallbackReaper() {
 		String location = "missingCallbackReaper";
 		try {
-			if(casSource.isEmpty()) {
+			if(casSource.isExhaustedReader()) {
 				IDuccWorkJob job = getJob();
 				DriverStatusReport driverStatusReport = getDriverStatusReportLive();
 				long todo = driverStatusReport.getWorkItemsToDo();
@@ -262,13 +269,14 @@ public class JobDriver extends Thread im
 								WorkItem workItem = workItems.nextElement();
 								int seqNo = workItem.getSeqNo();
 								String casId = workItem.getCasId();
+								duccOut.debug(location, jobid, "seqNo:"+seqNo+" "+"state:"+workItem.getCallbackState().getState().name());
 								if(workItem.getCallbackState().isPendingCallback()) {
 									long sTime = workItem.getTimeWindow().getStartLong();
 									long cTime = System.currentTimeMillis();
 									long mTime = 1000*60*lostTimeout;
 									long tdiff = cTime - sTime;
 									if(tdiff > mTime) {
-										duccOut.warn(location, null, "reaping (no callback) seqNo:"+seqNo+" "+"casId:"+casId+" "+"tdiff:"+tdiff);
+										duccOut.debug(location, jobid, "reaping (no callback) seqNo:"+seqNo+" "+"casId:"+casId+" "+"tdiff:"+tdiff);
 										registerLostCas(workItem.getCasId(), getCasDispatchMap().get(casId));
 										workItem.lost();
 									}
@@ -285,6 +293,67 @@ public class JobDriver extends Thread im
 		}
 	}
 	
+	// When the CAS source holds delayed tuples and job.isProcessReady() is false: rebuild the
+	// thread pool, release limbo tuples, move their count from lost to retry, then wait for eligibility.
+	private void requeue() throws JobDriverTerminateException {
+		String location = "requeue";
+		if(!casSource.hasDelayed() || job.isProcessReady()) {
+			return;
+		}
+		initThreadPool();
+		ArrayList<CasTuple> releasedTuples = casSource.releaseLimbo();
+		int released = releasedTuples.size();
+		if(released > 0) {
+			duccOut.debug(location, jobid, "released:"+released);
+			driverStatusReport.decrementWorkItemsLost(released);
+			driverStatusReport.incrementWorkItemsRetry(released);
+			for(CasTuple releasedTuple : releasedTuples) {
+				duccOut.info(location, jobid, "seqNo:"+releasedTuple.getSeqno());
+			}
+		}
+		waitForEligibility();
+	}
+	
+	// Resize the executor to the calculated thread count, then, while a queue
+	// deficit remains, queue one CAS per iteration as long as the reader is not
+	// exhausted or limbo reports an available entry; logs how many were queued.
+	private void queue() throws JobDriverTerminateException {
+		String location = "queue";
+		int threadCount = calculateThreadCount();
+		driverStatusReport.setThreadCount(threadCount);
+		executor.changeCorePoolSize(threadCount);
+		// zero (or fewer) threads: skip queuing entirely
+		if(threadCount <= 0) {
+			return;
+		}
+		int poolSize = executor.getCorePoolSize();
+		duccOut.debug(location, jobid, "pool size:"+poolSize);
+		int newlyQueued = 0;
+		while(isQueueDeficit(threadCount)) {
+			if(!casSource.isExhaustedReader()) {
+				duccOut.debug(location, jobid, "not exhausted reader");
+				queueCASes(1,queue,workItemFactory);
+				newlyQueued++;
+				continue;
+			}
+			duccOut.trace(location, jobid, "exhausted reader");
+			duccOut.debug(location, jobid, "exhausted reader");
+			// reader is exhausted: stop unless limbo is non-empty and has an available entry
+			if(casSource.isLimboEmpty()) {
+				duccOut.debug(location, jobid, "limbo empty size:"+casSource.getLimboSize());
+				break;
+			}
+			if(!casSource.hasLimboAvailable()) {
+				duccOut.debug(location, jobid, "limbo unavailable size:"+casSource.getLimboSize());
+				break;
+			}
+			duccOut.debug(location, jobid, "limbo available size:"+casSource.getLimboSize());
+			queueCASes(1,queue,workItemFactory);
+			newlyQueued++;
+		}
+		duccOut.debug(location, jobid, "newly queued:"+newlyQueued);
+	}
+	
 	private void process() throws JobDriverTerminateException {
 		String location = "process";
 		try {
@@ -292,7 +361,7 @@ public class JobDriver extends Thread im
 			if(getJob().isRunnable()) {
 				uimaAsClientInitialize();
 				duccOut.info(location, jobid, "jd.step:"+location);
-				executor.prestartAllCoreThreads();
+				//executor.prestartAllCoreThreads();
 				workItemFactory = new WorkItemFactory(client, jobid, this);
 				queueCASes(1, queue, workItemFactory);
 				boolean run = true;
@@ -318,44 +387,15 @@ public class JobDriver extends Thread im
 					}
 					logState(getJob());
 					interrupter();
-					int threadCount = calculateThreadCount();
-					driverStatusReport.setThreadCount(threadCount);
-					if(threadCount > 0) {
-						executor.changeCorePoolSize(threadCount);
-					}
-					int poolSize = executor.getCorePoolSize();
-					duccOut.debug(location, jobid, "pool size:"+poolSize);
-					while(isQueueDeficit(threadCount)) {
-						if(!casSource.isExhaustedReader()) {
-							duccOut.debug(location, jobid, "not exhausted reader");
-							queueCASes(1,queue,workItemFactory);
-							continue;
-						}
-						else {
-							duccOut.trace(location, jobid, "exhausted reader");
-						}
-						if(!casSource.isLimboEmpty()) {
-							if(casSource.hasLimboAvailable()) {
-								duccOut.debug(location, jobid, "limbo available size:"+casSource.getLimboSize());
-								queueCASes(1,queue,workItemFactory);
-								continue;
-							}
-							else {
-								duccOut.debug(location, jobid, "limbo unavailable size:"+casSource.getLimboSize());
-							}
-						}
-						else {
-							duccOut.debug(location, jobid, "limbo empty size:"+casSource.getLimboSize());
-						}
-						break;
-					}
+					requeue();
+					queue();
+					missingCallbackReaper();
 					value = driverStatusReport.isComplete();
 					if(value) {
 						duccOut.info(location, jobid, "DriverComplete:"+value+" "+"DriverState:"+driverStatusReport.getDriverState());
 						run = false;
 						continue;
 					}
-					missingCallbackReaper();
 					try {
 						Thread.sleep(10000);
 					} 
@@ -484,8 +524,9 @@ public class JobDriver extends Thread im
 				CasTuple casTuple = casSource.pop();
 				driverStatusReport.setWorkItemsFetched(casSource.getSeqNo());
 				if(casTuple == null) {
-					if(casSource.isEmpty()) {
+					if(casSource.isExhaustedReader()) {
 						driverStatusReport.resetWorkItemsPending();
+						duccOut.debug(location, jobid, "resetWorkItemsPending");
 					}
 					break;
 				}
@@ -537,17 +578,19 @@ public class JobDriver extends Thread im
 	private void interrupter() {
 		String location = "interrupter";
 		CasDispatchMap casDispatchMap = getCasDispatchMap();
-		IDuccProcessMap processMap = (IDuccProcessMap) getJob().getProcessMap().deepCopy();
-		Iterator<DuccId> iterator = processMap.keySet().iterator();
-		while(iterator.hasNext()) {
-			DuccId duccId = iterator.next();
-			IDuccProcess duccProcess = processMap.get(duccId);
-			boolean statusComplete = duccProcess.isComplete();
-			boolean statusDeallocated = duccProcess.isDeallocated();
-			boolean statusProcessFailed = duccProcess.isFailed();
-			if(statusComplete || statusDeallocated || statusProcessFailed) {
-				duccOut.debug(location, jobid, duccProcess.getDuccId(), "isComplete:"+statusComplete+" "+"isDeallocated:"+statusDeallocated+" "+"isProcessFailed:"+statusProcessFailed);
-				casDispatchMap.interrupt(getJob(), duccProcess);
+		if(casDispatchMap.size() > 0) {
+			IDuccProcessMap processMap = (IDuccProcessMap) getJob().getProcessMap().deepCopy();
+			Iterator<DuccId> iterator = processMap.keySet().iterator();
+			while(iterator.hasNext()) {
+				DuccId duccId = iterator.next();
+				IDuccProcess duccProcess = processMap.get(duccId);
+				boolean statusComplete = duccProcess.isComplete();
+				boolean statusDeallocated = duccProcess.isDeallocated();
+				boolean statusProcessFailed = duccProcess.isFailed();
+				if(statusComplete || statusDeallocated || statusProcessFailed) {
+					duccOut.debug(location, jobid, duccProcess.getDuccId(), "isComplete:"+statusComplete+" "+"isDeallocated:"+statusDeallocated+" "+"isProcessFailed:"+statusProcessFailed);
+					casDispatchMap.interrupt(getJob(), duccProcess);
+				}
 			}
 		}
 	}
@@ -702,6 +745,21 @@ public class JobDriver extends Thread im
 		casWorkItemMap.remove(workItem.getCasId());
 	}
 	
+	// Log the work item, call remove() on it, then push its tuple back to the CAS source flagged as a delayed retry.
+	private void delayedRetry(WorkItem workItem) {
+		String location = "delayedRetry";
+		duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
+		duccOut.debug(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText()+" "+"casId:"+workItem.getCAS().hashCode());
+		remove(workItem);
+		duccOut.debug(location, workItem.getJobId(), workItem.getProcessId(), "size:"+casDispatchMap.size());
+		CasTuple delayedTuple = workItem.getCasTuple();
+		delayedTuple.setDelayedRetry();
+		delayedTuple.setDuccId(new DuccId(-1));
+		casSource.push(delayedTuple);
+		workItemStateManager.retry(workItem.getSeqNo());
+		workItemInactive();
+	}
+	
 	private void retry(WorkItem workItem) {
 		String location = "retry";
 		duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
@@ -716,6 +774,7 @@ public class JobDriver extends Thread im
 		workItemStateManager.retry(workItem.getSeqNo());
 		return;
 	}
+	
 	// presume retry if registration info not found
 	
 	private boolean isRetry(WorkItem workItem) {
@@ -1329,49 +1388,54 @@ public class JobDriver extends Thread im
 	public void ended(WorkItem workItem) {
 		String location = "ended";
 		try {
-			waitForLocation(this, workItem);
-			workItemInactive();
-			duccOut.debug(location, jobid, "action:ended "+getThreadLocationInfo(workItem));
-			driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
-			driverStatusReport.workItemOperatingEnd(workItem.getCasId());
-			if(driverStatusReport.isKillJob()) {
-				duccOut.debug(location, jobid, "action:kill-job "+getThreadLocationInfo(workItem));
-				// killing job - don't add bother with retry
-			}
-			else if(driverStatusReport.isKillProcess(workItem.getProcessId())) {
-				duccOut.debug(location, jobid, "action:kill-process "+getThreadLocationInfo(workItem));
-				retry(workItem);
-				// killing process - don't add another work item to queue
-			}
-			else if(isRetry(workItem)) {
-				duccOut.debug(location, jobid, "action:shrink "+getThreadLocationInfo(workItem));
-				retry(workItem);
-				// must be shrinking - don't add another work item to queue
+			if(!casDispatchMap.containsKey(workItem.getCasId())) {
+				duccOut.warn(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"not dispatched");
 			}
 			else {
-				duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
-				duccOut.debug(location, jobid, "action:completed "+getThreadLocationInfo(workItem));
-				workItemStateManager.ended(workItem.getSeqNo());
-				driverStatusReport.countWorkItemsProcessingCompleted();
-				workItem.getTimeWindow().setEnd(TimeStamp.getCurrentMillis());
-				long time = workItem.getTimeWindow().getElapsedMillis();
-				synchronizedStats.addValue(time);
-				DuccPerWorkItemStatistics perWorkItemStatistics = new DuccPerWorkItemStatistics(
-						synchronizedStats.getMax(),
-						synchronizedStats.getMin(),
-						synchronizedStats.getMean(),
-						synchronizedStats.getStandardDeviation()
-						);
-				driverStatusReport.setPerWorkItemStatistics(perWorkItemStatistics);
-				performanceSummaryWriter.getSummaryMap().update(duccOut, workItem.getAnalysisEnginePerformanceMetricsList());
-				int casCount = performanceSummaryWriter.getSummaryMap().casCount();
-				int endCount = driverStatusReport.getWorkItemsProcessingCompleted();
-				String message = "casCount:"+casCount+" "+"endCount:"+endCount;
-				duccOut.debug(location, jobid, message);
-				remove(workItem);
-				recycleCAS(workItem);
-				accountingWorkItemIsDone(workItem.getProcessId(),time);
-				queueCASes(1,queue,workItemFactory);
+				waitForLocation(this, workItem);
+				workItemInactive();
+				duccOut.debug(location, jobid, "action:ended "+getThreadLocationInfo(workItem));
+				driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
+				driverStatusReport.workItemOperatingEnd(workItem.getCasId());
+				if(driverStatusReport.isKillJob()) {
+					duccOut.debug(location, jobid, "action:kill-job "+getThreadLocationInfo(workItem));
+					// killing job - don't bother with retry
+				}
+				else if(driverStatusReport.isKillProcess(workItem.getProcessId())) {
+					duccOut.debug(location, jobid, "action:kill-process "+getThreadLocationInfo(workItem));
+					retry(workItem);
+					// killing process - don't add another work item to queue
+				}
+				else if(isRetry(workItem)) {
+					duccOut.debug(location, jobid, "action:shrink "+getThreadLocationInfo(workItem));
+					retry(workItem);
+					// must be shrinking - don't add another work item to queue
+				}
+				else {
+					duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
+					duccOut.debug(location, jobid, "action:completed "+getThreadLocationInfo(workItem));
+					workItemStateManager.ended(workItem.getSeqNo());
+					driverStatusReport.countWorkItemsProcessingCompleted();
+					workItem.getTimeWindow().setEnd(TimeStamp.getCurrentMillis());
+					long time = workItem.getTimeWindow().getElapsedMillis();
+					synchronizedStats.addValue(time);
+					DuccPerWorkItemStatistics perWorkItemStatistics = new DuccPerWorkItemStatistics(
+							synchronizedStats.getMax(),
+							synchronizedStats.getMin(),
+							synchronizedStats.getMean(),
+							synchronizedStats.getStandardDeviation()
+							);
+					driverStatusReport.setPerWorkItemStatistics(perWorkItemStatistics);
+					performanceSummaryWriter.getSummaryMap().update(duccOut, workItem.getAnalysisEnginePerformanceMetricsList());
+					int casCount = performanceSummaryWriter.getSummaryMap().casCount();
+					int endCount = driverStatusReport.getWorkItemsProcessingCompleted();
+					String message = "casCount:"+casCount+" "+"endCount:"+endCount;
+					duccOut.debug(location, jobid, message);
+					remove(workItem);
+					recycleCAS(workItem);
+					accountingWorkItemIsDone(workItem.getProcessId(),time);
+					queueCASes(1,queue,workItemFactory);
+				}
 			}
 		}
 		catch(Exception e) {
@@ -1439,6 +1503,22 @@ public class JobDriver extends Thread im
 		String location = "lost";
 		try {
 			duccOut.info(location, workItem.getJobId(), "seqNo:"+workItem.getSeqNo());
+			driverStatusReport.workItemDequeued(workItem.getCasId());
+			driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
+			driverStatusReport.workItemOperatingEnd(workItem.getCasId());
+			driverStatusReport.incrementWorkItemsLost();
+			delayedRetry(workItem);
+		}
+		catch(Exception exception) {
+			duccOut.error(location, jobid, "processing error?", exception);
+		}
+	}
+	
+	/*
+	public void lost(WorkItem workItem) {
+		String location = "lost";
+		try {
+			duccOut.info(location, workItem.getJobId(), "seqNo:"+workItem.getSeqNo());
 			workItemInactive();
 			driverStatusReport.workItemDequeued(workItem.getCasId());
 			driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
@@ -1454,86 +1534,92 @@ public class JobDriver extends Thread im
 			duccOut.error(location, jobid, "processing error?", exception);
 		}
 	}
+	*/
 	
 	public void exception(WorkItem workItem, Exception e) {
 		String location = "exception";
 		try {
-			duccOut.debug(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
-			duccOut.debug(location, jobid, "action:exception "+getThreadLocationInfo(workItem), e);
-			boolean timeout = false;
-			if(ExceptionClassifier.isTimeout(e)) {
-				ArrayList<WorkItem> removalsList = new ArrayList<WorkItem>();
-				removalsList.add(workItem);
-				removeLocations(removalsList);
-				timeout = true;
+			if(!casDispatchMap.containsKey(workItem.getCasId())) {
+				duccOut.warn(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"not dispatched");
 			}
 			else {
-				duccOut.debug(location, jobid, "action:location-wait "+getThreadLocationInfo(workItem), e);
-				waitForLocation(this, workItem);
-			}
-			workItemInactive();
-			driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
-			driverStatusReport.workItemOperatingEnd(workItem.getCasId());
-			if(driverStatusReport.isKillJob()) {
-				duccOut.debug(location, jobid, "action:kill-job "+getThreadLocationInfo(workItem), e);
-				// killing job - don't add bother with retry
-			}
-			else if(timeout) {
-				duccOut.debug(location, jobid, "action:timeout "+getThreadLocationInfo(workItem), e);
-				// <UIMA-3600>
-				waitForProcessStatus(this, workItem);
-				if(isNodeFailure(workItem)) {
-					duccOut.debug(location, jobid, "action:timeout-node-failure-retry "+getThreadLocationInfo(workItem), e);
-					retry(workItem);
-					// node failed, work item interrupted - don't add another work item to queue
+				duccOut.debug(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
+				duccOut.debug(location, jobid, "action:exception "+getThreadLocationInfo(workItem), e);
+				boolean timeout = false;
+				if(ExceptionClassifier.isTimeout(e)) {
+					ArrayList<WorkItem> removalsList = new ArrayList<WorkItem>();
+					removalsList.add(workItem);
+					removeLocations(removalsList);
+					timeout = true;
 				}
 				else {
-					duccOut.debug(location, jobid, "action:timeout-handler "+getThreadLocationInfo(workItem), e);
-					employPluginExceptionHandler(workItem, e);
+					duccOut.debug(location, jobid, "action:location-wait "+getThreadLocationInfo(workItem), e);
+					waitForLocation(this, workItem);
 				}
-				// </UIMA-3600>
-			}
-			else if(isUnknownProcess(workItem)) {
-				duccOut.debug(location, jobid, "action:unknown-process "+getThreadLocationInfo(workItem), e);
-				retry(workItem);
-				// unknown process (no callbacks) - don't add another work item to queue
-			}
-			else if(driverStatusReport.isKillProcess(workItem.getProcessId())) {
-				duccOut.debug(location, jobid, "action:kill-process "+getThreadLocationInfo(workItem), e);
-				retry(workItem);
-				// killing process - don't add another work item to queue
-			}
-			else if(isFailedProcess(workItem)) {
-				if(ExceptionClassifier.isInterrupted(e)) {
-					duccOut.debug(location, jobid, "action:fail-process-retry "+getThreadLocationInfo(workItem), e);
+				workItemInactive();
+				driverStatusReport.workItemPendingProcessAssignmentRemove(workItem.getCasId());
+				driverStatusReport.workItemOperatingEnd(workItem.getCasId());
+				if(driverStatusReport.isKillJob()) {
+					duccOut.debug(location, jobid, "action:kill-job "+getThreadLocationInfo(workItem), e);
+					// killing job - don't bother with retry
+				}
+				else if(timeout) {
+					duccOut.debug(location, jobid, "action:timeout "+getThreadLocationInfo(workItem), e);
+					// <UIMA-3600>
+					waitForProcessStatus(this, workItem);
+					if(isNodeFailure(workItem)) {
+						duccOut.debug(location, jobid, "action:timeout-node-failure-retry "+getThreadLocationInfo(workItem), e);
+						retry(workItem);
+						// node failed, work item interrupted - don't add another work item to queue
+					}
+					else {
+						duccOut.debug(location, jobid, "action:timeout-handler "+getThreadLocationInfo(workItem), e);
+						employPluginExceptionHandler(workItem, e);
+					}
+					// </UIMA-3600>
+				}
+				else if(isUnknownProcess(workItem)) {
+					duccOut.debug(location, jobid, "action:unknown-process "+getThreadLocationInfo(workItem), e);
+					retry(workItem);
+					// unknown process (no callbacks) - don't add another work item to queue
+				}
+				else if(driverStatusReport.isKillProcess(workItem.getProcessId())) {
+					duccOut.debug(location, jobid, "action:kill-process "+getThreadLocationInfo(workItem), e);
+					retry(workItem);
+					// killing process - don't add another work item to queue
+				}
+				else if(isFailedProcess(workItem)) {
+					if(ExceptionClassifier.isInterrupted(e)) {
+						duccOut.debug(location, jobid, "action:fail-process-retry "+getThreadLocationInfo(workItem), e);
+						retry(workItem);
+						// process failed, work item interrupted - don't add another work item to queue
+					}
+					else {
+						duccOut.debug(location, jobid, "action:fail-process-handler "+getThreadLocationInfo(workItem), e);
+						employPluginExceptionHandler(workItem, e);
+					}
+				}
+				else if(isRetry(workItem)) {
+					duccOut.debug(location, jobid, "action:shrink "+getThreadLocationInfo(workItem), e);
 					retry(workItem);
-					// process failed, work item interrupted - don't add another work item to queue
+					// must be shrinking - don't add another work item to queue
+				}
+				else if(isError(workItem, e)) {
+					duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
+					duccOut.debug(location, jobid, "action:error "+getThreadLocationInfo(workItem), e);
+					workItemStateManager.error(workItem.getSeqNo());
+					workItemError(workItem, e);
+					remove(workItem);
+					recycleCAS(workItem);
+					accountingWorkItemIsError(workItem.getProcessId());
+					queueCASes(1,queue,workItemFactory);
 				}
 				else {
-					duccOut.debug(location, jobid, "action:fail-process-handler "+getThreadLocationInfo(workItem), e);
-					employPluginExceptionHandler(workItem, e);
+					duccOut.debug(location, jobid, "action:retry "+getThreadLocationInfo(workItem), e);
+					retry(workItem);
+					queueCASes(1,queue,workItemFactory);
 				}
 			}
-			else if(isRetry(workItem)) {
-				duccOut.debug(location, jobid, "action:shrink "+getThreadLocationInfo(workItem), e);
-				retry(workItem);
-				// must be shrinking - don't add another work item to queue
-			}
-			else if(isError(workItem, e)) {
-				duccOut.info(location, workItem.getJobId(), workItem.getProcessId(), "seqNo:"+workItem.getSeqNo()+" "+"wiId:"+workItem.getCasDocumentText());
-				duccOut.debug(location, jobid, "action:error "+getThreadLocationInfo(workItem), e);
-				workItemStateManager.error(workItem.getSeqNo());
-				workItemError(workItem, e);
-				remove(workItem);
-				recycleCAS(workItem);
-				accountingWorkItemIsError(workItem.getProcessId());
-				queueCASes(1,queue,workItemFactory);
-			}
-			else {
-				duccOut.debug(location, jobid, "action:retry "+getThreadLocationInfo(workItem), e);
-				retry(workItem);
-				queueCASes(1,queue,workItemFactory);
-			}
 		}
 		catch(Exception exception) {
 			duccOut.error(location, jobid, "processing error?", exception);
@@ -1559,11 +1645,13 @@ public class JobDriver extends Thread im
 		return;
 	}
 	
+	/*
 	private void workItemLost(WorkItem workItem) {
 		String location = "workItemLost";
 		driverStatusReport.countWorkItemsLost();
 		duccOut.error(location, workItem.getJobId(), "seqNo:"+workItem.getSeqNo());
 	}
+	*/
 	
 	private void workItemError(WorkItem workItem, Throwable t) {
 		workItemError(workItem, t, null);

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CallbackState.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CallbackState.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CallbackState.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CallbackState.java Tue Feb 18 13:05:21 2014
@@ -22,7 +22,10 @@ public class CallbackState {
 	
 	public static enum State { PendingQueued, PendingAssigned, NotPending };
 	
-	public State state = State.NotPending;
+	private State state = State.NotPending;
+	
+	public CallbackState() {
+	}
 	
 	private void setState(State value) {
 		synchronized(state) {

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasLimbo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasLimbo.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasLimbo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasLimbo.java Tue Feb 18 13:05:21 2014
@@ -18,6 +18,7 @@
 */
 package org.apache.uima.ducc.jd.client;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -115,13 +116,13 @@ public class CasLimbo {
 		String location = "use";
 		DuccId pDuccId = casTuple.getDuccId();
 		if(pDuccId == null) {
-			duccOut.warn(location, getJobId(), casTuple.getDuccId(), "process ID null");
+			duccOut.debug(location, getJobId(), casTuple.getDuccId(), "process ID null");
 		}
 		else {
 			IDuccProcessMap processMap = getProcessMap();
 			IDuccProcess process = processMap.getProcess(pDuccId);
 			if(process == null) {
-				duccOut.warn(location, getJobId(), casTuple.getDuccId(), "process is null");
+				duccOut.debug(location, getJobId(), casTuple.getDuccId(), "process is null");
 			}
 			else {
 				if(process.isPreempted()) {
@@ -142,16 +143,19 @@ public class CasLimbo {
 		String location = "isAvailable";
 		boolean retVal = false;
 		DuccId pDuccId = casTuple.getDuccId();
-		if(pDuccId == null) {
+		if(casTuple.isDelayedRetry()) {
+			//retVal = false;
+		}
+		else if(pDuccId == null) {
 			retVal = true;
-			duccOut.warn(location, getJobId(), casTuple.getDuccId(), "process ID is null");
+			duccOut.debug(location, getJobId(), casTuple.getDuccId(), "process ID is null");
 		}
 		else {
 			IDuccProcessMap processMap = getProcessMap();
 			IDuccProcess process = processMap.getProcess(pDuccId);
 			if(process == null) {
 				retVal = true;
-				duccOut.warn(location, getJobId(), casTuple.getDuccId(), "process is null");
+				duccOut.debug(location, getJobId(), casTuple.getDuccId(), "process is null");
 			}
 			else {	
 				if(process.isDefunct()) {
@@ -175,6 +179,34 @@ public class CasLimbo {
 		return retVal;
 	}
 	
+	// Walk the tuple queue and call undelay() on every tuple; each tuple for which
+	// it returns true is logged, marked via setRetry() and collected into the result.
+	public ArrayList<CasTuple> release() {
+		String location = "release";
+		ArrayList<CasTuple> releasedTuples = new ArrayList<CasTuple>();
+		for(CasTuple candidate : tupleQueue) {
+			if(!candidate.undelay()) {
+				continue;
+			}
+			duccOut.debug(location, getJobId(), "seqNo:"+candidate.getSeqno());
+			candidate.setRetry();
+			releasedTuples.add(candidate);
+		}
+		return releasedTuples;
+	}
+	
+	// Count the tuples currently in the queue whose isDelayedRetry() flag is set.
+	// The queue itself is not modified.
+	public int delayedSize() {
+		int delayed = 0;
+		for(CasTuple candidate : tupleQueue) {
+			if(candidate.isDelayedRetry()) {
+				delayed++;
+			}
+		}
+		return delayed;
+	}
+	
 	public int size() {
 		String location = "size";
 		int retVal = tupleQueue.size();

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasSource.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasSource.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasSource.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasSource.java Tue Feb 18 13:05:21 2014
@@ -47,6 +47,7 @@ import org.apache.uima.resource.metadata
 import org.apache.uima.resource.metadata.FsIndexDescription;
 import org.apache.uima.resource.metadata.TypePriorities;
 import org.apache.uima.resource.metadata.TypeSystemDescription;
+import org.apache.uima.util.CasCopier;
 import org.apache.uima.util.CasCreationUtils;
 import org.apache.uima.util.InvalidXMLException;
 import org.apache.uima.util.Progress;
@@ -146,6 +147,14 @@ public class CasSource {
     	initTotal();
     }
 	
+    private CAS clone(CAS casOriginal, int seqNo) throws ResourceInitializationException { // Copies casOriginal into a CAS obtained from getEmptyCas(seqNo) and returns the copy.
+    	String location = "clone";
+		CAS casClone = getEmptyCas(seqNo);
+		CasCopier.copyCas(casOriginal, casClone, true); // third argument presumably requests copying the sofa as well - confirm against CasCopier Javadoc
+		duccOut.debug(location, null, "seqNo:"+seqNo+" "+"casId:"+casOriginal.hashCode()+" "+"casId:"+casClone.hashCode()); // NOTE(review): both values are labelled "casId:"; the first is the original, the second the clone
+		return casClone;
+    }
+    
 	private CAS getEmptyCas(int seqNo) throws ResourceInitializationException {
 		String location = "getEmptyCas";
 		CAS cas = getRecycledCas();
@@ -220,6 +229,14 @@ public class CasSource {
 		return casLimbo.hasAvailable();
 	}
 	
+	public ArrayList<CasTuple> releaseLimbo() { // Delegates to casLimbo.release() and returns its result unchanged.
+		return casLimbo.release();
+	}
+	
+	public boolean hasDelayed() { // True when casLimbo reports at least one delayed tuple.
+		return casLimbo.delayedSize() > 0;
+	}
+	
 	public boolean isEmpty() {
 		boolean retVal = false;
 		if(isExhaustedReader()) {
@@ -233,7 +250,13 @@ public class CasSource {
 	public CasTuple pop() throws Exception {
 		String location = "pop";
 		CasTuple casTuple = casLimbo.get();
-		if(casTuple == null) {
+		if(casTuple != null) {
+			CAS casOriginal = casTuple.getCas();
+			int seqNo = casTuple.getSeqno();
+			CAS casClone = clone(casOriginal, seqNo);
+			casTuple.setCas(casClone);
+		}
+		else {
 			try {
 				synchronized(cr) {
 					if((total > 0) && (total == seqNo.get())) {

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasTuple.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasTuple.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasTuple.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/CasTuple.java Tue Feb 18 13:05:21 2014
@@ -26,6 +26,7 @@ public class CasTuple {
 	private CAS cas;
 	private int seqno;
 	private boolean retry = false;
+	private boolean delayedRetry = false;
 	private DuccId pDuccId= null;
 	
 	public CasTuple(CAS cas, int seqno) {
@@ -37,6 +38,10 @@ public class CasTuple {
 		this.seqno = seqno;
 	}
 	
+	public void setCas(CAS value) { // Replaces the CAS held by this tuple; no null check is performed.
+		cas = value;
+	}
+	
 	public CAS getCas() {
 		return cas;
 	}
@@ -57,6 +62,23 @@ public class CasTuple {
 		return retry;
 	}
 	
+	public void setDelayedRetry() { // Flags this tuple as a delayed retry; the flag is cleared again by undelay().
+		delayedRetry = true;
+	}
+	
+	public boolean isDelayedRetry() { // Returns the current value of the delayedRetry flag.
+		return delayedRetry;
+	}
+	
+	public boolean undelay() { // Converts a delayed retry into a plain retry; returns whether the tuple had been flagged delayedRetry.
+		boolean retVal = delayedRetry; // capture the flag before it is cleared below
+		if(delayedRetry) {
+			retry = true;
+			delayedRetry = false;
+		}
+		return retVal; // false means nothing changed
+	}
+	
 	public DuccId getDuccId() {
 		return pDuccId;
 	}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItem.java Tue Feb 18 13:05:21 2014
@@ -19,7 +19,6 @@
 package org.apache.uima.ducc.jd.client;
 
 import java.util.ArrayList;
-import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
@@ -65,10 +64,6 @@ public class WorkItem implements Runnabl
 	private String keyUimaAsClientTracking = "UimaAsClientTracking";
 	private boolean uimaAsClientTracking = false;
 	
-	// <for testing only!!!>
-	private final static boolean injectRandom = false;
-	// </for testing only!!!>
-	
 	public WorkItem(UimaAsynchronousEngine client, CasTuple casTuple, DuccId duccId, IWorkItemMonitor workItemMonitor) {
 		init(client, casTuple, duccId, workItemMonitor);
 	}
@@ -149,16 +144,8 @@ public class WorkItem implements Runnabl
 				}
 				client.sendAndReceiveCAS(cas, analysisEnginePerformanceMetricsList);
 				if(uimaAsClientTracking) {
-					duccOut.info(methodName, null, "seqNo:"+getSeqNo()+" "+"send and receive returned");
-				}
-				// <for testing only!!!>
-				if(injectRandom) {
-					Random random = new Random();
-					if(random.nextBoolean()) {
-						throw new Throwable("just testing Throwable handler");
-					}
+					duccOut.debug(methodName, null, "seqNo:"+getSeqNo()+" "+"send and receive returned");
 				}
-				// </for testing only!!!>
 				if(!isLost.get()) {
 					ended();
 				}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItemListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItemListener.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItemListener.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/client/WorkItemListener.java Tue Feb 18 13:05:21 2014
@@ -26,6 +26,9 @@ import org.apache.uima.ducc.common.utils
 import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.jd.IJobDriver;
+import org.apache.uima.ducc.jd.client.ThreadLocation;
+import org.apache.uima.ducc.jd.client.WorkItem;
+import org.apache.uima.ducc.jd.reliability.FaultInjector;
 
 
 public class WorkItemListener extends UimaAsBaseCallbackListener {
@@ -41,12 +44,10 @@ public class WorkItemListener extends Ui
 	
 	private String keyUimaAsClientTracking = "UimaAsClientTracking";
 	private boolean uimaAsClientTracking = false;
-	
+
 	// <for testing only!!!>
 	private final static boolean asynchronous = false;
-	private final static boolean injectLost1 = false;
-	private final static boolean injectLost2 = false;
-	private final static boolean injectDelay3 = false;
+	private final static FaultInjector faultInjector = FaultInjector.getInstance();
 	// </for testing only!!!>
 	
 	public WorkItemListener(IJobDriver jobDriver) {
@@ -96,17 +97,8 @@ public class WorkItemListener extends Ui
 			String methodName = "OnBeforeMessageSendHandler";
 			try {
 				// <for testing only!!!>
-				if(injectLost1) {
-					String casId = null;
-					casId = ""+status.getCAS().hashCode();
-					WorkItem wi = jobDriver.getWorkItem(casId);
-					wi.getCallbackState().statePendingAssigned();
-					duccOut.warn(methodName, jobid, "seqNo:"+wi.getSeqNo()+" "+wi.getCallbackState().getState());
-					int seqNo = wi.getSeqNo();
-					if(seqNo == 1) {
-						duccOut.warn(methodName, jobid, "callback #1 discarded seqNo:"+seqNo+" "+"casId:"+casId);
-						return;
-					}
+				if(faultInjector.isFaultCallBack1(status, jobDriver)) {
+					return;
 				}
 				// </for testing only!!!>
 				String casId = ""+status.getCAS().hashCode();
@@ -192,31 +184,8 @@ public class WorkItemListener extends Ui
 			String methodName = "OnBeforeProcessCASHandler";
 			try {
 				// <for testing only!!!>
-				if(injectLost2) {
-					String casId = null;
-					casId = ""+status.getCAS().hashCode();
-					WorkItem wi = jobDriver.getWorkItem(casId);
-					wi.getCallbackState().statePendingAssigned();
-					duccOut.warn(methodName, jobid, "seqNo:"+wi.getSeqNo()+" "+wi.getCallbackState().getState());
-					int seqNo = wi.getSeqNo();
-					if(seqNo == 2) {
-						duccOut.warn(methodName, jobid, "callback #2 discarded seqNo:"+seqNo+" "+"casId:"+casId);
-						return;
-					}
-				}
-				if(injectDelay3) {
-					String casId = null;
-					casId = ""+status.getCAS().hashCode();
-					WorkItem wi = jobDriver.getWorkItem(casId);
-					int seqNo = wi.getSeqNo();
-					if(seqNo == 3) {
-						duccOut.warn(methodName, jobid, "callback delayed seqNo:"+seqNo+" "+"casId:"+casId);
-						try {
-							Thread.sleep(70*1000);
-						}
-						catch(Exception e) {
-						}
-					}
+				if(faultInjector.isFaultCallBack2(status, jobDriver)) {
+					return;
 				}
 				// </for testing only!!!>
 				String casId = ""+status.getCAS().hashCode();

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java?rev=1569302&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java Tue Feb 18 13:05:21 2014
@@ -0,0 +1,127 @@
+package org.apache.uima.ducc.jd.reliability;
+
+import java.util.Random;
+
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.jd.IJobDriver;
+import org.apache.uima.ducc.jd.client.WorkItem;
+
+public class FaultInjector { // Test aid: single shared instance that decides by random draw whether a UIMA-AS client callback should be reported as a fault.
+	
+	private static DuccLogger duccOut = DuccLoggerComponents.getJdOut(FaultInjector.class.getName());
+	
+	private static FaultInjector instance = new FaultInjector(); // created eagerly at class initialization
+	
+	public static FaultInjector getInstance() { // Returns the one shared instance.
+		return instance;
+	}
+	
+	private Random random = new Random();
+	
+	// Warning: setting enabled to true will inject faults by randomly ignoring UIMA-AS client callbacks
+	
+	private boolean enabled = false; // switched on by initialize() when either property parses to a value strictly between 0 and 1
+	
+	private enum Type { CallBack1, CallBack2 };
+	
+	private double pct1 = 0.1; // fault threshold for CallBack1: a draw below this value is a fault
+	private double pct2 = 0.1; // fault threshold for CallBack2; NOTE(review): enabled is shared, so configuring only one property also activates the other type at its 0.1 default - confirm this is intended
+
+	private FaultInjector() {
+		try {
+			initialize();
+		}
+		catch(Exception e) { // NOTE(review): failures are swallowed silently; fields keep whatever values were set before the exception
+		}
+	}
+	
+	private void initialize() { // Reads both thresholds from DuccPropertiesResolver; each valid value overrides its default and enables injection.
+		DuccPropertiesResolver dpr = DuccPropertiesResolver.getInstance();
+		String cb1 = dpr.getProperty("ducc.jd.fault.injector.CallBack1");
+		double dv1 = initValue(cb1);
+		if(dv1 > 0.0) { // initValue returns -1 for missing or invalid values
+			pct1 = dv1;
+			enabled = true;
+		}
+		String cb2 = dpr.getProperty("ducc.jd.fault.injector.CallBack2");
+		double dv2 = initValue(cb2);
+		if(dv2 > 0.0) {
+			pct2 = dv2;
+			enabled = true;
+		}
+	}
+	
+	private double initValue(String value) { // Parses value as a double; returns it only when strictly between 0 and 1, otherwise -1.
+		double retVal = -1;
+		try {
+			if(value != null) {
+				double dVal = Double.parseDouble(value);
+				if(dVal > 0) {
+					if(dVal < 1) {
+						retVal = dVal;
+					}
+				}
+			} 
+		}
+		catch(Exception e) { // unparsable text falls through to the -1 default without any log entry
+		}
+		return retVal;
+	}
+	
+	private boolean isFault(DuccId jobid, int seqNo, Type type) { // Draws a random number and reports a fault when it is below the threshold for the given type.
+		String location = "isFault";
+		boolean retVal = false;
+		try {
+			if(enabled) {
+				double d1 = random.nextDouble();
+				double d0 = d1; // default equals the draw, so an unmatched type can never satisfy d1 < d0
+				switch(type) {
+				case CallBack1:
+					d0 = pct1;
+					break;
+				case CallBack2:
+					d0 = pct2;
+					break;
+				}
+				if(d1 < d0) {
+					duccOut.info(location, jobid, "seqNo:"+seqNo+" "+"type:"+type.name()); // each injected fault is logged at info level
+					retVal = true;
+				}
+			}
+		}
+		catch(Exception e) {
+			duccOut.error(location, jobid, e);
+		}
+		return retVal;
+	}
+	
+	private boolean isFault(UimaASProcessStatus status, IJobDriver jobDriver, Type type) { // Resolves job id and seqNo from the callback status, then delegates to the overload above.
+		String location = "isFault";
+		DuccId jobid = null;
+		boolean retVal = false;
+		try {
+			jobid = jobDriver.getJob().getDuccId();
+			String casId = ""+status.getCAS().hashCode(); // work items are looked up by the CAS hash code rendered as a string
+			WorkItem wi = jobDriver.getWorkItem(casId);
+			int seqNo = wi.getSeqNo();
+			retVal = isFault(jobid, seqNo, type);
+		}
+		catch(Exception e) { // anything thrown above (for instance wi being null) is logged and treated as "no fault"
+			duccOut.error(location, jobid, e);
+		}
+		return retVal;
+	}
+	
+	public boolean isFaultCallBack1(UimaASProcessStatus status, IJobDriver jobDriver) { // Fault decision for Type.CallBack1.
+		return isFault(status, jobDriver, Type.CallBack1);
+	}
+	
+	public boolean isFaultCallBack2(UimaASProcessStatus status, IJobDriver jobDriver) { // Fault decision for Type.CallBack2.
+		return isFault(status, jobDriver, Type.CallBack2);
+	}
+	
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-jd/src/main/java/org/apache/uima/ducc/jd/reliability/FaultInjector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java?rev=1569302&r1=1569301&r2=1569302&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java Tue Feb 18 13:05:21 2014
@@ -370,22 +370,46 @@ public class DriverStatusReport implemen
 		return workItemsProcessingError.get();
 	}
 	
-	public void countWorkItemsLost() {
+	public void incrementWorkItemsLost() {
 		workItemsLost.incrementAndGet();
 		calculateState();
 		logReport();
 	}
 	
+	public void decrementWorkItemsLost() { // Lowers the lost-work-item counter by one, then calls calculateState() and logReport().
+		workItemsLost.decrementAndGet(); // NOTE(review): nothing here stops the counter from going below zero
+		calculateState();
+		logReport();
+	}
+	
+	public void decrementWorkItemsLost(int value) { // Lowers the lost-work-item counter by value, then calls calculateState() and logReport().
+		int delta = 0 - value; // negate so addAndGet subtracts; a negative value would therefore raise the counter
+		workItemsLost.addAndGet(delta);
+		calculateState();
+		logReport();
+	}
+	
 	public int getWorkItemsLost() {
 		return workItemsLost.get();
 	}
 	
+	
+	public boolean isWorkItemsLost() { // True while the lost-work-item counter is greater than zero.
+		return workItemsLost.get() > 0;
+	}
+	
 	public void countWorkItemsRetry() {
 		workItemsRetry.incrementAndGet();
 		calculateState();
 		logReport();
 	}
 	
+	public void incrementWorkItemsRetry(int value) { // Adds value to the retry counter, then calls calculateState() and logReport().
+		workItemsRetry.addAndGet(value); // value is applied as given; no check that it is positive
+		calculateState();
+		logReport();
+	}
+	
 	public int getWorkItemsRetry() {
 		return workItemsRetry.get();
 	}
@@ -622,6 +646,9 @@ public class DriverStatusReport implemen
 			else if(isPending()) {
 				setDriverState(DriverState.Idle);
 			}
+			else if(isWorkItemsLost()) {
+				setDriverState(DriverState.Idle);
+			}
 			else {
 				setDriverState(DriverState.Completing);
 			}
@@ -631,6 +658,9 @@ public class DriverStatusReport implemen
 				if(isPending()) {
 					setDriverState(DriverState.Idle);
 				}
+				else if(isWorkItemsLost()) {
+					setDriverState(DriverState.Idle);
+				}
 				else {
 					setDriverState(DriverState.Completing);
 				}