You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2015/01/05 19:52:43 UTC

svn commit: r1649611 - in /uima/sandbox/uima-ducc/trunk: uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/ uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/

Author: cwiklik
Date: Mon Jan  5 18:52:43 2015
New Revision: 1649611

URL: http://svn.apache.org/r1649611
Log:
UIMA-4066 report exceptions to JD

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1649611&r1=1649610&r2=1649611&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Mon Jan  5 18:52:43 2015
@@ -19,6 +19,9 @@
 
 package org.apache.uima.ducc.transport.configuration.jp;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.net.SocketTimeoutException;
 import java.util.List;
 import java.util.Properties;
@@ -261,27 +264,34 @@ public class HttpWorkerThread implements
     	
     }
 	public void run() {
-		try {
-	    	logger.info("HttpWorkerThread.run()", null, "Starting JP Process Thread Id:"+Thread.currentThread().getId());
-
+		String command="";
+		PostMethod postMethod = null;
+	    logger.info("HttpWorkerThread.run()", null, "Starting JP Process Thread Id:"+Thread.currentThread().getId());
+	   	try {
 			initialize(duccComponent.isUimaASJob());
 			// each thread needs its own PostMethod
-			PostMethod postMethod = new PostMethod(httpClient.getJdUrl());
+			postMethod = new PostMethod(httpClient.getJdUrl());
 			// Set request timeout
 			postMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, duccComponent.getTimeout());
 			//States stateMachine = new States(States.Start);
-//			SMContext ctx = new SMContextImpl(httpClient, States.Start);
-			String command="";
-			
+//				SMContext ctx = new SMContextImpl(httpClient, States.Start);
+				
 			threadReadyCount.countDown();  // this thread is ready
-			
 			// **************************************************************************
 			// now block and wait until all threads finish deploying and initializing UIMA
 			// **************************************************************************
 			threadReadyCount.await();
+	    		
+	   	} catch( Throwable t) {
+	    		logger.error("HttpWorkerThread.run()", null, t);
+	    		
+	   		return;  // non-recoverable error
+	   	}
+			
 			
-			// run forever (or until the process throws IllegalStateException
-	    	logger.info("HttpWorkerThread.run()", null, "Processing Work Items - Thread Id:"+Thread.currentThread().getId());
+		// run forever (or until the process throws IllegalStateException)
+	   	logger.info("HttpWorkerThread.run()", null, "Processing Work Items - Thread Id:"+Thread.currentThread().getId());
+		try {
 
 			while (duccComponent.isRunning()) {  //service.running && ctx.state().process(ctx)) {
 
@@ -347,21 +357,31 @@ public class HttpWorkerThread implements
 							
 							transaction.getMetaCas().setPerformanceMetrics(metricsWrapper);
 							
-						} catch( RuntimeException ee) {
-							if ( ee.getCause().equals( AnalysisEngineProcessException.class)) {
-								// This is process error. It may contain user defined
-								// exception in the stack trace. To protect against
-								// ClassNotF ound, the entire stack trace was serialized.
-								// Fetch the serialized stack trace and pass it on to
-								// to the JD.
-								transaction.getMetaCas().setUserSpaceException(ee.getMessage());
-							} else {
-								logger.error("run", null, ee);
-							}
-							transaction.getMetaCas().setUserSpaceException("Bob");
-						}  catch( Exception ee) {
-							transaction.getMetaCas().setUserSpaceException("Bob");
+						}  catch( InvocationTargetException ee) {
+							// The only way we would be here is if uimaProcessor.process() method failed.
+							// In this case, the process method serialized stack trace into binary blob
+							// and wrapped it in AnalysisEngineProcessException. The serialized stack 
+							// trace is available via getMessage() call.
+							//logger.error("run", null, ee);
+							//System.out.println("Error>>>> Caused By Class::::"+ee.getCause().getClass().getName());
+							// This is process error. It may contain user defined
+							// exception in the stack trace. To protect against
+						    // ClassNotFound, the entire stack trace was serialized.
+							// Fetch the serialized stack trace and pass it on to
+							// the JD. The actual serialized stack trace is wrapped in
+							// RuntimeException->AnalysisEngineException.message
+							transaction.getMetaCas().setUserSpaceException(ee.getCause().getCause().getMessage());								
+//							transaction.getMetaCas().setUserSpaceException("Bob");
+							logger.info("run", null, "Work item processing failed - returning serialized exception to the JD");
+							//ee.printStackTrace();
+						} catch( Exception ee) {
+							ByteArrayOutputStream baos = new ByteArrayOutputStream();
+						    ObjectOutputStream oos = new ObjectOutputStream( baos );
+						    oos.writeObject( ee);
+						    oos.close();
+							transaction.getMetaCas().setUserSpaceException(baos.toByteArray());
 							logger.error("run", null, ee);
+//							ee.printStackTrace();
 						}
 						transaction.getMetaCas().setUserSpaceCas(null);
 						transaction.setType(Type.End);

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java?rev=1649611&r1=1649610&r2=1649611&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java Mon Jan  5 18:52:43 2015
@@ -19,7 +19,9 @@
 
 package org.apache.uima.ducc.user.jp;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.ObjectOutputStream;
 import java.lang.reflect.Method;
 import java.net.BindException;
 import java.net.MalformedURLException;
@@ -44,11 +46,13 @@ import org.apache.uima.aae.client.UimaAs
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
 import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.impl.XmiSerializationSharedData;
 import org.apache.uima.collection.EntityProcessStatus;
 import org.apache.uima.ducc.user.jp.iface.IProcessContainer;
 import org.apache.uima.util.Level;
+import org.apache.uima.util.Logger;
 import org.xml.sax.Attributes;
 import org.xml.sax.SAXException;
 import org.xml.sax.helpers.DefaultHandler;
@@ -268,6 +272,20 @@ public class UimaASProcessContainer impl
 				metricsList.add(p);
 			}
 			return metricsList;
+		} catch( Throwable e ) {
+			String serializedStackTrace = serialize(e);
+			Logger logger = UIMAFramework.getLogger();
+			logger.log(Level.WARNING, "UimaProcessContainer", e);
+			e.printStackTrace();
+			// repackage so that the code on the other side is protected
+			// against Custom Exception classes that the user code may
+			// throw. Serialize the stack trace and throw a RuntimeException
+			// with a causedby of AnalysisEngineProcessException. The code
+			// on the other side must determine if the exception was caused
+			// by processing error or something else. In case of the latter
+			// it would be java only stack trace.
+			throw new 
+			RuntimeException(serializedStackTrace, new AnalysisEngineProcessException());
 		} finally {
 			if ( cas != null) {
 				cas.release();
@@ -275,6 +293,20 @@ public class UimaASProcessContainer impl
 		}
 	}
     
