You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2015/02/26 16:26:38 UTC

svn commit: r1662483 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container: common/ jd/ jd/cas/ jd/fsm/wi/ jd/mh/ jd/mh/iface/ jd/mh/impl/ jd/wi/ net/iface/

Author: degenaro
Date: Thu Feb 26 15:26:37 2015
New Revision: 1662483

URL: http://svn.apache.org/r1662483
Log:
UIMA-4069 DUCC Job Driver (JD) system classpath

investment reset

Added:
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java   (with props)
Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IWorkItemInfo.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/WorkItemInfo.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/IWorkItem.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WorkItem.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java Thu Feb 26 15:26:37 2015
@@ -98,6 +98,7 @@ public class Standardize {
 		killJob,
 		killProcess,
 		killWorkItem,
+		investmentMillis,
 		operatingMillis,
 		;
 		

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java Thu Feb 26 15:26:37 2015
@@ -69,6 +69,7 @@ public class JobDriverHelper {
 				wii.setTid(rwt.getTid());
 				wii.setSeqNo(wi.getSeqNo());
 				wii.setOperatingMillis(wi.getMillisOperating());
+				wii.setInvestmentMillis(wi.getMillisInvestment());
 				MessageBuffer mb = new MessageBuffer();
 				mb.append(Standardize.Label.node.get()+wii.getNodeName());
 				mb.append(Standardize.Label.pid.get()+wii.getPid());

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java Thu Feb 26 15:26:37 2015
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.Atomi
 
 public class CasManagerStats {
 	
-	public enum RetryReason { ProcessPreempt, ProcessDown, NodeDown, UserErrorRetry, TimeoutRetry };
+	public enum RetryReason { ProcessPreempt, ProcessVolunteered, ProcessDown, NodeDown, UserErrorRetry, TimeoutRetry };
 	
 	private AtomicInteger crTotal = new AtomicInteger(0);
 	private AtomicInteger crGets = new AtomicInteger(0);

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java?rev=1662483&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java Thu Feb 26 15:26:37 2015
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.fsm.wi;
+
+import org.apache.uima.ducc.common.jd.files.workitem.IWorkItemStateKeeper;
+import org.apache.uima.ducc.container.common.MessageBuffer;
+import org.apache.uima.ducc.container.common.MetaCasHelper;
+import org.apache.uima.ducc.container.common.fsm.iface.IAction;
+import org.apache.uima.ducc.container.common.logger.IComponent;
+import org.apache.uima.ducc.container.common.logger.ILogger;
+import org.apache.uima.ducc.container.common.logger.Logger;
+import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.log.LoggerHelper;
+import org.apache.uima.ducc.container.jd.wi.IWorkItem;
+import org.apache.uima.ducc.container.net.iface.IMetaCas;
+
+/** Work item FSM action; WiFsm engages it for the Investment_Reset event in the CAS_Active state. */
+public class ActionInvestmentReset implements IAction {
+	private static Logger logger = Logger.getLogger(ActionInvestmentReset.class, IComponent.Id.JD.name());
+	@Override
+	public String getName() {
+		return ActionInvestmentReset.class.getName();
+	}
+	/**
+	 * Reports the reset to the work item state keeper, calls setTodInvestment() on the work item and
+	 * bumps the message handler's reset counter. Null action data or CAS, and exceptions, are only logged.
+	 */
+	@Override
+	public void engage(Object objectData) {
+		String location = "engage";
+		logger.trace(location, ILogger.null_id, "");
+		IActionData actionData = (IActionData) objectData;
+		try {
+			if(actionData == null) {
+				// Nothing to describe: start from an empty buffer instead of handing null to LoggerHelper.
+				MessageBuffer mb = new MessageBuffer();
+				mb.append("No action data found for processing");
+				logger.warn(location, ILogger.null_id, mb.toString());
+				return;
+			}
+			IWorkItem wi = actionData.getWorkItem();
+			IMetaCas metaCas = wi.getMetaCas();
+			JobDriver jd = JobDriver.getInstance();
+			IWorkItemStateKeeper wisk = jd.getWorkItemStateKeeper();
+			if(metaCas == null) {
+				MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+				mb.append("No CAS found for processing");
+				logger.info(location, ILogger.null_id, mb.toString());
+				return;
+			}
+			int seqNo = new MetaCasHelper(metaCas).getSystemKey(); // helper is built only for a non-null CAS
+			wisk.investmentReset(seqNo);
+			wi.setTodInvestment();
+			MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+			jd.getMessageHandler().incInvestmentResets();
+			logger.info(location, ILogger.null_id, mb.toString());
+		}
+		catch(Exception e) {
+			logger.error(location, ILogger.null_id, e);
+		}
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java?rev=1662483&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java Thu Feb 26 15:26:37 2015
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.fsm.wi;
+
+import org.apache.uima.ducc.common.jd.files.workitem.IWorkItemStateKeeper;
+import org.apache.uima.ducc.container.common.MessageBuffer;
+import org.apache.uima.ducc.container.common.MetaCasHelper;
+import org.apache.uima.ducc.container.common.fsm.iface.IAction;
+import org.apache.uima.ducc.container.common.logger.IComponent;
+import org.apache.uima.ducc.container.common.logger.ILogger;
+import org.apache.uima.ducc.container.common.logger.Logger;
+import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.JobDriverHelper;
+import org.apache.uima.ducc.container.jd.cas.CasManager;
+import org.apache.uima.ducc.container.jd.cas.CasManagerStats.RetryReason;
+import org.apache.uima.ducc.container.jd.log.LoggerHelper;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
+import org.apache.uima.ducc.container.jd.wi.IProcessStatistics;
+import org.apache.uima.ducc.container.jd.wi.IWorkItem;
+import org.apache.uima.ducc.container.net.iface.IMetaCas;
+
+/** Work item FSM action; WiFsm engages it for the Process_Volunteered event outside the Start state. */
+public class ActionProcessVolunteered extends Action implements IAction {
+	private static Logger logger = Logger.getLogger(ActionProcessVolunteered.class, IComponent.Id.JD.name());
+	@Override
+	public String getName() {
+		return ActionProcessVolunteered.class.getName();
+	}
+	/** Puts the CAS back with RetryReason.ProcessVolunteered, counts an end-retry and logs the context. */
+	private void preemptWorkItem(IActionData actionData, CasManager cm, IMetaCas metaCas) {
+		String location = "preemptWorkItem";
+		cm.putMetaCas(metaCas, RetryReason.ProcessVolunteered);
+		cm.getCasManagerStats().incEndRetry();
+		MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+		logger.info(location, ILogger.null_id, mb.toString());
+	}
+	
+	/**
+	 * Requeues the work item's CAS, calls preempt on the state keeper and on the process statistics,
+	 * then resets the work item. Null action data, remote worker process or CAS is only logged.
+	 */
+	@Override
+	public void engage(Object objectData) {
+		String location = "engage";
+		logger.trace(location, ILogger.null_id, "");
+		IActionData actionData = (IActionData) objectData;
+		try {
+			if(actionData == null) {
+				MessageBuffer mb = new MessageBuffer(); // the known-null actionData is not handed to LoggerHelper
+				mb.append("No action data found for processing");
+				logger.warn(location, ILogger.null_id, mb.toString());
+				return;
+			}
+			IWorkItem wi = actionData.getWorkItem();
+			IMetaCas metaCas = wi.getMetaCas();
+			JobDriver jd = JobDriver.getInstance();
+			CasManager cm = jd.getCasManager();
+			JobDriverHelper jdh = JobDriverHelper.getInstance();
+			IRemoteWorkerProcess rwp = jdh.getRemoteWorkerProcess(wi);
+			if(rwp == null) {
+				MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+				mb.append("No remote worker process entry found for processing");
+				logger.info(location, ILogger.null_id, mb.toString());
+				return;
+			}
+			if(metaCas == null) {
+				MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+				mb.append("No CAS found for processing");
+				logger.info(location, ILogger.null_id, mb.toString());
+				return;
+			}
+			preemptWorkItem(actionData, cm, metaCas);
+			IWorkItemStateKeeper wisk = jd.getWorkItemStateKeeper();
+			MetaCasHelper metaCasHelper = new MetaCasHelper(metaCas);
+			IProcessStatistics pStats = jdh.getProcessStatistics(rwp);
+			int seqNo = metaCasHelper.getSystemKey();
+			wisk.preempt(seqNo);
+			pStats.preempt(wi);
+			displayProcessStatistics(logger, actionData, wi, pStats);
+			wi.reset();
+		}
+		catch(Exception e) {
+			logger.error(location, ILogger.null_id, e);
+		}
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java Thu Feb 26 15:26:37 2015
@@ -39,38 +39,43 @@ public class WiFsm extends Fsm {
 
 	private static Logger logger = Logger.getLogger(WiFsm.class, IComponent.Id.JD.name());
 	
-	public static IState Start 				= new State("Start");
-	public static IState Get_Pending 		= new State("Get_Pending");
-	public static IState CAS_Send 			= new State("CAS_Send");
-	public static IState CAS_Active 		= new State("CAS_Active");
-	
-	public static IEvent Get_Request 		= new Event("Get_Request");
-	public static IEvent CAS_Available		= new Event("CAS_Available");
-	public static IEvent CAS_Unavailable	= new Event("CAS_Unavailable");
-	public static IEvent Ack_Request 		= new Event("Ack_Request");
-	public static IEvent Send_Failure 		= new Event("Send_Failure");
-	public static IEvent Ack_Timer_Pop		= new Event("Ack_Timer_Pop");
-	public static IEvent End_Request 		= new Event("End_Request");
-	public static IEvent End_Timer_Pop		= new Event("End_Timer_Pop");
-	public static IEvent Host_Failure		= new Event("Host_Failure");
-	public static IEvent Process_Failure	= new Event("Process_Failure");
-	public static IEvent Process_Preempt	= new Event("Process_Premept");
-	
-	public IAction ActionGet				= new ActionGet();
-	public IAction ActionGetRedux			= new ActionGetRedux();
-	public IAction ActionSend				= new ActionSend();
-	public IAction ActionAck				= new ActionAck();
-	public IAction ActionAckRedux			= new ActionAckRedux();
-	public IAction ActionEnd				= new ActionEnd();
+	public static IState Start 					= new State("Start");
+	public static IState Get_Pending 			= new State("Get_Pending");
+	public static IState CAS_Send 				= new State("CAS_Send");
+	public static IState CAS_Active 			= new State("CAS_Active");
+	
+	public static IEvent Get_Request 			= new Event("Get_Request");
+	public static IEvent CAS_Available			= new Event("CAS_Available");
+	public static IEvent CAS_Unavailable		= new Event("CAS_Unavailable");
+	public static IEvent Ack_Request 			= new Event("Ack_Request");
+	public static IEvent Send_Failure 			= new Event("Send_Failure");
+	public static IEvent Ack_Timer_Pop			= new Event("Ack_Timer_Pop");
+	public static IEvent End_Request 			= new Event("End_Request");
+	public static IEvent End_Timer_Pop			= new Event("End_Timer_Pop");
+	public static IEvent Host_Failure			= new Event("Host_Failure");
+	public static IEvent Process_Failure		= new Event("Process_Failure");
+	public static IEvent Process_Preempt		= new Event("Process_Premept");
+	public static IEvent Process_Volunteered	= new Event("Process_Volunteered");
+	public static IEvent Investment_Reset		= new Event("Investment_Reset");
+	
+	public IAction ActionGet					= new ActionGet();
+	public IAction ActionGetRedux				= new ActionGetRedux();
+	public IAction ActionSend					= new ActionSend();
+	public IAction ActionAck					= new ActionAck();
+	public IAction ActionAckRedux				= new ActionAckRedux();
+	public IAction ActionEnd					= new ActionEnd();
+	
+	public IAction ActionProcessFailure			= new ActionProcessFailure();
+	public IAction ActionProcessPreempt			= new ActionProcessPreempt();
+	public IAction ActionProcessVolunteered		= new ActionProcessVolunteered();
 	
-	public IAction ActionProcessFailure		= new ActionProcessFailure();
-	public IAction ActionProcessPreempt		= new ActionProcessPreempt();
+	public IAction ActionInvestmentReset		= new ActionInvestmentReset();
 	
-	public IAction ActionAckTimeout			= new ActionAckTimeout();
-	public IAction ActionEndTimeout			= new ActionEndTimeout();
+	public IAction ActionAckTimeout				= new ActionAckTimeout();
+	public IAction ActionEndTimeout				= new ActionEndTimeout();
 	
-	public IAction ActionIgnore 			= new ActionIgnore();
-	public IAction ActionError				= new ActionError();
+	public IAction ActionIgnore 				= new ActionIgnore();
+	public IAction ActionError					= new ActionError();
 	
 	public WiFsm() throws FsmException {
 		super();
@@ -99,9 +104,11 @@ public class WiFsm extends Fsm {
 		add(Start, Ack_Request, ActionError, Start);
 		add(Start, End_Request, ActionError, Start);
 		add(Start, Process_Preempt, ActionIgnore, Start);
+		add(Start, Process_Volunteered, ActionIgnore, Start);
 		add(Start, Process_Failure, ActionIgnore, Start);
 		add(Start, Ack_Timer_Pop, ActionIgnore, Start);
 		add(Start, End_Timer_Pop, ActionIgnore, Start);
+		add(Start, Investment_Reset, ActionIgnore, Start);
 		
 		add(Get_Pending, Get_Request, ActionGetRedux, Get_Pending);
 		add(Get_Pending, CAS_Available, ActionSend, CAS_Send);
@@ -109,9 +116,11 @@ public class WiFsm extends Fsm {
 		add(Get_Pending, Ack_Request, ActionError, Get_Pending);
 		add(Get_Pending, End_Request, ActionError, Get_Pending);
 		add(Get_Pending, Process_Preempt, ActionProcessPreempt, Start);
+		add(Get_Pending, Process_Volunteered, ActionProcessVolunteered, Start);
 		add(Get_Pending, Process_Failure, ActionProcessFailure, Start);
 		add(Get_Pending, Ack_Timer_Pop, ActionIgnore, Get_Pending);
 		add(Get_Pending, End_Timer_Pop, ActionIgnore, Get_Pending);
+		add(Get_Pending, Investment_Reset, ActionIgnore, Get_Pending);
 		
 		add(CAS_Send, Get_Request, ActionGetRedux, Get_Pending);
 		add(CAS_Send, CAS_Available, ActionIgnore, CAS_Send);
@@ -119,9 +128,11 @@ public class WiFsm extends Fsm {
 		add(CAS_Send, Ack_Request, ActionAck, CAS_Active);
 		add(CAS_Send, End_Request, ActionError, CAS_Send);
 		add(CAS_Send, Process_Preempt, ActionProcessPreempt, Start);
+		add(CAS_Send, Process_Volunteered, ActionProcessVolunteered, Start);
 		add(CAS_Send, Process_Failure, ActionProcessFailure, Start);
 		add(CAS_Send, Ack_Timer_Pop, ActionAckTimeout, Start);
 		add(CAS_Send, End_Timer_Pop, ActionIgnore, CAS_Send);
+		add(CAS_Send, Investment_Reset, ActionIgnore, CAS_Send);
 		
 		add(CAS_Active, Get_Request, ActionGetRedux, Get_Pending);
 		add(CAS_Active, CAS_Available, ActionIgnore, CAS_Active);
@@ -129,9 +140,11 @@ public class WiFsm extends Fsm {
 		add(CAS_Active, Ack_Request, ActionAckRedux, CAS_Active);
 		add(CAS_Active, End_Request, ActionEnd, Start);
 		add(CAS_Active, Process_Preempt, ActionProcessPreempt, Start);
+		add(CAS_Active, Process_Volunteered, ActionProcessVolunteered, Start);
 		add(CAS_Active, Process_Failure, ActionProcessFailure, Start);
 		add(CAS_Active, Ack_Timer_Pop, ActionIgnore, CAS_Active);
 		add(CAS_Active, End_Timer_Pop, ActionEndTimeout, Start);
+		add(CAS_Active, Investment_Reset, ActionInvestmentReset, CAS_Active);
 		
 		MessageBuffer mb2 = new MessageBuffer();
 		mb2.append(Standardize.Label.exit.name());

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java Thu Feb 26 15:26:37 2015
@@ -31,10 +31,12 @@ public interface IMessageHandler {
 	
 	public void handleProcessDown(IProcessInfo processInfo);
 	public void handleProcessPreempt(IProcessInfo processInfo);
+	public void handleProcessVolunteered(IProcessInfo processInfo);
 	public void handleProcessFailedInitialization(IProcessInfo processInfo);
 	
 	public void handleMetaCasTransation(IMetaCasTransaction trans);
 	
 	public void incGets();
 	public void incAcks();
+	public void incInvestmentResets();
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java Thu Feb 26 15:26:37 2015
@@ -67,6 +67,7 @@ public class MessageHandler implements I
 	
 	private AtomicInteger gets = new AtomicInteger(0);
 	private AtomicInteger acks = new AtomicInteger(0);
+	private AtomicInteger investmentResets = new AtomicInteger(0);
 	
 	private ConcurrentHashMap<String,String> failedInitializationMap = new ConcurrentHashMap<String,String>();
 	
@@ -77,13 +78,20 @@ public class MessageHandler implements I
 	public MessageHandler() {
 	}
 	
+	@Override
 	public void incGets() {
 		gets.incrementAndGet();
 	}
 	
+	@Override
 	public void incAcks() {
 		acks.incrementAndGet();
 	}
+
+	@Override
+	public void incInvestmentResets() {
+		investmentResets.incrementAndGet(); // atomic bump of the counter later copied into the operating info
+	}
 	
 	//
 	
@@ -134,6 +142,7 @@ public class MessageHandler implements I
 			oi.setWorkItemCrFetches(cms.getCrGets());
 			oi.setWorkItemJpGets(gets.get());
 			oi.setWorkItemJpAcks(acks.get());
+			oi.setWorkItemJpInvestmentResets(investmentResets.get());
 			oi.setWorkItemEndSuccesses(cms.getEndSuccess());
 			oi.setWorkItemEndFailures(cms.getEndFailure());
 			oi.setWorkItemEndRetrys(cms.getEndRetry());
@@ -286,6 +295,40 @@ public class MessageHandler implements I
 		}
 	}
 	
+	/**
+	 * Fires Process_Volunteered at the FSM of each work item whose remote thread belongs to processInfo.
+	 */
+	@Override
+	public void handleProcessVolunteered(IProcessInfo processInfo) {
+		String location = "handleProcessVolunteered";
+		try {
+			MessageBuffer summary = new MessageBuffer();
+			summary.append(Standardize.Label.node.get()+processInfo.getNodeName());
+			summary.append(Standardize.Label.ip.get()+processInfo.getNodeAddress());
+			summary.append(Standardize.Label.pid.get()+processInfo.getPid());
+			logger.trace(location, ILogger.null_id, summary.toString());
+			ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> threadMap = JobDriver.getInstance().getRemoteThreadMap();
+			for(Entry<IRemoteWorkerThread, IWorkItem> assignment : threadMap.entrySet()) {
+				IRemoteWorkerThread rwt = assignment.getKey();
+				if(!rwt.comprises(processInfo)) { // thread of some other process: trace it and move on
+					MessageBuffer skipped = new MessageBuffer();
+					skipped.append(Standardize.Label.remote.get()+rwt.toString());
+					skipped.append(Standardize.Label.status.get()+"unaffected");
+					logger.trace(location, ILogger.null_id, skipped.toString());
+					continue;
+				}
+				processBlacklist(processInfo, new RemoteWorkerProcess(rwt));
+				IWorkItem wi = assignment.getValue();
+				IFsm fsm = wi.getFsm();
+				Object actionData = new ActionData(wi, rwt, null);
+				fsm.transition(WiFsm.Process_Volunteered, actionData);
+			}
+		}
+		catch(Exception e) {
+			logger.error(location, ILogger.null_id, e);
+		}
+	}
+	
 	private void block(IRemoteWorkerThread rwt) {
 		String location = "block";
 		if(rwt != null) {
@@ -408,6 +451,9 @@ public class MessageHandler implements I
 			case End:
 				handleMetaCasTransationEnd(trans, rwt);
 				break;
+			case InvestmentReset:
+				handleMetaCasTransationInvestmentReset(trans, rwt);
+				break;
 			default:
 				break;
 			}
@@ -529,5 +575,27 @@ public class MessageHandler implements I
 			logger.debug(location, ILogger.null_id, mb.toString());
 		}
 	}
+	
+	/**
+	 * Drives Investment_Reset through the FSM of the work item found for rwt, if any, and logs the outcome.
+	 */
+	private void handleMetaCasTransationInvestmentReset(IMetaCasTransaction trans, IRemoteWorkerThread rwt) {
+		String location = "handleMetaCasTransationInvestmentReset";
+		IWorkItem wi = find(rwt);
+		String outcome = "has no work assigned presently";
+		if(wi != null) {
+			update(wi, trans.getMetaCas());
+			IFsm fsm = wi.getFsm();
+			IEvent event = WiFsm.Investment_Reset;
+			Object actionData = new ActionData(wi, rwt, trans);
+			fsm.transition(event, actionData);
+			outcome = "investment reset";
+		}
+		// Both paths end with the same debug line shape: remote label followed by the outcome text.
+		MessageBuffer mb = new MessageBuffer();
+		mb.append(Standardize.Label.remote.get()+rwt.toString());
+		mb.append(outcome);
+		logger.debug(location, ILogger.null_id, mb.toString());
+	}
 
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java Thu Feb 26 15:26:37 2015
@@ -54,6 +54,9 @@ public interface IOperatingInfo extends
 	public void setWorkItemJpAcks(int value);
 	public int getWorkItemJpAcks();
 	
+	public void setWorkItemJpInvestmentResets(int value);
+	public int getWorkItemJpInvestmentResets();
+	
 	public void setWorkItemEndSuccesses(int value);
 	public int getWorkItemEndSuccesses();
 	

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IWorkItemInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IWorkItemInfo.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IWorkItemInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IWorkItemInfo.java Thu Feb 26 15:26:37 2015
@@ -31,4 +31,7 @@ public interface IWorkItemInfo extends I
 	
 	public long getOperatingMillis();
 	public void setOperatingMillis(long value);
+	
+	public long getInvestmentMillis();
+	public void setInvestmentMillis(long value);
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java Thu Feb 26 15:26:37 2015
@@ -39,6 +39,7 @@ public class OperatingInfo implements IO
 	private int crFetches = 0;
 	private int jpGets = 0;
 	private int jpAcks = 0;
+	private int jpInvestmentResets = 0;
 	private int jpEndSuccesses = 0;
 	private int jpEndFailures = 0;
 	private int jpEndRetrys = 0;
@@ -143,6 +144,16 @@ public class OperatingInfo implements IO
 	}
 
 	@Override
+	public void setWorkItemJpInvestmentResets(int value) {
+		jpInvestmentResets = value; // stored as given; no range check is applied
+	}
+
+	@Override
+	public int getWorkItemJpInvestmentResets() {
+		return jpInvestmentResets; // 0 until setWorkItemJpInvestmentResets assigns a value
+	}
+	
+	@Override
 	public void setWorkItemEndSuccesses(int value) {
 		jpEndSuccesses = value;
 	}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/WorkItemInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/WorkItemInfo.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/WorkItemInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/WorkItemInfo.java Thu Feb 26 15:26:37 2015
@@ -30,6 +30,7 @@ public class WorkItemInfo implements IWo
 	int pid = 0;
 	int tid = 0;
 	long operatingMillis = 0;
+	long investmentMillis = 0;
 	int seqNo = 0;
 	
 	@Override
@@ -93,6 +94,16 @@ public class WorkItemInfo implements IWo
 	}
 
 	@Override
+	public long getInvestmentMillis() {
+		return investmentMillis;
+	}
+
+	@Override
+	public void setInvestmentMillis(long value) {
+		investmentMillis = value;
+	}
+	
+	@Override
 	public int getSeqNo() {
 		return seqNo;
 	}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/IWorkItem.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/IWorkItem.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/IWorkItem.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/IWorkItem.java Thu Feb 26 15:26:37 2015
@@ -42,9 +42,15 @@ public interface IWorkItem {
 	public void resetTodAck();
 	public long getTodAck();
 	
+	
+	public void setTodInvestment();
+	public void resetTodInvestment();
+	public long getTodInvestment();
+	
 	public void setTodEnd();
 	public void resetTodEnd();
 	public long getTodEnd();
 	
 	public long getMillisOperating();
+	public long getMillisInvestment();
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WorkItem.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WorkItem.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WorkItem.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WorkItem.java Thu Feb 26 15:26:37 2015
@@ -33,6 +33,8 @@ public class WorkItem implements IWorkIt
 	private Tod todAck = new Tod();
 	private Tod todEnd = new Tod();
 	
+	private Tod todInvestment = new Tod();
+	
 	public WorkItem(IMetaCas metaCas, IFsm fsm) {
 		setMetaCas(metaCas);
 		setFsm(fsm);
@@ -94,11 +96,13 @@ public class WorkItem implements IWorkIt
 	@Override
 	public void setTodAck() {
 		todAck.set();
+		setTodInvestment();
 	}
 	
 	@Override
 	public void resetTodAck() {
 		todAck.reset();
+		resetTodInvestment();
 	}
 
 	@Override
@@ -107,6 +111,21 @@ public class WorkItem implements IWorkIt
 	}
 
 	@Override
+	public void setTodInvestment() {
+		todInvestment.set();
+	}
+
+	@Override
+	public void resetTodInvestment() {
+		todInvestment.reset();
+	}
+
+	@Override
+	public long getTodInvestment() {
+		return todInvestment.get();
+	}
+	
+	@Override
 	public void setTodEnd() {
 		todEnd.set();
 	}
@@ -143,6 +162,18 @@ public class WorkItem implements IWorkIt
 	}
 
 	@Override
+	public long getMillisInvestment() {
+		long retVal = 0;
+		IState state = fsm.getStateCurrent();
+		if(state.getName().equals(WiFsm.CAS_Active.getName())) {
+			long now = System.currentTimeMillis();
+			retVal = now - getTodInvestment();
+		}
+		Assertion.nonNegative(retVal);
+		return retVal;
+	}
+	
+	@Override
 	public int getSeqNo() {
 		int retVal = 0;
 		try {
@@ -153,4 +184,5 @@ public class WorkItem implements IWorkIt
 		}
 		return retVal;
 	}
+
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java Thu Feb 26 15:26:37 2015
@@ -25,7 +25,7 @@ import org.apache.uima.ducc.container.ne
 
 public interface IMetaCasTransaction extends IMetaCasProvider, IMetaCasRequester, Serializable {
 
-	public enum Type { Get, Ack, End };
+	public enum Type { Get, Ack, End , InvestmentReset };
 	
 	public Type getType();
 	public void setType(Type value);