You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/12/24 21:30:28 UTC

svn commit: r1647840 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd: classload/ fsm/wi/ mh/

Author: degenaro
Date: Wed Dec 24 20:30:28 2014
New Revision: 1647840

URL: http://svn.apache.org/r1647840
Log:
UIMA-4069 DUCC Job Driver (JD) system classpath

handle Process Down (part 1, all work items retried regardless)

Added:
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java
      - copied, changed from r1647818, uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java
Removed:
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java
Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/classload/ProxyJobDriverErrorHandler.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/classload/ProxyJobDriverErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/classload/ProxyJobDriverErrorHandler.java?rev=1647840&r1=1647839&r2=1647840&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/classload/ProxyJobDriverErrorHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/classload/ProxyJobDriverErrorHandler.java Wed Dec 24 20:30:28 2014
@@ -124,6 +124,7 @@ public class ProxyJobDriverErrorHandler
 		return retVal;
 	}
 	
+	// Failed work item
 	public ProxyJobDriverDirective handle(String serializedCAS, String serializedException) throws JobDriverException {
 		String location = "handle";
 		ProxyJobDriverDirective retVal = new ProxyJobDriverDirective();
@@ -138,6 +139,23 @@ public class ProxyJobDriverErrorHandler
 			retVal = new ProxyJobDriverDirective(isKillJob, isKillProcess, isKillWorkItem);
 		} 
 		catch (Exception e) {
+			logger.error(location, ILogger.null_id, e);
+		}
+		return retVal;
+	}
+	
+	// Failed process
+	public ProxyJobDriverDirective handle(String serializedCAS) throws JobDriverException {
+		String location = "handle";
+		ProxyJobDriverDirective retVal = new ProxyJobDriverDirective();
+		try {
+			//TODO
+			boolean isKillJob = false;
+			boolean isKillProcess = false;
+			boolean isKillWorkItem = false;
+			retVal = new ProxyJobDriverDirective(isKillJob, isKillProcess, isKillWorkItem);
+		} 
+		catch (Exception e) {
 			logger.error(location, ILogger.null_id, e);
 		}
 		return retVal;

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java?rev=1647840&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java Wed Dec 24 20:30:28 2014
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.fsm.wi;
+
+import org.apache.uima.ducc.common.jd.files.workitem.IWorkItemStateKeeper;
+import org.apache.uima.ducc.container.common.MessageBuffer;
+import org.apache.uima.ducc.container.common.MetaCasHelper;
+import org.apache.uima.ducc.container.common.Standardize;
+import org.apache.uima.ducc.container.common.fsm.iface.IAction;
+import org.apache.uima.ducc.container.common.logger.IComponent;
+import org.apache.uima.ducc.container.common.logger.ILogger;
+import org.apache.uima.ducc.container.common.logger.Logger;
+import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.JobDriverHelper;
+import org.apache.uima.ducc.container.jd.cas.CasManager;
+import org.apache.uima.ducc.container.jd.cas.CasManagerStats.RetryReason;
+import org.apache.uima.ducc.container.jd.classload.ProxyJobDriverDirective;
+import org.apache.uima.ducc.container.jd.classload.ProxyJobDriverErrorHandler;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
+import org.apache.uima.ducc.container.jd.wi.IProcessStatistics;
+import org.apache.uima.ducc.container.jd.wi.IWorkItem;
+import org.apache.uima.ducc.container.net.iface.IMetaCas;
+
+public class ActionProcessFailure implements IAction {
+
+	private static Logger logger = Logger.getLogger(ActionProcessFailure.class, IComponent.Id.JD.name());
+	
+	public ActionProcessFailure() {
+		super();
+	}
+	
+	@Override
+	public String getName() {
+		return ActionProcessFailure.class.getName();
+	}
+	
+	private void retryWorkItem(CasManager cm, IWorkItem wi, IMetaCas metaCas, IRemoteWorkerProcess rwp) {
+		String location = "retryWorkItem";
+		
+		MessageBuffer mb = new MessageBuffer();
+		mb.append(Standardize.Label.seqNo.get()+metaCas.getSystemKey());
+		mb.append(Standardize.Label.remote.get()+rwp.toString());
+		logger.info(location, ILogger.null_id, mb.toString());
+		//
+		cm.putMetaCas(metaCas, RetryReason.ProcessDown);
+		cm.getCasManagerStats().incEndRetry();
+		JobDriver jd = JobDriver.getInstance();
+		JobDriverHelper jdh = JobDriverHelper.getInstance();
+		IWorkItemStateKeeper wisk = jd.getWorkItemStateKeeper();
+		MetaCasHelper metaCasHelper = new MetaCasHelper(metaCas);
+		IProcessStatistics pStats = jdh.getProcessStatistics(rwp);
+		int seqNo = metaCasHelper.getSystemKey();
+		wisk.retry(seqNo);
+		pStats.retry(wi);
+		wi.resetTods();
+	}
+	
+	private void killWorkItem(CasManager cm, IWorkItem wi, IMetaCas metaCas, IRemoteWorkerProcess rwp) {
+		//TODO
+	}
+	
+	private void killJob(CasManager cm, IWorkItem wi, IMetaCas metaCas, IRemoteWorkerProcess rwp) {
+		//TODO
+	}
+	
+	@Override
+	public void engage(Object objectData) {
+		String location = "engage";
+		logger.debug(location, ILogger.null_id, "");
+		IActionData actionData = (IActionData) objectData;
+		try {
+			IWorkItem wi = actionData.getWorkItem();
+			IMetaCas metaCas = wi.getMetaCas();
+			JobDriver jd = JobDriver.getInstance();
+			CasManager cm = jd.getCasManager();
+			JobDriverHelper jdh = JobDriverHelper.getInstance();
+			IRemoteWorkerProcess rwp = jdh.getRemoteWorkerProcess(wi);
+			if(rwp != null) {
+				if(metaCas != null) {
+					String serializedCas = (String) metaCas.getUserSpaceCas();
+					ProxyJobDriverErrorHandler pjdeh = jd.getProxyJobDriverErrorHandler();
+					ProxyJobDriverDirective pjdd = pjdeh.handle(serializedCas);
+					if(pjdd != null) {
+						MessageBuffer mb = new MessageBuffer();
+						mb.append(Standardize.Label.isKillJob.get()+pjdd.isKillJob());
+						mb.append(Standardize.Label.isKillProcess.get()+pjdd.isKillProcess());
+						mb.append(Standardize.Label.isKillWorkItem.get()+pjdd.isKillWorkItem());
+						logger.info(location, ILogger.null_id, mb.toString());
+						if(pjdd.isKillJob()) {
+							killJob(cm, wi, metaCas, rwp);
+						}
+						else if(pjdd.isKillWorkItem()) {
+							killWorkItem(cm, wi, metaCas, rwp);
+						}
+						else {
+							retryWorkItem(cm, wi, metaCas, rwp);
+						}
+					}
+					else {
+						retryWorkItem(cm, wi, metaCas, rwp);
+					}
+				}
+				else {
+					MessageBuffer mb = new MessageBuffer();
+					mb.append("No CAS found for processing");
+					logger.info(location, ILogger.null_id, mb.toString());
+				}
+			}
+			else {
+				MessageBuffer mb = new MessageBuffer();
+				mb.append("No remote worker process entry found for processing");
+				logger.info(location, ILogger.null_id, mb.toString());
+			}
+		}
+		catch(Exception e) {
+			logger.error(location, ILogger.null_id, e);
+		}
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessFailure.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java (from r1647818, uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java)
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java?p2=uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java&p1=uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java&r1=1647818&r2=1647840&rev=1647840&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionPreempt.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionProcessPreempt.java Wed Dec 24 20:30:28 2014
@@ -35,13 +35,13 @@ import org.apache.uima.ducc.container.jd
 import org.apache.uima.ducc.container.jd.wi.IWorkItem;
 import org.apache.uima.ducc.container.net.iface.IMetaCas;
 
-public class ActionPreempt implements IAction {
+public class ActionProcessPreempt implements IAction {
 
-	private static Logger logger = Logger.getLogger(ActionPreempt.class, IComponent.Id.JD.name());
+	private static Logger logger = Logger.getLogger(ActionProcessPreempt.class, IComponent.Id.JD.name());
 	
 	@Override
 	public String getName() {
-		return ActionPreempt.class.getName();
+		return ActionProcessPreempt.class.getName();
 	}
 	
 	private void preemptWorkItem(CasManager cm, IWorkItem wi, IMetaCas metaCas, IRemoteWorkerProcess rwp) {

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java?rev=1647840&r1=1647839&r2=1647840&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/WiFsm.java Wed Dec 24 20:30:28 2014
@@ -58,7 +58,8 @@ public class WiFsm extends Fsm {
 	public IAction ActionEnd				= new ActionEnd();
 	
 	public IAction ActionInProgress			= new ActionInProgress();
-	public IAction ActionPreempt			= new ActionPreempt();
+	public IAction ActionProcessFailure		= new ActionProcessFailure();
+	public IAction ActionProcessPreempt		= new ActionProcessPreempt();
 	
 	public IAction ActionIgnore 			= new ActionIgnore();
 	public IAction ActionError				= new ActionError();
@@ -89,25 +90,29 @@ public class WiFsm extends Fsm {
 		add(Start, CAS_Unavailable, ActionIgnore, Start);
 		add(Start, Ack_Request, ActionError, Start);
 		add(Start, Process_Preempt, ActionIgnore, Start);
+		add(Start, Process_Failure, ActionIgnore, Start);
 		
 		add(Get_Pending, Get_Request, ActionInProgress, Get_Pending);
 		add(Get_Pending, CAS_Available, ActionSend, CAS_Send);
 		add(Get_Pending, CAS_Unavailable, ActionSend, Start);
 		add(Get_Pending, Ack_Request, ActionError, Get_Pending);
-		add(Get_Pending, Process_Preempt, ActionPreempt, Start);
+		add(Get_Pending, Process_Preempt, ActionProcessPreempt, Start);
+		add(Get_Pending, Process_Failure, ActionProcessFailure, Start);
 		
 		add(CAS_Send, Get_Request, ActionInProgress, CAS_Send);
 		add(CAS_Send, CAS_Available, ActionIgnore, CAS_Send);
 		add(CAS_Send, CAS_Unavailable, ActionIgnore, CAS_Send);
 		add(CAS_Send, Ack_Request, ActionAck, CAS_Active);
-		add(CAS_Send, Process_Preempt, ActionPreempt, Start);
+		add(CAS_Send, Process_Preempt, ActionProcessPreempt, Start);
+		add(CAS_Send, Process_Failure, ActionProcessFailure, Start);
 		
 		add(CAS_Active, Get_Request, ActionError, CAS_Active);
 		add(CAS_Active, CAS_Available, ActionIgnore, CAS_Active);
 		add(CAS_Active, CAS_Unavailable, ActionIgnore, CAS_Active);
 		add(CAS_Active, Ack_Request, ActionError, CAS_Active);
 		add(CAS_Active, End_Request, ActionEnd, Start);
-		add(CAS_Active, Process_Preempt, ActionPreempt, Start);
+		add(CAS_Active, Process_Preempt, ActionProcessPreempt, Start);
+		add(CAS_Active, Process_Failure, ActionProcessFailure, Start);
 		
 		MessageBuffer mb2 = new MessageBuffer();
 		mb2.append(Standardize.Label.exit.name());

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java?rev=1647840&r1=1647839&r2=1647840&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java Wed Dec 24 20:30:28 2014
@@ -153,7 +153,26 @@ public class MessageHandler implements I
 			mb.append(Standardize.Label.pid.get()+processInfo.getPid());
 			logger.info(location, ILogger.null_id, mb.toString());
 			ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
-			//TODO
+			for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
+				IRemoteWorkerThread rwt = entry.getKey();
+				if(rwt.comprises(processInfo)) {
+					MessageBuffer mb1 = new MessageBuffer();
+					mb1.append(Standardize.Label.remote.get()+rwt.toString());
+					mb1.append(Boolean.TRUE.toString());
+					logger.info(location, ILogger.null_id, mb1.toString());
+					IWorkItem wi = entry.getValue();
+					IFsm fsm = wi.getFsm();
+					IEvent event = WiFsm.Process_Failure;
+					Object actionData = new ActionData(wi, rwt, null);
+					fsm.transition(event, actionData);
+				}
+				else {
+					MessageBuffer mb2 = new MessageBuffer();
+					mb2.append(Standardize.Label.remote.get()+rwt.toString());
+					mb2.append(Boolean.FALSE.toString());
+					logger.info(location, ILogger.null_id, mb2.toString());
+				}
+			}
 		}
 		catch(Exception e) {
 			logger.error(location, ILogger.null_id, e);