+	  private String serialize(Throwable t) throws Exception {
+		  try {
+				//return (String)toXMLMethod.invoke(xstreamInstance, t);
+	          ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		      ObjectOutputStream oos = new ObjectOutputStream( baos );
+		      oos.writeObject( t );
+		      oos.close();
+		      return new String(baos.toByteArray());
+		  
+		  } catch( Exception e) {
+			  e.printStackTrace();
+			  throw e;
+		  }
+	  }
 
 	private void initializeUimaAsClient(String endpoint) throws Exception {
 		String brokerURL = System.getProperty("DefaultBrokerURL");

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java?rev=1649611&r1=1649610&r2=1649611&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java Mon Jan  5 18:52:43 2015
@@ -19,6 +19,9 @@
 
 package org.apache.uima.ducc.user.jp;
 
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -32,6 +35,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.analysis_engine.AnalysisEngine;
 import org.apache.uima.analysis_engine.AnalysisEngineManagement;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.impl.XmiSerializationSharedData;
@@ -43,11 +47,15 @@ import org.apache.uima.resource.Resource
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.resource.ResourceSpecifier;
 import org.apache.uima.util.CasPool;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.Logger;
 
 public class UimaProcessContainer implements IProcessContainer {
 	public static final String IMPORT_BY_NAME_PREFIX = "*importByName:";
 	private DuccUimaSerializer uimaSerializer = new DuccUimaSerializer();
-
+   // private Object xstreamInstance=null;
+   // private Method toXMLMethod = null;
+    
 	Semaphore sharedInitSemaphore = new Semaphore(1);
 	// this map enforces thread affinity to specific thread. Needed to make
 	// sure that a thread used to initialized the AE is used to call process().
@@ -112,15 +120,84 @@ public class UimaProcessContainer implem
 	      platformMBeanServer = null;
 	    }
 	  }
