You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/12/24 21:30:28 UTC
svn commit: r1647840 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd: classload/ fsm/wi/ mh/
Author: degenaro
Date: Wed Dec 24 20:30:28 2014
New Revision: 1647840
URL: http://svn.apache.org/r1647840
Log:
UIMA-4069 DUCC Job Driver (JD) system classpath
handle Process Down (part 1, all work items retried regardless)
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java
- copied, changed from r1647818, uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java
Removed:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/classload/ProxyJobDriverErrorHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/classload/ProxyJobDriverErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/classload/ProxyJobDriverErrorHandler.java?rev=1647840&r1=1647839&r2=1647840&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/classload/ProxyJobDriverErrorHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/classload/ProxyJobDriverErrorHandler.java Wed Dec 24 20:30:28 2014
@@ -124,6 +124,7 @@ public class ProxyJobDriverErrorHandler
return retVal;
}
+ // Failed work item
public ProxyJobDriverDirective handle(String serializedCAS, String serializedException) throws JobDriverException {
String location = "handle";
ProxyJobDriverDirective retVal = new ProxyJobDriverDirective();
@@ -138,6 +139,23 @@ public class ProxyJobDriverErrorHandler
retVal = new ProxyJobDriverDirective(isKillJob, isKillProcess, isKillWorkItem);
}
catch (Exception e) {
+ logger.error(location, ILogger.null_id, e);
+ }
+ return retVal;
+ }
+
+ // Failed process
+ public ProxyJobDriverDirective handle(String serializedCAS) throws JobDriverException {
+ String location = "handle";
+ ProxyJobDriverDirective retVal = new ProxyJobDriverDirective();
+ try {
+ //TODO
+ boolean isKillJob = false;
+ boolean isKillProcess = false;
+ boolean isKillWorkItem = false;
+ retVal = new ProxyJobDriverDirective(isKillJob, isKillProcess, isKillWorkItem);
+ }
+ catch (Exception e) {
logger.error(location, ILogger.null_id, e);
}
return retVal;
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java?rev=1647840&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java Wed Dec 24 20:30:28 2014
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.fsm.wi;
+
+import org.apache.uima.ducc.common.jd.files.workitem.IWorkItemStateKeeper;
+import org.apache.uima.ducc.container.common.MessageBuffer;
+import org.apache.uima.ducc.container.common.MetaCasHelper;
+import org.apache.uima.ducc.container.common.Standardize;
+import org.apache.uima.ducc.container.common.fsm.iface.IAction;
+import org.apache.uima.ducc.container.common.logger.IComponent;
+import org.apache.uima.ducc.container.common.logger.ILogger;
+import org.apache.uima.ducc.container.common.logger.Logger;
+import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.JobDriverHelper;
+import org.apache.uima.ducc.container.jd.cas.CasManager;
+import org.apache.uima.ducc.container.jd.cas.CasManagerStats.RetryReason;
+import org.apache.uima.ducc.container.jd.classload.ProxyJobDriverDirective;
+import org.apache.uima.ducc.container.jd.classload.ProxyJobDriverErrorHandler;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
+import org.apache.uima.ducc.container.jd.wi.IProcessStatistics;
+import org.apache.uima.ducc.container.jd.wi.IWorkItem;
+import org.apache.uima.ducc.container.net.iface.IMetaCas;
+
+public class ActionProcessFailure implements IAction {
+
+ private static Logger logger = Logger.getLogger(ActionProcessFailure.class, IComponent.Id.JD.name());
+
+ public ActionProcessFailure() {
+ super();
+ }
+
+ @Override
+ public String getName() {
+ return ActionProcessFailure.class.getName();
+ }
+
+ private void retryWorkItem(CasManager cm, IWorkItem wi, IMetaCas metaCas, IRemoteWorkerProcess rwp) {
+ String location = "retryWorkItem";
+
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.seqNo.get()+metaCas.getSystemKey());
+ mb.append(Standardize.Label.remote.get()+rwp.toString());
+ logger.info(location, ILogger.null_id, mb.toString());
+ //
+ cm.putMetaCas(metaCas, RetryReason.ProcessDown);
+ cm.getCasManagerStats().incEndRetry();
+ JobDriver jd = JobDriver.getInstance();
+ JobDriverHelper jdh = JobDriverHelper.getInstance();
+ IWorkItemStateKeeper wisk = jd.getWorkItemStateKeeper();
+ MetaCasHelper metaCasHelper = new MetaCasHelper(metaCas);
+ IProcessStatistics pStats = jdh.getProcessStatistics(rwp);
+ int seqNo = metaCasHelper.getSystemKey();
+ wisk.retry(seqNo);
+ pStats.retry(wi);
+ wi.resetTods();
+ }
+
+ private void killWorkItem(CasManager cm, IWorkItem wi, IMetaCas metaCas, IRemoteWorkerProcess rwp) {
+ //TODO
+ }
+
+ private void killJob(CasManager cm, IWorkItem wi, IMetaCas metaCas, IRemoteWorkerProcess rwp) {
+ //TODO
+ }
+
+ @Override
+ public void engage(Object objectData) {
+ String location = "engage";
+ logger.debug(location, ILogger.null_id, "");
+ IActionData actionData = (IActionData) objectData;
+ try {
+ IWorkItem wi = actionData.getWorkItem();
+ IMetaCas metaCas = wi.getMetaCas();
+ JobDriver jd = JobDriver.getInstance();
+ CasManager cm = jd.getCasManager();
+ JobDriverHelper jdh = JobDriverHelper.getInstance();
+ IRemoteWorkerProcess rwp = jdh.getRemoteWorkerProcess(wi);
+ if(rwp != null) {
+ if(metaCas != null) {
+ String serializedCas = (String) metaCas.getUserSpaceCas();
+ ProxyJobDriverErrorHandler pjdeh = jd.getProxyJobDriverErrorHandler();
+ ProxyJobDriverDirective pjdd = pjdeh.handle(serializedCas);
+ if(pjdd != null) {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.isKillJob.get()+pjdd.isKillJob());
+ mb.append(Standardize.Label.isKillProcess.get()+pjdd.isKillProcess());
+ mb.append(Standardize.Label.isKillWorkItem.get()+pjdd.isKillWorkItem());
+ logger.info(location, ILogger.null_id, mb.toString());
+ if(pjdd.isKillJob()) {
+ killJob(cm, wi, metaCas, rwp);
+ }
+ else if(pjdd.isKillWorkItem()) {
+ killWorkItem(cm, wi, metaCas, rwp);
+ }
+ else {
+ retryWorkItem(cm, wi, metaCas, rwp);
+ }
+ }
+ else {
+ retryWorkItem(cm, wi, metaCas, rwp);
+ }
+ }
+ else {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append("No CAS found for processing");
+ logger.info(location, ILogger.null_id, mb.toString());
+ }
+ }
+ else {
+ MessageBuffer mb = new MessageBuffer();
+ mb.append("No remote worker process entry found for processing");
+ logger.info(location, ILogger.null_id, mb.toString());
+ }
+ }
+ catch(Exception e) {
+ logger.error(location, ILogger.null_id, e);
+ }
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java (from r1647818, uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java)
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java?p2=uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java&p1=uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java&r1=1647818&r2=1647840&rev=1647840&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java Wed Dec 24 20:30:28 2014
@@ -35,13 +35,13 @@ import org.apache.uima.ducc.container.jd
import org.apache.uima.ducc.container.jd.wi.IWorkItem;
import org.apache.uima.ducc.container.net.iface.IMetaCas;
-public class ActionPreempt implements IAction {
+public class ActionProcessPreempt implements IAction {
- private static Logger logger = Logger.getLogger(ActionPreempt.class, IComponent.Id.JD.name());
+ private static Logger logger = Logger.getLogger(ActionProcessPreempt.class, IComponent.Id.JD.name());
@Override
public String getName() {
- return ActionPreempt.class.getName();
+ return ActionProcessPreempt.class.getName();
}
private void preemptWorkItem(CasManager cm, IWorkItem wi, IMetaCas metaCas, IRemoteWorkerProcess rwp) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java?rev=1647840&r1=1647839&r2=1647840&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java Wed Dec 24 20:30:28 2014
@@ -58,7 +58,8 @@ public class WiFsm extends Fsm {
public IAction ActionEnd = new ActionEnd();
public IAction ActionInProgress = new ActionInProgress();
- public IAction ActionPreempt = new ActionPreempt();
+ public IAction ActionProcessFailure = new ActionProcessFailure();
+ public IAction ActionProcessPreempt = new ActionProcessPreempt();
public IAction ActionIgnore = new ActionIgnore();
public IAction ActionError = new ActionError();
@@ -89,25 +90,29 @@ public class WiFsm extends Fsm {
add(Start, CAS_Unavailable, ActionIgnore, Start);
add(Start, Ack_Request, ActionError, Start);
add(Start, Process_Preempt, ActionIgnore, Start);
+ add(Start, Process_Failure, ActionIgnore, Start);
add(Get_Pending, Get_Request, ActionInProgress, Get_Pending);
add(Get_Pending, CAS_Available, ActionSend, CAS_Send);
add(Get_Pending, CAS_Unavailable, ActionSend, Start);
add(Get_Pending, Ack_Request, ActionError, Get_Pending);
- add(Get_Pending, Process_Preempt, ActionPreempt, Start);
+ add(Get_Pending, Process_Preempt, ActionProcessPreempt, Start);
+ add(Get_Pending, Process_Failure, ActionProcessFailure, Start);
add(CAS_Send, Get_Request, ActionInProgress, CAS_Send);
add(CAS_Send, CAS_Available, ActionIgnore, CAS_Send);
add(CAS_Send, CAS_Unavailable, ActionIgnore, CAS_Send);
add(CAS_Send, Ack_Request, ActionAck, CAS_Active);
- add(CAS_Send, Process_Preempt, ActionPreempt, Start);
+ add(CAS_Send, Process_Preempt, ActionProcessPreempt, Start);
+ add(CAS_Send, Process_Failure, ActionProcessFailure, Start);
add(CAS_Active, Get_Request, ActionError, CAS_Active);
add(CAS_Active, CAS_Available, ActionIgnore, CAS_Active);
add(CAS_Active, CAS_Unavailable, ActionIgnore, CAS_Active);
add(CAS_Active, Ack_Request, ActionError, CAS_Active);
add(CAS_Active, End_Request, ActionEnd, Start);
- add(CAS_Active, Process_Preempt, ActionPreempt, Start);
+ add(CAS_Active, Process_Preempt, ActionProcessPreempt, Start);
+ add(CAS_Active, Process_Failure, ActionProcessFailure, Start);
MessageBuffer mb2 = new MessageBuffer();
mb2.append(Standardize.Label.exit.name());
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java?rev=1647840&r1=1647839&r2=1647840&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java Wed Dec 24 20:30:28 2014
@@ -153,7 +153,26 @@ public class MessageHandler implements I
mb.append(Standardize.Label.pid.get()+processInfo.getPid());
logger.info(location, ILogger.null_id, mb.toString());
ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
- //TODO
+ for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
+ IRemoteWorkerThread rwt = entry.getKey();
+ if(rwt.comprises(processInfo)) {
+ MessageBuffer mb1 = new MessageBuffer();
+ mb1.append(Standardize.Label.remote.get()+rwt.toString());
+ mb1.append(Boolean.TRUE.toString());
+ logger.info(location, ILogger.null_id, mb1.toString());
+ IWorkItem wi = entry.getValue();
+ IFsm fsm = wi.getFsm();
+ IEvent event = WiFsm.Process_Failure;
+ Object actionData = new ActionData(wi, rwt, null);
+ fsm.transition(event, actionData);
+ }
+ else {
+ MessageBuffer mb2 = new MessageBuffer();
+ mb2.append(Standardize.Label.remote.get()+rwt.toString());
+ mb2.append(Boolean.FALSE.toString());
+ logger.info(location, ILogger.null_id, mb2.toString());
+ }
+ }
}
catch(Exception e) {
logger.error(location, ILogger.null_id, e);