You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2015/02/26 16:26:38 UTC
svn commit: r1662483 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container:
common/ jd/ jd/cas/ jd/fsm/wi/ jd/mh/ jd/mh/iface/ jd/mh/impl/ jd/wi/
net/iface/
Author: degenaro
Date: Thu Feb 26 15:26:37 2015
New Revision: 1662483
URL: http://svn.apache.org/r1662483
Log:
UIMA-4069 DUCC Job Driver (JD) system classpath
investment reset
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java (with props)
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IWorkItemInfo.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/WorkItemInfo.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/IWorkItem.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WorkItem.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java Thu Feb 26 15:26:37 2015
@@ -98,6 +98,7 @@ public class Standardize {
killJob,
killProcess,
killWorkItem,
+ investmentMillis,
operatingMillis,
;
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/JobDriverHelper.java Thu Feb 26 15:26:37 2015
@@ -69,6 +69,7 @@ public class JobDriverHelper {
wii.setTid(rwt.getTid());
wii.setSeqNo(wi.getSeqNo());
wii.setOperatingMillis(wi.getMillisOperating());
+ wii.setInvestmentMillis(wi.getMillisInvestment());
MessageBuffer mb = new MessageBuffer();
mb.append(Standardize.Label.node.get()+wii.getNodeName());
mb.append(Standardize.Label.pid.get()+wii.getPid());
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java Thu Feb 26 15:26:37 2015
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.Atomi
public class CasManagerStats {
- public enum RetryReason { ProcessPreempt, ProcessDown, NodeDown, UserErrorRetry, TimeoutRetry };
+ public enum RetryReason { ProcessPreempt, ProcessVolunteered, ProcessDown, NodeDown, UserErrorRetry, TimeoutRetry };
private AtomicInteger crTotal = new AtomicInteger(0);
private AtomicInteger crGets = new AtomicInteger(0);
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java?rev=1662483&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java Thu Feb 26 15:26:37 2015
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.fsm.wi;
+
+import org.apache.uima.ducc.common.jd.files.workitem.IWorkItemStateKeeper;
+import org.apache.uima.ducc.container.common.MessageBuffer;
+import org.apache.uima.ducc.container.common.MetaCasHelper;
+import org.apache.uima.ducc.container.common.fsm.iface.IAction;
+import org.apache.uima.ducc.container.common.logger.IComponent;
+import org.apache.uima.ducc.container.common.logger.ILogger;
+import org.apache.uima.ducc.container.common.logger.Logger;
+import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.log.LoggerHelper;
+import org.apache.uima.ducc.container.jd.wi.IWorkItem;
+import org.apache.uima.ducc.container.net.iface.IMetaCas;
+
+public class ActionInvestmentReset implements IAction { // Work-item FSM action; WiFsm binds it to Investment_Reset in state CAS_Active.
+
+ private static Logger logger = Logger.getLogger(ActionInvestmentReset.class, IComponent.Id.JD.name());
+
+ @Override
+ public String getName() { // Returns this action's fully-qualified class name.
+ return ActionInvestmentReset.class.getName();
+ }
+ @Override
+ public void engage(Object objectData) { // objectData is cast to IActionData; exceptions raised inside the try are logged, not rethrown.
+ String location = "engage";
+ logger.trace(location, ILogger.null_id, "");
+ IActionData actionData = (IActionData) objectData;
+ try {
+ if(actionData != null) {
+ IWorkItem wi = actionData.getWorkItem();
+ IMetaCas metaCas = wi.getMetaCas();
+ JobDriver jd = JobDriver.getInstance();
+ IWorkItemStateKeeper wisk = jd.getWorkItemStateKeeper();
+ MetaCasHelper metaCasHelper = new MetaCasHelper(metaCas); // NOTE(review): constructed before the metaCas null check below - confirm the constructor tolerates null.
+ if(metaCas != null) {
+ int seqNo = metaCasHelper.getSystemKey(); // The CAS system key is used as the work item sequence number.
+ wisk.investmentReset(seqNo); // Notify the work-item state keeper of the reset for this sequence number.
+ // Update the in-memory work item, bump the reset counter, then log.
+ wi.setTodInvestment(); // Presumably stamps "now" as the new investment start - verify against Tod.set().
+ MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+ JobDriver.getInstance().getMessageHandler().incInvestmentResets(); // Counter later published via IOperatingInfo.
+ logger.info(location, ILogger.null_id, mb.toString());
+ }
+ else {
+ MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+ mb.append("No CAS found for processing");
+ logger.info(location, ILogger.null_id, mb.toString());
+ }
+ }
+ else {
+ MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData); // NOTE(review): actionData is null on this path - confirm getMessageBuffer accepts null.
+ mb.append("No action data found for processing");
+ logger.warn(location, ILogger.null_id, mb.toString());
+ }
+ }
+ catch(Exception e) { // Failures are reported at error level only; nothing propagates to the caller.
+ logger.error(location, ILogger.null_id, e);
+ }
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionInvestmentReset.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java?rev=1662483&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java Thu Feb 26 15:26:37 2015
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.fsm.wi;
+
+import org.apache.uima.ducc.common.jd.files.workitem.IWorkItemStateKeeper;
+import org.apache.uima.ducc.container.common.MessageBuffer;
+import org.apache.uima.ducc.container.common.MetaCasHelper;
+import org.apache.uima.ducc.container.common.fsm.iface.IAction;
+import org.apache.uima.ducc.container.common.logger.IComponent;
+import org.apache.uima.ducc.container.common.logger.ILogger;
+import org.apache.uima.ducc.container.common.logger.Logger;
+import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.JobDriverHelper;
+import org.apache.uima.ducc.container.jd.cas.CasManager;
+import org.apache.uima.ducc.container.jd.cas.CasManagerStats.RetryReason;
+import org.apache.uima.ducc.container.jd.log.LoggerHelper;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
+import org.apache.uima.ducc.container.jd.wi.IProcessStatistics;
+import org.apache.uima.ducc.container.jd.wi.IWorkItem;
+import org.apache.uima.ducc.container.net.iface.IMetaCas;
+
+public class ActionProcessVolunteered extends Action implements IAction { // Work-item FSM action; WiFsm binds it to Process_Volunteered in Get_Pending, CAS_Send and CAS_Active.
+
+ private static Logger logger = Logger.getLogger(ActionProcessVolunteered.class, IComponent.Id.JD.name());
+
+ @Override
+ public String getName() { // Returns this action's fully-qualified class name.
+ return ActionProcessVolunteered.class.getName();
+ }
+
+ private void preemptWorkItem(IActionData actionData, CasManager cm, IMetaCas metaCas) { // Hands the CAS back to the CasManager and counts a retry.
+ String location = "preemptWorkItem";
+ cm.putMetaCas(metaCas, RetryReason.ProcessVolunteered); // Return the CAS tagged with the "volunteered" retry reason.
+ cm.getCasManagerStats().incEndRetry(); // Accounted as an end-retry in the CAS manager statistics.
+ MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+ logger.info(location, ILogger.null_id, mb.toString());
+ }
+
+ @Override
+ public void engage(Object objectData) { // objectData is cast to IActionData; exceptions raised inside the try are logged, not rethrown.
+ String location = "engage";
+ logger.trace(location, ILogger.null_id, "");
+ IActionData actionData = (IActionData) objectData;
+ try {
+ if(actionData != null) {
+ IWorkItem wi = actionData.getWorkItem();
+ IMetaCas metaCas = wi.getMetaCas();
+ JobDriver jd = JobDriver.getInstance();
+ CasManager cm = jd.getCasManager();
+ JobDriverHelper jdh = JobDriverHelper.getInstance();
+ IRemoteWorkerProcess rwp = jdh.getRemoteWorkerProcess(wi);
+ if(rwp != null) { // Both a remote worker process entry and a CAS are required to proceed.
+ if(metaCas != null) {
+ preemptWorkItem(actionData, cm, metaCas); // Give the CAS back first, then update state keeper and statistics.
+ IWorkItemStateKeeper wisk = jd.getWorkItemStateKeeper();
+ MetaCasHelper metaCasHelper = new MetaCasHelper(metaCas);
+ IProcessStatistics pStats = jdh.getProcessStatistics(rwp);
+ int seqNo = metaCasHelper.getSystemKey(); // The CAS system key is used as the work item sequence number.
+ wisk.preempt(seqNo); // Recorded through the same preempt call on the state keeper; no volunteered-specific call is used.
+ pStats.preempt(wi);
+ displayProcessStatistics(logger, actionData, wi, pStats); // Not defined in this class; presumably inherited from Action - confirm.
+ wi.reset(); // Reset the work item last, after the statistics that read it were produced.
+ }
+ else {
+ MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+ mb.append("No CAS found for processing");
+ logger.info(location, ILogger.null_id, mb.toString());
+ }
+ }
+ else {
+ MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+ mb.append("No remote worker process entry found for processing");
+ logger.info(location, ILogger.null_id, mb.toString());
+ }
+ }
+ else {
+ MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData); // NOTE(review): actionData is null on this path - confirm getMessageBuffer accepts null.
+ mb.append("No action data found for processing");
+ logger.warn(location, ILogger.null_id, mb.toString());
+ }
+ }
+ catch(Exception e) { // Failures are reported at error level only; nothing propagates to the caller.
+ logger.error(location, ILogger.null_id, e);
+ }
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessVolunteered.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java Thu Feb 26 15:26:37 2015
@@ -39,38 +39,43 @@ public class WiFsm extends Fsm {
private static Logger logger = Logger.getLogger(WiFsm.class, IComponent.Id.JD.name());
- public static IState Start = new State("Start");
- public static IState Get_Pending = new State("Get_Pending");
- public static IState CAS_Send = new State("CAS_Send");
- public static IState CAS_Active = new State("CAS_Active");
-
- public static IEvent Get_Request = new Event("Get_Request");
- public static IEvent CAS_Available = new Event("CAS_Available");
- public static IEvent CAS_Unavailable = new Event("CAS_Unavailable");
- public static IEvent Ack_Request = new Event("Ack_Request");
- public static IEvent Send_Failure = new Event("Send_Failure");
- public static IEvent Ack_Timer_Pop = new Event("Ack_Timer_Pop");
- public static IEvent End_Request = new Event("End_Request");
- public static IEvent End_Timer_Pop = new Event("End_Timer_Pop");
- public static IEvent Host_Failure = new Event("Host_Failure");
- public static IEvent Process_Failure = new Event("Process_Failure");
- public static IEvent Process_Preempt = new Event("Process_Premept");
-
- public IAction ActionGet = new ActionGet();
- public IAction ActionGetRedux = new ActionGetRedux();
- public IAction ActionSend = new ActionSend();
- public IAction ActionAck = new ActionAck();
- public IAction ActionAckRedux = new ActionAckRedux();
- public IAction ActionEnd = new ActionEnd();
+ public static IState Start = new State("Start");
+ public static IState Get_Pending = new State("Get_Pending");
+ public static IState CAS_Send = new State("CAS_Send");
+ public static IState CAS_Active = new State("CAS_Active");
+
+ public static IEvent Get_Request = new Event("Get_Request");
+ public static IEvent CAS_Available = new Event("CAS_Available");
+ public static IEvent CAS_Unavailable = new Event("CAS_Unavailable");
+ public static IEvent Ack_Request = new Event("Ack_Request");
+ public static IEvent Send_Failure = new Event("Send_Failure");
+ public static IEvent Ack_Timer_Pop = new Event("Ack_Timer_Pop");
+ public static IEvent End_Request = new Event("End_Request");
+ public static IEvent End_Timer_Pop = new Event("End_Timer_Pop");
+ public static IEvent Host_Failure = new Event("Host_Failure");
+ public static IEvent Process_Failure = new Event("Process_Failure");
+ public static IEvent Process_Preempt = new Event("Process_Premept");
+ public static IEvent Process_Volunteered = new Event("Process_Volunteered");
+ public static IEvent Investment_Reset = new Event("Investment_Reset");
+
+ public IAction ActionGet = new ActionGet();
+ public IAction ActionGetRedux = new ActionGetRedux();
+ public IAction ActionSend = new ActionSend();
+ public IAction ActionAck = new ActionAck();
+ public IAction ActionAckRedux = new ActionAckRedux();
+ public IAction ActionEnd = new ActionEnd();
+
+ public IAction ActionProcessFailure = new ActionProcessFailure();
+ public IAction ActionProcessPreempt = new ActionProcessPreempt();
+ public IAction ActionProcessVolunteered = new ActionProcessVolunteered();
- public IAction ActionProcessFailure = new ActionProcessFailure();
- public IAction ActionProcessPreempt = new ActionProcessPreempt();
+ public IAction ActionInvestmentReset = new ActionInvestmentReset();
- public IAction ActionAckTimeout = new ActionAckTimeout();
- public IAction ActionEndTimeout = new ActionEndTimeout();
+ public IAction ActionAckTimeout = new ActionAckTimeout();
+ public IAction ActionEndTimeout = new ActionEndTimeout();
- public IAction ActionIgnore = new ActionIgnore();
- public IAction ActionError = new ActionError();
+ public IAction ActionIgnore = new ActionIgnore();
+ public IAction ActionError = new ActionError();
public WiFsm() throws FsmException {
super();
@@ -99,9 +104,11 @@ public class WiFsm extends Fsm {
add(Start, Ack_Request, ActionError, Start);
add(Start, End_Request, ActionError, Start);
add(Start, Process_Preempt, ActionIgnore, Start);
+ add(Start, Process_Volunteered, ActionIgnore, Start);
add(Start, Process_Failure, ActionIgnore, Start);
add(Start, Ack_Timer_Pop, ActionIgnore, Start);
add(Start, End_Timer_Pop, ActionIgnore, Start);
+ add(Start, Investment_Reset, ActionIgnore, Start);
add(Get_Pending, Get_Request, ActionGetRedux, Get_Pending);
add(Get_Pending, CAS_Available, ActionSend, CAS_Send);
@@ -109,9 +116,11 @@ public class WiFsm extends Fsm {
add(Get_Pending, Ack_Request, ActionError, Get_Pending);
add(Get_Pending, End_Request, ActionError, Get_Pending);
add(Get_Pending, Process_Preempt, ActionProcessPreempt, Start);
+ add(Get_Pending, Process_Volunteered, ActionProcessVolunteered, Start);
add(Get_Pending, Process_Failure, ActionProcessFailure, Start);
add(Get_Pending, Ack_Timer_Pop, ActionIgnore, Get_Pending);
add(Get_Pending, End_Timer_Pop, ActionIgnore, Get_Pending);
+ add(Get_Pending, Investment_Reset, ActionIgnore, Get_Pending);
add(CAS_Send, Get_Request, ActionGetRedux, Get_Pending);
add(CAS_Send, CAS_Available, ActionIgnore, CAS_Send);
@@ -119,9 +128,11 @@ public class WiFsm extends Fsm {
add(CAS_Send, Ack_Request, ActionAck, CAS_Active);
add(CAS_Send, End_Request, ActionError, CAS_Send);
add(CAS_Send, Process_Preempt, ActionProcessPreempt, Start);
+ add(CAS_Send, Process_Volunteered, ActionProcessVolunteered, Start);
add(CAS_Send, Process_Failure, ActionProcessFailure, Start);
add(CAS_Send, Ack_Timer_Pop, ActionAckTimeout, Start);
add(CAS_Send, End_Timer_Pop, ActionIgnore, CAS_Send);
+ add(CAS_Send, Investment_Reset, ActionIgnore, CAS_Send);
add(CAS_Active, Get_Request, ActionGetRedux, Get_Pending);
add(CAS_Active, CAS_Available, ActionIgnore, CAS_Active);
@@ -129,9 +140,11 @@ public class WiFsm extends Fsm {
add(CAS_Active, Ack_Request, ActionAckRedux, CAS_Active);
add(CAS_Active, End_Request, ActionEnd, Start);
add(CAS_Active, Process_Preempt, ActionProcessPreempt, Start);
+ add(CAS_Active, Process_Volunteered, ActionProcessVolunteered, Start);
add(CAS_Active, Process_Failure, ActionProcessFailure, Start);
add(CAS_Active, Ack_Timer_Pop, ActionIgnore, CAS_Active);
add(CAS_Active, End_Timer_Pop, ActionEndTimeout, Start);
+ add(CAS_Active, Investment_Reset, ActionInvestmentReset, CAS_Active);
MessageBuffer mb2 = new MessageBuffer();
mb2.append(Standardize.Label.exit.name());
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/IMessageHandler.java Thu Feb 26 15:26:37 2015
@@ -31,10 +31,12 @@ public interface IMessageHandler {
public void handleProcessDown(IProcessInfo processInfo);
public void handleProcessPreempt(IProcessInfo processInfo);
+ public void handleProcessVolunteered(IProcessInfo processInfo);
public void handleProcessFailedInitialization(IProcessInfo processInfo);
public void handleMetaCasTransation(IMetaCasTransaction trans);
public void incGets();
public void incAcks();
+ public void incInvestmentResets();
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java Thu Feb 26 15:26:37 2015
@@ -67,6 +67,7 @@ public class MessageHandler implements I
private AtomicInteger gets = new AtomicInteger(0);
private AtomicInteger acks = new AtomicInteger(0);
+ private AtomicInteger investmentResets = new AtomicInteger(0);
private ConcurrentHashMap<String,String> failedInitializationMap = new ConcurrentHashMap<String,String>();
@@ -77,13 +78,20 @@ public class MessageHandler implements I
public MessageHandler() {
}
+ @Override
public void incGets() {
gets.incrementAndGet();
}
+ @Override
public void incAcks() {
acks.incrementAndGet();
}
+
+ @Override
+ public void incInvestmentResets() {
+ investmentResets.incrementAndGet();
+ }
//
@@ -134,6 +142,7 @@ public class MessageHandler implements I
oi.setWorkItemCrFetches(cms.getCrGets());
oi.setWorkItemJpGets(gets.get());
oi.setWorkItemJpAcks(acks.get());
+ oi.setWorkItemJpInvestmentResets(investmentResets.get());
oi.setWorkItemEndSuccesses(cms.getEndSuccess());
oi.setWorkItemEndFailures(cms.getEndFailure());
oi.setWorkItemEndRetrys(cms.getEndRetry());
@@ -286,6 +295,40 @@ public class MessageHandler implements I
}
}
+ @Override
+ public void handleProcessVolunteered(IProcessInfo processInfo) { // Fires Process_Volunteered on every work item whose remote thread belongs to processInfo.
+ String location = "handleProcessVolunteered";
+ try {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.node.get()+processInfo.getNodeName());
+ mb.append(Standardize.Label.ip.get()+processInfo.getNodeAddress());
+ mb.append(Standardize.Label.pid.get()+processInfo.getPid());
+ logger.trace(location, ILogger.null_id, mb.toString()); // Trace which process (node, ip, pid) is being handled.
+ ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
+ for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) { // Scan all remote-thread-to-work-item associations.
+ IRemoteWorkerThread rwt = entry.getKey();
+ if(rwt.comprises(processInfo)) { // Only threads matching the given process are affected.
+ RemoteWorkerProcess rwp = new RemoteWorkerProcess(rwt);
+ processBlacklist(processInfo, rwp); // Helper defined elsewhere in MessageHandler; invoked once per matching thread.
+ IWorkItem wi = entry.getValue();
+ IFsm fsm = wi.getFsm();
+ IEvent event = WiFsm.Process_Volunteered;
+ Object actionData = new ActionData(wi, rwt, null); // Third argument is null here; the transaction-driven handlers pass their IMetaCasTransaction instead.
+ fsm.transition(event, actionData);
+ }
+ else {
+ MessageBuffer mb1 = new MessageBuffer();
+ mb1.append(Standardize.Label.remote.get()+rwt.toString());
+ mb1.append(Standardize.Label.status.get()+"unaffected");
+ logger.trace(location, ILogger.null_id, mb1.toString());
+ }
+ }
+ }
+ catch(Exception e) { // Failures are reported at error level only; nothing propagates to the caller.
+ logger.error(location, ILogger.null_id, e);
+ }
+ }
+
private void block(IRemoteWorkerThread rwt) {
String location = "block";
if(rwt != null) {
@@ -408,6 +451,9 @@ public class MessageHandler implements I
case End:
handleMetaCasTransationEnd(trans, rwt);
break;
+ case InvestmentReset:
+ handleMetaCasTransationInvestmentReset(trans, rwt);
+ break;
default:
break;
}
@@ -529,5 +575,27 @@ public class MessageHandler implements I
logger.debug(location, ILogger.null_id, mb.toString());
}
}
+
+ private void handleMetaCasTransationInvestmentReset(IMetaCasTransaction trans, IRemoteWorkerThread rwt) { // Handles a transaction of Type.InvestmentReset for the given remote worker thread.
+ String location = "handleMetaCasTransationInvestmentReset";
+ IWorkItem wi = find(rwt); // Look up the work item associated with this remote thread, if any.
+ if(wi == null) { // Nothing assigned: log at debug level and do nothing else.
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.remote.get()+rwt.toString());
+ mb.append("has no work assigned presently");
+ logger.debug(location, ILogger.null_id, mb.toString());
+ }
+ else {
+ update(wi, trans.getMetaCas()); // Pass the transaction's meta-CAS to update() before driving the FSM.
+ IFsm fsm = wi.getFsm();
+ IEvent event = WiFsm.Investment_Reset;
+ Object actionData = new ActionData(wi, rwt, trans);
+ fsm.transition(event, actionData); // Per WiFsm, only CAS_Active maps this event to ActionInvestmentReset; other states ignore it.
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.remote.get()+rwt.toString());
+ mb.append("investment reset");
+ logger.debug(location, ILogger.null_id, mb.toString());
+ }
+ }
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java Thu Feb 26 15:26:37 2015
@@ -54,6 +54,9 @@ public interface IOperatingInfo extends
public void setWorkItemJpAcks(int value);
public int getWorkItemJpAcks();
+ public void setWorkItemJpInvestmentResets(int value);
+ public int getWorkItemJpInvestmentResets();
+
public void setWorkItemEndSuccesses(int value);
public int getWorkItemEndSuccesses();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IWorkItemInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IWorkItemInfo.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IWorkItemInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IWorkItemInfo.java Thu Feb 26 15:26:37 2015
@@ -31,4 +31,7 @@ public interface IWorkItemInfo extends I
public long getOperatingMillis();
public void setOperatingMillis(long value);
+
+ public long getInvestmentMillis();
+ public void setInvestmentMillis(long value);
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java Thu Feb 26 15:26:37 2015
@@ -39,6 +39,7 @@ public class OperatingInfo implements IO
private int crFetches = 0;
private int jpGets = 0;
private int jpAcks = 0;
+ private int jpInvestmentResets = 0;
private int jpEndSuccesses = 0;
private int jpEndFailures = 0;
private int jpEndRetrys = 0;
@@ -143,6 +144,16 @@ public class OperatingInfo implements IO
}
@Override
+ public void setWorkItemJpInvestmentResets(int value) {
+ jpInvestmentResets = value;
+ }
+
+ @Override
+ public int getWorkItemJpInvestmentResets() {
+ return jpInvestmentResets;
+ }
+
+ @Override
public void setWorkItemEndSuccesses(int value) {
jpEndSuccesses = value;
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/WorkItemInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/WorkItemInfo.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/WorkItemInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/WorkItemInfo.java Thu Feb 26 15:26:37 2015
@@ -30,6 +30,7 @@ public class WorkItemInfo implements IWo
int pid = 0;
int tid = 0;
long operatingMillis = 0;
+ long investmentMillis = 0;
int seqNo = 0;
@Override
@@ -93,6 +94,16 @@ public class WorkItemInfo implements IWo
}
@Override
+ public long getInvestmentMillis() {
+ return investmentMillis;
+ }
+
+ @Override
+ public void setInvestmentMillis(long value) {
+ investmentMillis = value;
+ }
+
+ @Override
public int getSeqNo() {
return seqNo;
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/IWorkItem.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/IWorkItem.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/IWorkItem.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/IWorkItem.java Thu Feb 26 15:26:37 2015
@@ -42,9 +42,15 @@ public interface IWorkItem {
public void resetTodAck();
public long getTodAck();
+
+ public void setTodInvestment();
+ public void resetTodInvestment();
+ public long getTodInvestment();
+
public void setTodEnd();
public void resetTodEnd();
public long getTodEnd();
public long getMillisOperating();
+ public long getMillisInvestment();
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WorkItem.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WorkItem.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WorkItem.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/wi/WorkItem.java Thu Feb 26 15:26:37 2015
@@ -33,6 +33,8 @@ public class WorkItem implements IWorkIt
private Tod todAck = new Tod();
private Tod todEnd = new Tod();
+ private Tod todInvestment = new Tod();
+
public WorkItem(IMetaCas metaCas, IFsm fsm) {
setMetaCas(metaCas);
setFsm(fsm);
@@ -94,11 +96,13 @@ public class WorkItem implements IWorkIt
@Override
public void setTodAck() {
todAck.set();
+ setTodInvestment();
}
@Override
public void resetTodAck() {
todAck.reset();
+ resetTodInvestment();
}
@Override
@@ -107,6 +111,21 @@ public class WorkItem implements IWorkIt
}
@Override
+ public void setTodInvestment() {
+ todInvestment.set();
+ }
+
+ @Override
+ public void resetTodInvestment() {
+ todInvestment.reset();
+ }
+
+ @Override
+ public long getTodInvestment() {
+ return todInvestment.get();
+ }
+
+ @Override
public void setTodEnd() {
todEnd.set();
}
@@ -143,6 +162,18 @@ public class WorkItem implements IWorkIt
}
@Override
+ public long getMillisInvestment() { // Milliseconds since the investment timestamp while in CAS_Active; 0 in any other state.
+ long retVal = 0;
+ IState state = fsm.getStateCurrent();
+ if(state.getName().equals(WiFsm.CAS_Active.getName())) { // States are compared by name, not by reference.
+ long now = System.currentTimeMillis();
+ retVal = now - getTodInvestment(); // Assumes getTodInvestment() is a System.currentTimeMillis()-based value - TODO confirm in Tod.
+ }
+ Assertion.nonNegative(retVal); // Sanity check on the result; failure behavior is defined by Assertion (not shown here).
+ return retVal;
+ }
+
+ @Override
public int getSeqNo() {
int retVal = 0;
try {
@@ -153,4 +184,5 @@ public class WorkItem implements IWorkIt
}
return retVal;
}
+
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java?rev=1662483&r1=1662482&r2=1662483&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java Thu Feb 26 15:26:37 2015
@@ -25,7 +25,7 @@ import org.apache.uima.ducc.container.ne
public interface IMetaCasTransaction extends IMetaCasProvider, IMetaCasRequester, Serializable {
- public enum Type { Get, Ack, End };
+ public enum Type { Get, Ack, End , InvestmentReset };
public Type getType();
public void setType(Type value);