You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/11/10 23:11:07 UTC

svn commit: r1637989 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-container/src: main/java/org/apache/uima/ducc/container/jd/cas/ main/java/org/apache/uima/ducc/container/jd/fsm/wi/ main/java/org/apache/uima/ducc/container/jd/mh/ main/java/org/apache/u...

Author: degenaro
Date: Mon Nov 10 22:11:07 2014
New Revision: 1637989

URL: http://svn.apache.org/r1637989
Log:
UIMA-4069 Redesign of JD toward the main goal of classpath separation for container (system) code.

Adds Job Driver error handler support for retrying a work item and for killing the job (killJob), with JUnit test cases.

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/Dispatcher.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestDispatcher.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java Mon Nov 10 22:11:07 2014
@@ -19,6 +19,7 @@
 package org.apache.uima.ducc.container.jd.cas;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class CasManagerStats {
@@ -35,6 +36,8 @@ public class CasManagerStats {
 	private AtomicInteger endFailure = new AtomicInteger(0);
 	private AtomicInteger endRetry = new AtomicInteger(0);
 	
+	private AtomicBoolean killJob = new AtomicBoolean(false);
+	
 	private ConcurrentHashMap<String,AtomicInteger> retryReasonsMap = new ConcurrentHashMap<String,AtomicInteger>();
 	
 	public void setCrTotal(int value) {
@@ -115,4 +118,12 @@ public class CasManagerStats {
 	public int getEndRetry() {
 		return endRetry.get();
 	}
+	
+	public void setKillJob() {
+		killJob.set(true);
+	}
+	
+	public boolean isKillJob() {
+		return killJob.get();
+	}
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java Mon Nov 10 22:11:07 2014
@@ -45,8 +45,18 @@ public class ActionEnd implements IActio
 		return ActionEnd.class.getName();
 	}
 	
-	private void retry(CasManager cm, IWorkItem wi, IMetaCasTransaction trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
-		String location = "retry";
+	private void killJob(CasManager cm, IWorkItem wi, IMetaCasTransaction trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
+		String location = "killJob";
+		cm.getCasManagerStats().setKillJob();
+		MessageBuffer mb = new MessageBuffer();
+		mb.append(Standardize.Label.transNo.get()+trans.getTransactionId().toString());
+		mb.append(Standardize.Label.seqNo.get()+metaCas.getSystemKey());
+		mb.append(Standardize.Label.remote.get()+rwi.toString());
+		logger.info(location, IEntityId.null_id, mb.toString());
+	}
+	
+	private void retryWorkItem(CasManager cm, IWorkItem wi, IMetaCasTransaction trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
+		String location = "retryWorkItem";
 		cm.putMetaCas(metaCas, RetryReason.UserErrorRetry);
 		cm.getCasManagerStats().incEndRetry();
 		MessageBuffer mb = new MessageBuffer();
@@ -56,8 +66,8 @@ public class ActionEnd implements IActio
 		logger.info(location, IEntityId.null_id, mb.toString());
 	}
 	
-	private void failure(CasManager cm, IWorkItem wi, IMetaCasTransaction trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
-		String location = "failure";
+	private void killWorkItem(CasManager cm, IWorkItem wi, IMetaCasTransaction trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
+		String location = "killWorkItem";
 		cm.getCasManagerStats().incEndFailure();
 		MessageBuffer mb = new MessageBuffer();
 		mb.append(Standardize.Label.transNo.get()+trans.getTransactionId().toString());
@@ -66,8 +76,8 @@ public class ActionEnd implements IActio
 		logger.info(location, IEntityId.null_id, mb.toString());
 	}
 	
-	private void success(CasManager cm, IWorkItem wi, IMetaCasTransaction trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
-		String location = "success";
+	private void successWorkItem(CasManager cm, IWorkItem wi, IMetaCasTransaction trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
+		String location = "successWorkItem";
 		cm.getCasManagerStats().incEndSuccess();
 		wi.setTodEnd();
 		updateStatistics(wi);
@@ -98,19 +108,22 @@ public class ActionEnd implements IActio
 					ProxyJobDriverErrorHandler pjdeh = jd.getProxyJobDriverErrorHandler();
 					ProxyJobDriverDirective pjdd = pjdeh.handle(cas, exception);
 					if(pjdd != null) {
+						if(pjdd.isKillJob()) {
+							killJob(cm, wi, trans, metaCas, rwi);
+						}
 						if(pjdd.isKillWorkItem()) {
-							failure(cm, wi, trans, metaCas, rwi);
+							killWorkItem(cm, wi, trans, metaCas, rwi);
 						}
 						else {
-							retry(cm, wi, trans, metaCas, rwi);
+							retryWorkItem(cm, wi, trans, metaCas, rwi);
 						}
 					}
 					else {
-						failure(cm, wi, trans, metaCas, rwi);
+						killWorkItem(cm, wi, trans, metaCas, rwi);
 					}
 				}
 				else {
-					success(cm, wi, trans, metaCas, rwi);
+					successWorkItem(cm, wi, trans, metaCas, rwi);
 				}
 				wi.resetTods();
 			}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/Dispatcher.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/Dispatcher.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/Dispatcher.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/Dispatcher.java Mon Nov 10 22:11:07 2014
@@ -69,6 +69,9 @@ public class Dispatcher {
 			oi.setWorkItemEndSuccesses(cms.getEndSuccess());
 			oi.setWorkItemEndFailures(cms.getEndFailure());
 			oi.setWorkItemEndRetrys(cms.getEndRetry());
+			if(cms.isKillJob()) {
+				oi.setKillJob();
+			}
 			oi.setWorkItemPreemptions(cms.getNumberOfPreemptions());
 			oi.setWorkItemFinishedMillisMin(wis.getMillisMin());
 			oi.setWorkItemFinishedMillisMax(wis.getMillisMax());
@@ -81,6 +84,7 @@ public class Dispatcher {
 			mb.append(Standardize.Label.crFetches.get()+oi.getWorkItemCrFetches());
 			mb.append(Standardize.Label.endSuccess.get()+oi.getWorkItemEndSuccesses());
 			mb.append(Standardize.Label.endFailure.get()+oi.getWorkItemEndFailures());
+			mb.append(Standardize.Label.killJob.get()+oi.isKillJob());
 			mb.append(Standardize.Label.preemptions.get()+oi.getWorkItemPreemptions());
 			mb.append(Standardize.Label.finishedMillisMin.get()+oi.getWorkItemFinishedMillisMin());
 			mb.append(Standardize.Label.finishedMillisMax.get()+oi.getWorkItemFinishedMillisMax());

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java Mon Nov 10 22:11:07 2014
@@ -75,4 +75,9 @@ public interface IOperatingInfo {
 	
 	public void setWorkItemTodMostRecentStart(long value);
 	public long getWorkItemTodMostRecentStart();
+	
+	//
+	
+	public void setKillJob();
+	public boolean isKillJob();
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java Mon Nov 10 22:11:07 2014
@@ -42,6 +42,8 @@ public class OperatingInfo implements IO
 	
 	private long todMostRecentStart = 0;
 	
+	private boolean killJob = false;
+	
 	@Override
 	public void setWorkItemCrTotal(int value) {
 		crTotal = value;
@@ -207,4 +209,14 @@ public class OperatingInfo implements IO
 		return todMostRecentStart;
 	}
 
+	@Override
+	public void setKillJob() {
+		killJob = true;
+	}
+
+	@Override
+	public boolean isKillJob() {
+		return killJob;
+	}
+
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestDispatcher.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestDispatcher.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestDispatcher.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestDispatcher.java Mon Nov 10 22:11:07 2014
@@ -337,6 +337,13 @@ public class TestDispatcher extends ATes
 			asExpected("CASes error count == "+expectedErrorsTest04);
 			assertTrue(endSuccess+endFailure == 100);
 			asExpected("CASes failure+success count == 100");
+			boolean killJob = oi.isKillJob();
+			if(endFailure >= 15) {
+				assertTrue(killJob == true);
+			}
+			else {
+				assertTrue(killJob == false);
+			}
 		}
 		catch(Exception e) {
 			e.printStackTrace();
@@ -360,10 +367,10 @@ public class TestDispatcher extends ATes
 	}
 	
 	
-	// multiple node:pid:tid with errors & retrys
+	// multiple node:pid:tid with errors
 	
 	@Test
-	public void test_05() {
+	public void test_05a() {
 		if(isDisabled(this.getClass().getName())) {
 			return;
 		}
@@ -376,8 +383,6 @@ public class TestDispatcher extends ATes
 			jdCfg.setUserClasspath(Utilities.userCP);
 			jdCfg.setCrXml(crXml);
 			jdCfg.setCrCfg(crCfg);
-			String eh = "org.apache.uima.ducc.user.jd.test.helper.TestJdContainerErrorHandlerRandomRetry";
-			jdCfg.setErrorHandlerClassName(eh);
 			JobDriver.setInstance(jdCfg);
 			int size = JobDriver.getInstance().getMap().size();
 			debug("map size:"+size);
@@ -412,15 +417,83 @@ public class TestDispatcher extends ATes
 			asExpected("CASes fetched count == 100");
 			long endSuccess = oi.getWorkItemEndSuccesses();
 			long endFailure = oi.getWorkItemEndFailures();
-			long endRetry = oi.getWorkItemEndRetrys();
 			debug("injected errors: "+inject);
 			debug("end success: "+endSuccess);
 			debug("end failure: "+endFailure);
-			debug("end retry: "+endRetry);
+			assertTrue(endFailure == expectedErrorsTest05);
+			asExpected("CASes error count == "+expectedErrorsTest05);
 			assertTrue(endSuccess+endFailure == 100);
 			asExpected("CASes failure+success count == 100");
-			assertTrue(endRetry > 0);
-			asExpected("CASes retry count == "+endRetry);
+			boolean killJob = oi.isKillJob();
+			assertTrue(killJob == false);
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail("Exception");
+		}
+	}
+	
+	@Test
+	public void test_05b() {
+		if(isDisabled(this.getClass().getName())) {
+			return;
+		}
+		try {
+			URL urlXml = this.getClass().getResource("/CR100.xml");
+			File file = new File(urlXml.getFile());
+			String crXml = file.getAbsolutePath();
+			String crCfg = null;
+			IJobDriverConfig jdCfg = new JobDriverConfig();
+			jdCfg.setUserClasspath(Utilities.userCP);
+			jdCfg.setCrXml(crXml);
+			jdCfg.setCrCfg(crCfg);
+			//
+			String ehcp = "KillJobLimit="+2;
+			jdCfg.setErrorHandlerConfigurationParameters(ehcp);
+			//
+			JobDriver.setInstance(jdCfg);
+			int size = JobDriver.getInstance().getMap().size();
+			debug("map size:"+size);
+			Dispatcher dispatcher = new Dispatcher();
+			ThreadInfoFactory tif = new ThreadInfoFactory(2,2,2);
+			ThreadInfo ti = tif.getRandom();
+			debug("random:"+ti.toKey());
+			int casNo = -1;
+			IMetaCas metaCasPrevious = null;
+			IMetaCas metaCas = transGet(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+			assertTrue(metaCas != null);
+			int inject = 0;
+			while(metaCas != null) {
+				transAck(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+				if(randomErrorTest05()) {
+					Exception exception = new RuntimeException("injected error test #05");
+					metaCas.setUserSpaceException(exception);
+					inject++;
+				}
+				transEnd(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+				casNo--;
+				metaCasPrevious = metaCas;
+				assertTrue(metaCasPrevious != null);
+				ti = tif.getRandom();
+				debug("random:"+ti.toKey());
+				metaCas = transGet(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+			}
+			assertTrue(metaCasPrevious.getSystemKey().equals("100"));
+			asExpected("CASes processed count == 100");
+			IOperatingInfo oi = dispatcher.handleGetOperatingInfo();
+			assertTrue(oi.getWorkItemCrFetches() == 100);
+			asExpected("CASes fetched count == 100");
+			long endSuccess = oi.getWorkItemEndSuccesses();
+			long endFailure = oi.getWorkItemEndFailures();
+			debug("injected errors: "+inject);
+			debug("end success: "+endSuccess);
+			debug("end failure: "+endFailure);
+			assertTrue(endFailure == expectedErrorsTest05);
+			asExpected("CASes error count == "+expectedErrorsTest05);
+			assertTrue(endSuccess+endFailure == 100);
+			asExpected("CASes failure+success count == 100");
+			boolean killJob = oi.isKillJob();
+			assertTrue(killJob == true);
 		}
 		catch(Exception e) {
 			e.printStackTrace();
@@ -430,7 +503,9 @@ public class TestDispatcher extends ATes
 	
 	private long seedTest05 = 5;
 	private Random randomTest05 = new Random(seedTest05);
-	private long pctTest05 = 15;
+	private long pctTest05 = 5;
+	
+	private long expectedErrorsTest05 = 7;
 	
 	private boolean randomErrorTest05() {
 		boolean retVal = false;
@@ -440,4 +515,85 @@ public class TestDispatcher extends ATes
 		}
 		return retVal;
 	}
+	
+	// multiple node:pid:tid with errors & retries
+	
+	@Test
+	public void test_06() {
+		if(isDisabled(this.getClass().getName())) {
+			return;
+		}
+		try {
+			URL urlXml = this.getClass().getResource("/CR100.xml");
+			File file = new File(urlXml.getFile());
+			String crXml = file.getAbsolutePath();
+			String crCfg = null;
+			IJobDriverConfig jdCfg = new JobDriverConfig();
+			jdCfg.setUserClasspath(Utilities.userCP);
+			jdCfg.setCrXml(crXml);
+			jdCfg.setCrCfg(crCfg);
+			String eh = "org.apache.uima.ducc.user.jd.test.helper.TestJdContainerErrorHandlerRandomRetry";
+			jdCfg.setErrorHandlerClassName(eh);
+			JobDriver.setInstance(jdCfg);
+			int size = JobDriver.getInstance().getMap().size();
+			debug("map size:"+size);
+			Dispatcher dispatcher = new Dispatcher();
+			ThreadInfoFactory tif = new ThreadInfoFactory(2,2,2);
+			ThreadInfo ti = tif.getRandom();
+			debug("random:"+ti.toKey());
+			int casNo = -1;
+			IMetaCas metaCasPrevious = null;
+			IMetaCas metaCas = transGet(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+			assertTrue(metaCas != null);
+			int inject = 0;
+			while(metaCas != null) {
+				transAck(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+				if(randomErrorTest06()) {
+					Exception exception = new RuntimeException("injected error test #06");
+					metaCas.setUserSpaceException(exception);
+					inject++;
+				}
+				transEnd(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+				casNo--;
+				metaCasPrevious = metaCas;
+				assertTrue(metaCasPrevious != null);
+				ti = tif.getRandom();
+				debug("random:"+ti.toKey());
+				metaCas = transGet(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+			}
+			assertTrue(metaCasPrevious.getSystemKey().equals("100"));
+			asExpected("CASes processed count == 100");
+			IOperatingInfo oi = dispatcher.handleGetOperatingInfo();
+			assertTrue(oi.getWorkItemCrFetches() == 100);
+			asExpected("CASes fetched count == 100");
+			long endSuccess = oi.getWorkItemEndSuccesses();
+			long endFailure = oi.getWorkItemEndFailures();
+			long endRetry = oi.getWorkItemEndRetrys();
+			debug("injected errors: "+inject);
+			debug("end success: "+endSuccess);
+			debug("end failure: "+endFailure);
+			debug("end retry: "+endRetry);
+			assertTrue(endSuccess+endFailure == 100);
+			asExpected("CASes failure+success count == 100");
+			assertTrue(endRetry > 0);
+			asExpected("CASes retry count == "+endRetry);
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail("Exception");
+		}
+	}
+	
+	private long seedTest06 = 6;
+	private Random randomTest06 = new Random(seedTest06);
+	private long pctTest06 = 15;
+	
+	private boolean randomErrorTest06() {
+		boolean retVal = false;
+		int n = randomTest06.nextInt(100);
+		if(n < pctTest06) {
+			retVal = true;
+		}
+		return retVal;
+	}
 }