+/*
+	  private void loadXStream(String duccHome) throws Exception {
+			ClassLoader currentCL = Thread.currentThread().getContextClassLoader();
+				// setup a classpath for Ducc broker
+			String amqOptionalDir = 
+					duccHome+File.separator+"apache-uima"+File.separator+"apache-activemq"+File.separator+"lib"+File.separator+"optional"+File.separator;//+"*"
+
+			File amqOptionalDirFile = new File(amqOptionalDir);
+	        File[] files = amqOptionalDirFile.listFiles();   // Will be null if missing or not a dir
+	        URL[] urls = new URL[1];
+	        if (files != null) {
+	          for (File f : files) {
+	            if (f.getName().startsWith("xstream")) {
+	              urls[0] = f.toURI().toURL();
+	              break;
+	            }
+	          }
+	        }
+
+
+	        // isolate XStream in its own Class loader
+
+			URLClassLoader ucl = new URLClassLoader(urls,ClassLoader.getSystemClassLoader().getParent() );
+			Thread.currentThread().setContextClassLoader(ucl);
+				
+			Class<?> xstreamClass = ucl.loadClass("com.thoughtworks.xstream.XStream");
+
+			Class<?> domClaz = ucl.loadClass("com.thoughtworks.xstream.io.xml.DomDriver");
+			Class<?> sdClaz = ucl.loadClass("com.thoughtworks.xstream.io.HierarchicalStreamDriver");
+			
+			
+		    URL[] urls2 = ucl.getURLs();
+			for( URL u : urls2 ) {
+				System.out.println(">>>>>>>> -----------:"+u.getFile());
+			}
+			//Constructor<?>[] constr = xstreamClass.getConstructors();	
+			Constructor<?> cons = xstreamClass.getConstructor(sdClaz);	
+			xstreamInstance = cons.newInstance(domClaz.newInstance());
+			//xstreamInstance = xstreamClass.newInstance();	
+		    
+			
+			toXMLMethod = xstreamClass.getMethod("toXML", Object.class);
+		    System.out.println("Initialized XStream For Serialization");
+
+		    // Restore class loader
+		    Thread.currentThread().setContextClassLoader(currentCL);
+	  }
+*/		    
+	  private String serialize(Throwable t) throws Exception {
+		  try {
+				//return (String)toXMLMethod.invoke(xstreamInstance, t);
+	          ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		      ObjectOutputStream oos = new ObjectOutputStream( baos );
+		      oos.writeObject( t );
+		      oos.close();
+		      return new String(baos.toByteArray());
+		  
+		  } catch( Exception e) {
+			  e.printStackTrace();
+			  throw e;
+		  }
+	  }
 	  public int initialize(String[] args ) throws Exception {
 			analysisEngineDescriptor = ArgsParser.getArg("-aed", args);
 			scaleout = Integer.valueOf(ArgsParser.getArg("-t", args));
+			
+			
+//	        XStream xStream = new XStream(new DomDriver())
+//	        return xStream.toXML(targetToMarshall);
+
+			
          return scaleout;		  
 	  }
 	public void deploy(String duccHome) throws Exception {
 	    
 //		String jmxName = "org.apache.uima:type=ee.jms.services,s=" + getComponentName() + " Uima EE Service,";
-
+		//loadXStream(duccHome);
+		
 		ResourceSpecifier rSpecifier = null;
 	    HashMap<String,Object> paramsMap = 
 				new HashMap<String,Object>();
@@ -198,6 +275,7 @@ public class UimaProcessContainer implem
 			
 			for (AnalysisEnginePerformanceMetrics metrics : casMetrics) {
 				Properties p = new Properties();
+				
 				p.setProperty("name", metrics.getName());
 				p.setProperty("uniqueName", metrics.getUniqueName());
 				p.setProperty("analysisTime",
@@ -209,7 +287,33 @@ public class UimaProcessContainer implem
 			
 //			System.out.println("Thread:"+Thread.currentThread().getId()+" Processed "+num+" CASes");
 			return metricsList;
-		} finally {
+		} catch( Throwable e ) {
+			String serializedStackTrace = serialize(e);
+			Logger logger = UIMAFramework.getLogger();
+			logger.log(Level.WARNING, "UimaProcessContainer", e);
+			e.printStackTrace();
+			// repackage so that the code on the other side is protected
+			// against Custom Exception classes that the user code may
+			// throw. Serialize the stack trace and throw a RuntimeException
+			// with a causedby of AnalysisEngineProcessException. The code
+			// on the other side must determine if the exception was caused
+			// by processing error or something else. In case of the latter
+			// it would be java only stack trace.
+			throw new 
+			RuntimeException(serializedStackTrace, new AnalysisEngineProcessException());
+		}
+//		catch( Throwable t) {
+			//throw new ResourceProcessException(serialized);
+//			byte[] ser = 
+//			XMLEncoder encoder =
+//			           new XMLEncoder(
+//			              new BufferedOutputStream(
+//			                new FileOutputStream(filename)));
+//			        encoder.writeObject(f);
+//			        encoder.close();
+		
+//		}
+		finally {
 			if (ae != null) {
 				instanceMap.checkin(ae);
 			}
@@ -220,8 +324,17 @@ public class UimaProcessContainer implem
 
 		}
 	}
-
-	private List<AnalysisEnginePerformanceMetrics> getMetrics(AnalysisEngine ae)
+/*	
+	private String serialize ( Serializable o ) throws IOException {
+	        ByteArrayOutputStream os = new ByteArrayOutputStream();
+	        XMLEncoder encoder = new XMLEncoder(os);
+			encoder.writeObject(o);
+			encoder.close();   
+			return new String(os.toByteArray());
+	        
+	    }
+*/
+	   private List<AnalysisEnginePerformanceMetrics> getMetrics(AnalysisEngine ae)
 			throws Exception {
 		List<AnalysisEnginePerformanceMetrics> analysisManagementObjects = new ArrayList<AnalysisEnginePerformanceMetrics>();
 		synchronized(UimaProcessContainer.class) {