You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/06/17 15:41:55 UTC

svn commit: r1861516 - in /uima/uima-ducc/trunk: uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/ uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/

Author: cwiklik
Date: Mon Jun 17 15:41:54 2019
New Revision: 1861516

URL: http://svn.apache.org/viewvc?rev=1861516&view=rev
Log:
UIMA-6040 added support for claspath isolation to Uima As based JPs

Added:
    uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/AbstractWrapper.java
    uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaAsWrapper.java
Modified:
    uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
    uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaWrapper.java

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java?rev=1861516&r1=1861515&r2=1861516&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java Mon Jun 17 15:41:54 2019
@@ -21,21 +21,17 @@ package org.apache.uima.ducc.ps.service.
 import java.io.File;
 import java.io.FileInputStream;
 import java.lang.reflect.Method;
-import java.net.BindException;
 import java.net.InetAddress;
-import java.net.SocketException;
+import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import org.apache.uima.UIMAFramework;
-import org.apache.uima.cas.CAS;
 import org.apache.uima.ducc.ps.service.IScaleable;
 import org.apache.uima.ducc.ps.service.IServiceState;
 import org.apache.uima.ducc.ps.service.ServiceConfiguration;
@@ -55,8 +51,6 @@ import org.apache.uima.ducc.ps.service.p
 import org.apache.uima.ducc.ps.service.processor.uima.utils.UimaResultDefaultSerializer;
 import org.apache.uima.ducc.ps.service.utils.UimaSerializer;
 import org.apache.uima.ducc.ps.service.utils.Utils;
-import org.apache.uima.resource.metadata.impl.TypeSystemDescription_impl;
-import org.apache.uima.util.CasCreationUtils;
 import org.apache.uima.util.Level;
 import org.apache.uima.util.Logger;
 
@@ -66,7 +60,6 @@ public class UimaAsServiceProcessor exte
 	Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class);
 	// Map to store DuccUimaSerializer instances. Each has affinity to a thread
 	private static Object brokerInstance = null;
-	private UimaAsClientWrapper uimaASClient = null;
 	private String saxonURL = null;
 	private String xslTransform = null;
 	protected Object initializeMonitor = new Object();
@@ -91,6 +84,21 @@ public class UimaAsServiceProcessor exte
 	private String duccHome = null;
 	boolean enablePerformanceBreakdownReporting = false;
 	private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
+	
+	public static final String AE_NAME = "AeName";
+	public static final String AE_CONTEXT = "AeContext";
+	public static final String AE_ANALYSIS_TIME = "AeAnalysisTime";
+	public static final String AE_CAS_PROCESSED = "AeProcessedCasCount";
+
+
+	private static String M_PROCESS="process";
+	private static String M_STOP="stop";
+	private static String M_INITIALIZE="initialize";
+
+	private Method processMethod;
+	private Method stopMethod;
+	private Object processorInstance;
+
 	static {
 		// try to get platform MBean Server (Java 1.5 only)
 		try {
@@ -150,8 +158,25 @@ public class UimaAsServiceProcessor exte
 		monitor =
 				new RemoteStateObserver(serviceConfiguration, logger);
 	}
+	
+	public void dump(ClassLoader cl, int numLevels) {
+		int n = 0;
+		for (URLClassLoader ucl = (URLClassLoader) cl; ucl != null
+				&& ++n <= numLevels; ucl = (URLClassLoader) ucl.getParent()) {
+			System.out.println("Class-loader " + n + " has "
+					+ ucl.getURLs().length + " urls:");
+			for (URL u : ucl.getURLs()) {
+				System.out.println("  " + u);
+			}
+		}
+	}
+	
 	@Override
 	public void initialize() throws ServiceInitializationException {
+	   	// Save current context cl and inject System classloader as
+		// a context cl before calling user code. 
+		ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
+
 		try {
 			duccHome = System.getProperty("DUCC_HOME");
 			String pid = Utils.getPID("Queue");
@@ -189,17 +214,45 @@ public class UimaAsServiceProcessor exte
 						argList.add("-t");
 						argList.add(System.getProperty("ducc.deploy.JpThreadCount"));
 					}
-				
-					enableMetricsIfNewUimaAs();
+					
 					resultSerializer = new UimaResultDefaultSerializer();
-					uimaASClient = new UimaAsClientWrapper(); 
+					
+					Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+					dump(ClassLoader.getSystemClassLoader(), 3);
+					enableMetricsIfNewUimaAs();
+					
+					// load proxy class from uima-ducc-user.jar to access uima classes. The UimaWrapper is a convenience/wrapper
+					// class so that we dont have to use reflection on Uima classes.
+					Class<?> classToLaunch = 
+							ClassLoader.getSystemClassLoader().loadClass("org.apache.uima.ducc.user.common.main.UimaAsWrapper");
+		
+					processorInstance = classToLaunch.newInstance();
+
+					Method initMethod = processorInstance.getClass().getMethod(M_INITIALIZE, String.class,String.class,String.class, int.class, boolean.class);
+
+					processMethod = processorInstance.getClass().getMethod(M_PROCESS, new Class[] {String.class});
+					
+					stopMethod = processorInstance.getClass().getMethod(M_STOP);
+					//public void initialize(String analysisEngineDescriptor, String xslTransform, String saxonURL,  int scaleout,  boolean deserialize) throws Exception {
+
+
+					
+					
+					
+					
+					
+					
+				//	uimaASClient = new UimaAsClientWrapper(); 
 					scaleout = generateDescriptorsAndGetScaleout(argList.toArray(new String[argList.size()])); // Also converts the DD if necessary
 					if ( scaleout == 0 ) {
 						scaleout = 1;
 					}
+					// initialize AE via UimaWrapper
+					initMethod.invoke(processorInstance, deploymentDescriptors[0], xsltPath, saxonPath, scaleout, (serviceConfiguration.getJpType() != null));
+
 					initialized = true;
 				}
-				doDeploy();
+				//doDeploy();
 			}
 			if ( numberOfInitializedThreads.incrementAndGet() == scaleout ) {
 				super.delay(logger, DEFAULT_INIT_DELAY);
@@ -211,6 +264,9 @@ public class UimaAsServiceProcessor exte
 			logger.log(Level.WARNING, null, e);
 			monitor.onStateChange(IServiceState.State.FailedInitialization.toString(), new Properties());
 			throw new ServiceInitializationException("",e);
+		} finally {
+			Thread.currentThread().setContextClassLoader(savedCL);
+
 		}
 
 	}
@@ -239,74 +295,43 @@ public class UimaAsServiceProcessor exte
         matcher.appendTail(sb);
         return sb.toString();
 	}
-
-	private CAS getCAS(String serializedTask) throws Exception {
-		CAS cas = uimaASClient.getCAS();
-		// DUCC JP services are given a serialized CAS ... others just the doc-text for
-		// a CAS
-		if (serviceConfiguration.getJpType() != null) {
-			// Use thread dedicated UimaSerializer to de-serialize the CAS
-			getUimaSerializer().deserializeCasFromXmi(serializedTask, cas);
-		} else {
-			cas.setDocumentText(serializedTask);
-			cas.setDocumentLanguage("en");
-		}
-		return cas;
-	}
-
 	
 	@Override
 	public IProcessResult process(String serializedTask) {
-		CAS cas = null;
+	//	CAS cas = null;
 		IProcessResult result;
+	   	// Save current context cl and inject System classloader as
+		// a context cl before calling user code. 
+		ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
+
+		Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+
 		try {
 
-			cas = getCAS(serializedTask);
 			List<PerformanceMetrics> casMetrics = new ArrayList<>();
 
 			if (enablePerformanceBreakdownReporting) {
-				List<?> perfMetrics = new ArrayList<>();
-
 				try {
-					uimaASClient.sendAndReceive(cas, perfMetrics);
-					successCount.incrementAndGet();
-					errorCountSinceLastSuccess.set(0);
-
-				} catch (Exception t) {
-					logger.log(Level.WARNING, "", t);
-					IWindowStats stats = 
-							new ProcessWindowStats(errorCount.incrementAndGet(), 
-									successCount.get(), 
-									errorCountSinceLastSuccess.incrementAndGet());
-					Action action = 
-							errorHandler.handleProcessError(t, this, stats);
-					result = new UimaProcessResult(t, action);
-					return result;
-				}
-
-				for (Object metrics : perfMetrics) {
-					Method nameMethod = metrics.getClass().getDeclaredMethod("getName");
-					String name = (String) nameMethod.invoke(metrics);
-					Method uniqueNameMethod = metrics.getClass().getDeclaredMethod("getUniqueName");
-					String uniqueName = (String) uniqueNameMethod.invoke(metrics);
-					Method analysisTimeMethod = metrics.getClass().getDeclaredMethod("getAnalysisTime");
-					long analysisTime = (long) analysisTimeMethod.invoke(metrics);
-					Method currentTaskCountMethod = metrics.getClass().getDeclaredMethod("getNumProcessed");
-					long currentTaskCount = (long)currentTaskCountMethod.invoke(metrics);
-					boolean aggregate = uniqueName.startsWith("/" + name);
-					int pos = uniqueName.indexOf("/", 1);
-					if (pos > -1 && scaleout > 1 && name != null && aggregate) {
-						String st = uniqueName.substring(pos);
-						uniqueName = "/" + name + st;
+					// Use basic data structures for returning performance metrics from 
+					// a processor process(). The PerformanceMetrics class is not 
+					// visible in the code packaged in uima-ducc-user.jar so return
+					// metrics in Properties object for each AE.
+					List<Properties> metrics;
+
+					// *****************************************************
+					// PROCESS
+					// *****************************************************
+					metrics = (List<Properties>) processMethod.invoke(processorInstance, serializedTask);
+					for( Properties p : metrics ) {
+						// there is Properties object for each AE, so create an
+						// instance of PerformanceMetrics and initialize it
+						PerformanceMetrics pm = 
+								new PerformanceMetrics(p.getProperty(AE_NAME), 
+										p.getProperty(AE_CONTEXT),
+										Long.valueOf(p.getProperty(AE_ANALYSIS_TIME)), 
+										Long.valueOf(p.getProperty(AE_CAS_PROCESSED)));
+						casMetrics.add(pm);
 					}
-					PerformanceMetrics pm = new PerformanceMetrics(name, uniqueName, analysisTime, currentTaskCount);
-					casMetrics.add(pm);
-				}
-			} else {
-				// delegate processing to the UIMA-AS service and wait for a reply
-				try {
-					
-					uimaASClient.sendAndReceive(cas);
 					successCount.incrementAndGet();
 					errorCountSinceLastSuccess.set(0);
 
@@ -318,14 +343,9 @@ public class UimaAsServiceProcessor exte
 									errorCountSinceLastSuccess.incrementAndGet());
 					Action action = 
 							errorHandler.handleProcessError(t, this, stats);
-
 					result = new UimaProcessResult(t, action);
 					return result;
-				}                                                                                
-				PerformanceMetrics pm = new PerformanceMetrics(
-						"Performance Metrics Not Supported For DD Jobs and UIMA-AS <= v2.6.0",
-						"Performance Metrics Not Supported For DD Jobs and UIMA-AS <= v2.6.0 ", 0, 0);
-				casMetrics.add(pm);
+				}
 
 			}
 			return new UimaProcessResult(resultSerializer.serialize(casMetrics));
@@ -335,38 +355,10 @@ public class UimaAsServiceProcessor exte
 			result = new UimaProcessResult(t, Action.TERMINATE);
 			return result;
 		} finally {
-			if (cas != null) {
-				cas.release();
-			}
+			Thread.currentThread().setContextClassLoader(savedCL);
 		}
 	}
-	private void doDeploy() throws Exception {
-		// deploy singleUIMA-AS Version instance of embedded broker
-			try {
-				// below code runs once to create broker, uima-as client and
-				// uima-as service
-				if (brokerInstance == null) {
-					deployBroker(duccHome);
-					// Broker is running
-					brokerRunning = true;
-
-					int i = 0;
-					// Deploy UIMA-AS services
-					for (String dd : deploymentDescriptors) {
-						// Deploy UIMA-AS service. Keep the deployment id so
-						// that we can undeploy uima-as service on stop.
-						ids[i] = uimaASClient.deployService(dd);
-					}
-					// send GetMeta to UIMA-AS service and wait for a reply
-					uimaASClient.initialize(); 
-				}
 
-			} catch (Throwable e) {
-				logger.log(Level.WARNING, "UimaAsServiceProcesser", e);
-				throw new RuntimeException(e);
-
-			}
-	}
 	private Properties loadDuccProperties() throws Exception {
 		Properties duccProperties = new Properties();
 		duccProperties.load(new FileInputStream(System.getProperty("ducc.deploy.configuration")));
@@ -451,116 +443,32 @@ public class UimaAsServiceProcessor exte
 	@Override
 	public void stop() {
 		synchronized (UimaAsServiceProcessor.class) {
-			if (brokerRunning) {
-				logger.log(Level.INFO, "Stopping UIMA-AS Client");
-				System.out.println("Stopping UIMA-AS Client");
-				try {
-					// Prevent UIMA-AS from exiting
-					System.setProperty("dontKill", "true");
-					uimaASClient.stop();
-					System.out.println("UIMA-AS Client Stopped");
-					Method brokerStopMethod = classToLaunch.getMethod("stop");
-					brokerStopMethod.invoke(brokerInstance);
-
-					Method waitMethod = classToLaunch.getMethod("waitUntilStopped");
-					waitMethod.invoke(brokerInstance);
-					brokerRunning = false;
-					System.out.println("Internal Broker Stopped");
-					super.stop();
-
-				} catch (Exception e) {
-					logger.log(Level.WARNING, "stop", e);
-				}
-
-			}
-		}
-
-	}
-
-	private void deployBroker(String duccHome) throws Exception {
-		// Save current context class loader. When done loading the broker jars
-		// this class loader will be restored
-		ClassLoader currentCL = Thread.currentThread().getContextClassLoader();
-		HashMap<String, String> savedPropsMap = null;
-
-		try {
-			// setup a classpath for Ducc broker
-			String[] brokerClasspath = new String[] {
-					duccHome + File.separator + "apache-uima" + File.separator + "apache-activemq" + File.separator
-							+ "lib" + File.separator + "*",
-					duccHome + File.separator + "apache-uima" + File.separator + "apache-activemq" + File.separator
-							+ "lib" + File.separator + "optional" + File.separator + "*" };
-
-			// isolate broker in its own Class loader
-			URLClassLoader ucl = Utils.create(brokerClasspath);
-			Thread.currentThread().setContextClassLoader(ucl);
-			savedPropsMap = Utils.hideLoggingProperties(); // Ensure DUCC doesn't try to use the user's logging setup
-
-			classToLaunch = ucl.loadClass("org.apache.activemq.broker.BrokerService");
-			if (System.getProperty("ducc.debug") != null) {
-				Utils.dump(ucl, 4);
+			
+			logger.log(Level.INFO,this.getClass().getName()+" stop() called");
+		   	// save current context cl and inject System classloader as
+			// a context cl before calling user code. This is done in 
+			// user code needs to load resources 
+			ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
+			Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+			if ( logger.isLoggable(Level.FINE)) {
+				logger.log(Level.FINE,"",">>>> stop():::Context Classloader Switch - Executing code from System Classloader");
 			}
-			brokerInstance = classToLaunch.newInstance();
-
-			Method setDedicatedTaskRunnerMethod = classToLaunch.getMethod("setDedicatedTaskRunner", boolean.class);
-			setDedicatedTaskRunnerMethod.invoke(brokerInstance, false);
-
-			Method setPersistentMethod = classToLaunch.getMethod("setPersistent", boolean.class);
-			setPersistentMethod.invoke(brokerInstance, false);
-
-			int port = 61626; // try to start the colocated broker with this port first
-			String brokerURL = "tcp://localhost:";
-			// loop until a valid port is found for the broker
-			while (true) {
-				try {
-					Method addConnectorMethod = classToLaunch.getMethod("addConnector", String.class);
-					addConnectorMethod.invoke(brokerInstance, brokerURL + port);
-
-					Method startMethod = classToLaunch.getMethod("start");
-					startMethod.invoke(brokerInstance);
-
-					Method waitUntilStartedMethod = classToLaunch.getMethod("waitUntilStarted");
-					waitUntilStartedMethod.invoke(brokerInstance);
-					System.setProperty("DefaultBrokerURL", brokerURL + port);
-					System.setProperty("BrokerURI", brokerURL + port);
-					// Needed to resolve ${broker.name} placeholder in DD generated by DUCC
-					System.setProperty(brokerPropertyName, brokerURL + port);
-
-					break; // got a valid port for the broker
-				} catch (Exception e) {
-					if (isBindException(e)) {
-						port++;
-					} else {
-						throw new RuntimeException(e);
-					}
+			try {
+				System.setProperty("dontKill", "true");
+				stopMethod.invoke(processorInstance);
+				super.stop();
+
+			} catch( Exception e) {
+				logger.log(Level.WARNING, "stop", e);
+			} finally {
+				Thread.currentThread().setContextClassLoader(savedCL);
+				if ( logger.isLoggable(Level.FINE)) {
+					logger.log(Level.FINE,"",">>>> stop():::Context Classloader Switch - Restored Ducc Classloader");
 				}
 			}
 
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			// restore context class loader
-			Thread.currentThread().setContextClassLoader(currentCL);
-			brokerLatch.countDown();
-			Utils.restoreLoggingProperties(savedPropsMap); // May not be necessary as user's logger has been established
-		}
-
-	}
-
-	private boolean isBindException(Throwable e) {
-		if (e == null) {
-			return false;
 		}
 
-		if (e instanceof BindException) {
-			return true;
-		} else if (e instanceof SocketException && "Address already in use".equals(e.getMessage())) {
-			return true;
-		} else if (e.getCause() != null) {
-			return isBindException(e.getCause());
-		} else {
-			return false;
-		}
 	}
 
 	private static void printUsageMessage() {
@@ -570,111 +478,16 @@ public class UimaAsServiceProcessor exte
 				+ "-xslt path-to-dd2spring-xslt\n" + "   or\n"
 				+ "path to Spring XML Configuration File which is the output of running dd2spring\n");
 	}
-	public static void main(String[] args) {
-		try {
-			UimaAsServiceProcessor processor = 
-					new UimaAsServiceProcessor(args, null);
-			processor.initialize();
-			CAS cas = CasCreationUtils.createCas(new TypeSystemDescription_impl(), null, null);
-			cas.setDocumentLanguage("en");
-			cas.setDocumentText("Test");
-			UimaSerializer serializer = 
-					new UimaSerializer();
-			String serializedCas = serializer.serializeCasToXmi(cas);
-
-			IProcessResult result =
-					processor.process(serializedCas);
-			System.out.println("Client Received Result - Success:"+(result.getResult()!=null));
-			processor.stop();
-		} catch( Exception e) {
-			e.printStackTrace();
-		}
-		
-	}
-	private class UimaAsClientWrapper {
-		private Object uimaASClient;
-		private Class<?> clientClz;
-
-		public UimaAsClientWrapper() throws Exception {
-			clientClz = Class
-					.forName("org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl");
-			uimaASClient = clientClz.newInstance();
-			
-		}
-
-		public String deployService(String aDeploymentDescriptorPath) throws Exception {
 
-			Map<String, Object> appCtx = new HashMap<>();
-
-			Class<?> clz = Class.forName("org.apache.uima.aae.client.UimaAsynchronousEngine");
-
-			appCtx.put((String) clz.getField("DD2SpringXsltFilePath").get(uimaASClient), xslTransform.replace('/', FS));
-			appCtx.put((String) clz.getField("SaxonClasspath").get(uimaASClient), saxonURL.replace('/', FS));
-			appCtx.put((String) clz.getField("CasPoolSize").get(uimaASClient), scaleout);
-
-			String containerId = null;
-			// use UIMA-AS client to deploy the service using provided
-			// Deployment Descriptor
-			ClassLoader duccCl = Thread.currentThread().getContextClassLoader();
-			ClassLoader cl = this.getClass().getClassLoader();
-			Thread.currentThread().setContextClassLoader(cl);
-			Method deployMethod = uimaASClient.getClass().getDeclaredMethod("deploy", String.class, Map.class);
-			containerId = (String) deployMethod.invoke(uimaASClient,
-					new Object[] { aDeploymentDescriptorPath, appCtx });
-			Thread.currentThread().setContextClassLoader(duccCl);
-			return containerId;
-		}
-
-		private void initialize() throws Exception {
-
-			String endpoint = System.getProperty(queuePropertyName);
-			String brokerURL = System.getProperty(brokerPropertyName);
-			Map<String, Object> appCtx = new HashMap<>();
-			Class<?> clz = Class.forName("org.apache.uima.aae.client.UimaAsynchronousEngine");
-
-			appCtx.put((String) clz.getField("ServerUri").get(uimaASClient), brokerURL);
-			appCtx.put((String) clz.getField("ENDPOINT").get(uimaASClient), endpoint);
-			appCtx.put((String) clz.getField("CasPoolSize").get(uimaASClient), scaleout);
-			appCtx.put((String) clz.getField("Timeout").get(uimaASClient), 0);
-			appCtx.put((String) clz.getField("GetMetaTimeout").get(uimaASClient), 0);
-			appCtx.put((String) clz.getField("CpcTimeout").get(uimaASClient), 1100);
-			Method initMethod = uimaASClient.getClass().getMethod("initialize", Map.class);
-			initMethod.invoke(uimaASClient, new Object[] { appCtx });
-
-			// blocks until the client initializes
-			Method getMetaMethod = uimaASClient.getClass().getMethod("getMetaData");
-			Object meta = getMetaMethod.invoke(uimaASClient);
-
-			Method nameMethod = meta.getClass().getMethod("getName");
-			aeName = (String) nameMethod.invoke(meta);
-		}
-		public CAS getCAS() throws Exception {
-			Method getCasMethod = uimaASClient.getClass().getMethod("getCAS");
-			return (CAS) getCasMethod.invoke(uimaASClient);
-		}
-		public void sendAndReceive(CAS cas, List<?> perfMetrics) throws Exception {
-			Method sendMethod = uimaASClient.getClass().getMethod("sendAndReceiveCAS", CAS.class,
-					List.class);
-			sendMethod.invoke(uimaASClient, new Object[] { cas, perfMetrics });
-		}
-		public void sendAndReceive(CAS cas) throws Exception {
-			Method sendMethod = uimaASClient.getClass().getDeclaredMethod("sendAndReceiveCAS", CAS.class);
-			sendMethod.invoke(uimaASClient, new Object[] { cas });
-		}
-		public void stop() throws Exception {
-			Method clientStopMethod = uimaASClient.getClass().getDeclaredMethod("stop");
-			clientStopMethod.invoke(uimaASClient);
-		}
-		
-
-	}
 	private class UimaAsVersionWrapper {
 		Class<?> clz = null;
 		Method m = null;
 
 		
 		public UimaAsVersionWrapper() throws Exception {
-			clz = Class.forName("org.apache.uima.aae.UimaAsVersion");
+			clz = 
+					Thread.currentThread().getContextClassLoader().loadClass("org.apache.uima.aae.UimaAsVersion");
+				//	Class.forName("org.apache.uima.aae.UimaAsVersion");
 		}
 
 		public String getFullVersion() throws Exception {

Added: uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/AbstractWrapper.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/AbstractWrapper.java?rev=1861516&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/AbstractWrapper.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/AbstractWrapper.java Mon Jun 17 15:41:54 2019
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.user.common.main;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+import javax.xml.parsers.FactoryConfigurationError;
+
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.impl.XmiCasDeserializer;
+import org.apache.uima.internal.util.XMLUtils;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+
+public class AbstractWrapper {
+
+	protected void deserializeCasFromXmi(String anXmlStr, CAS aCAS)
+			throws FactoryConfigurationError, SAXException, IOException {
+
+		Reader reader = new StringReader(anXmlStr);
+		XMLReader xmlReader = XMLUtils.createXMLReader();
+		XmiCasDeserializer deser = new XmiCasDeserializer(aCAS.getTypeSystem());
+		ContentHandler handler = deser.getXmiCasHandler(aCAS);
+		xmlReader.setContentHandler(handler);
+		xmlReader.parse(new InputSource(reader));
+	}
+
+
+}

Added: uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaAsWrapper.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaAsWrapper.java?rev=1861516&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaAsWrapper.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaAsWrapper.java Mon Jun 17 15:41:54 2019
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.user.common.main;
+
+import java.net.BindException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.ducc.user.common.BasicUimaMetricsGenerator;
+import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
+
+public class UimaAsWrapper extends AbstractWrapper {
+	public static final String brokerPropertyName = "ducc.broker.name";
+	public static final String queuePropertyName = "ducc.queue.name";
+	public static final String duccNodeName = "DUCC_NODENAME";
+	private static final char FS = System.getProperty("file.separator").charAt(0);
+
+    private BrokerService broker = null;
+    private UimaAsynchronousEngine client = null;
+    private String serviceId = null;
+    private String aeName = null;
+	private boolean deserializeFromXMI;
+
+	public void initialize(String analysisEngineDescriptor, String xslTransform, String saxonURL,  int scaleout,  boolean deserialize) throws Exception {
+		synchronized (UimaAsWrapper.class) {
+			deserializeFromXMI = deserialize;
+			// A single thread should deploy a broker and a service.
+			synchronized(UimaAsWrapper.class ) {
+				if ( broker == null ) {
+					deployBroker();
+					Map<String, Object> appCtx = new HashMap<>();
+					appCtx.put(UimaAsynchronousEngine.DD2SpringXsltFilePath, xslTransform.replace('/', FS));
+					appCtx.put(UimaAsynchronousEngine.SaxonClasspath, saxonURL.replace('/', FS));
+					appCtx.put(UimaAsynchronousEngine.CasPoolSize, scaleout);
+					
+					client = new BaseUIMAAsynchronousEngine_impl();
+					
+					serviceId = deployService(analysisEngineDescriptor, appCtx);
+					ProcessingResourceMetaData meta = client.getMetaData();
+					aeName = meta.getName();
+				}
+			}
+		}
+
+	}
+	public synchronized List<Properties> process(String serializedTask) throws Exception {
+
+		CAS cas = client.getCAS();
+		cas.reset();
+		try {
+			if (deserializeFromXMI) {
+				super.deserializeCasFromXmi(serializedTask, cas);
+			} else {
+				cas.setDocumentText(serializedTask);
+				cas.setDocumentLanguage("en");
+			}
+			
+			List<AnalysisEnginePerformanceMetrics> metrics = new ArrayList<>();
+			
+			client.sendAndReceiveCAS(cas,metrics);
+			List<Properties> analysisManagementObjects = new ArrayList<>();
+			for( AnalysisEnginePerformanceMetrics aeMetrics : metrics ) {
+				Properties p = new Properties();
+				p.setProperty(BasicUimaMetricsGenerator.AE_NAME, aeMetrics.getName());
+				p.setProperty(BasicUimaMetricsGenerator.AE_CONTEXT, aeMetrics.getUniqueName());
+				p.setProperty(BasicUimaMetricsGenerator.AE_ANALYSIS_TIME, String.valueOf(aeMetrics.getAnalysisTime()));
+				p.setProperty(BasicUimaMetricsGenerator.AE_CAS_PROCESSED, String.valueOf(aeMetrics.getNumProcessed()));
+				analysisManagementObjects.add(p);
+			}
+			return analysisManagementObjects;
+
+		} finally {
+			cas.release();
+		}
+
+	}
+	public void stop() throws Exception {
+		try {
+			if ( Objects.nonNull(client)) {
+				client.stop();
+			}
+		} finally {
+			if ( Objects.nonNull(broker)) {
+				broker.stop();
+				broker.waitUntilStopped();
+			}
+
+		}
+	}
+	private HashMap<String,String> hideLoggingProperties() {
+		String[] propsToSave = { "log4j.configuration", 
+				                 "java.util.logging.config.file",
+							     "java.util.logging.config.class",
+							     "org.apache.uima.logger.class"};
+		HashMap<String, String> savedPropsMap = new HashMap<String,String>();
+		for (String prop : propsToSave) {
+			String val = System.getProperty(prop);
+			if (val != null) {
+				savedPropsMap.put(prop,  val);
+				System.getProperties().remove(prop);
+				//System.out.println("!!!! Saved prop " + prop + " = " + val);
+			}
+		}
+		return savedPropsMap;
+	}
+
+	private void restoreLoggingProperties(HashMap<String,String> savedPropsMap) {
+		for (String prop : savedPropsMap.keySet()) {
+			System.setProperty(prop, savedPropsMap.get(prop));
+		}
+	}
+
+	private void deployBroker() throws Exception {
+		HashMap<String, String> savedPropsMap = null;
+
+		savedPropsMap = hideLoggingProperties(); // Ensure DUCC doesn't try to use the user's logging setup
+        try {
+    		broker = new BrokerService();
+    		broker.setDedicatedTaskRunner(false); 
+    		broker.setPersistent(false);
+    		// try to first start the embedded broker on port 616226. If not
+    		// available loop until a valid port is found
+    		startBroker("tcp://localhost:", 61626);
+
+        } finally {
+			restoreLoggingProperties(savedPropsMap); // May not be necessary as user's logger has been established
+        }
+	}
+	private String deployService(String analysisEngineDescriptor, Map<String, Object> appCtx) throws Exception {
+
+		String containerId = client.deploy(analysisEngineDescriptor, appCtx);
+		
+		String endpoint = System.getProperty(queuePropertyName);
+		String brokerURL = System.getProperty(brokerPropertyName);
+
+		appCtx.put(UimaAsynchronousEngine.ServerUri, brokerURL);
+		appCtx.put(UimaAsynchronousEngine.ENDPOINT, endpoint);
+		appCtx.put(UimaAsynchronousEngine.Timeout, 0);
+		appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
+		appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+
+		client.initialize(appCtx);
+		
+		return containerId;
+	}
+	private void startBroker(String brokerURL, int startPort) throws Exception {
+		// loop until a valid port is found for the broker
+		while (true) {
+			
+			try {
+				broker.addConnector(brokerURL+startPort);
+				broker.start();
+				broker.waitUntilStarted();
+				System.setProperty("DefaultBrokerURL", brokerURL + startPort);
+				System.setProperty("BrokerURI", brokerURL + startPort);
+				// Needed to resolve ${broker.name} placeholder in DD generated by DUCC
+				System.setProperty(brokerPropertyName, brokerURL + startPort);
+
+				break; // got a valid port for the broker
+				
+			} catch (Exception e) {
+				if (isBindException(e)) {
+					startPort++;
+				} else {
+					throw new RuntimeException(e);
+				}
+			}
+		}
+	}
+	private boolean isBindException(Throwable e) {
+		if (e == null) {
+			return false;
+		}
+
+		if (e instanceof BindException) {
+			return true;
+		} else if (e instanceof SocketException && "Address already in use".equals(e.getMessage())) {
+			return true;
+		} else if (e.getCause() != null) {
+			return isBindException(e.getCause());
+		} else {
+			return false;
+		}
+	}
+}

Modified: uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaWrapper.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaWrapper.java?rev=1861516&r1=1861515&r2=1861516&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaWrapper.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaWrapper.java Mon Jun 17 15:41:54 2019
@@ -18,35 +18,24 @@
 */
 package org.apache.uima.ducc.user.common.main;
 
-import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 
-import javax.xml.parsers.FactoryConfigurationError;
-
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.analysis_engine.AnalysisEngine;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.cas.CAS;
-import org.apache.uima.cas.impl.XmiCasDeserializer;
 import org.apache.uima.ducc.user.common.BasicUimaMetricsGenerator;
 import org.apache.uima.ducc.user.common.UimaUtils;
-import org.apache.uima.internal.util.XMLUtils;
 import org.apache.uima.resource.Resource;
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.resource.ResourceManager;
 import org.apache.uima.resource.ResourceSpecifier;
 import org.apache.uima.util.CasPool;
-import org.xml.sax.ContentHandler;
-import org.xml.sax.InputSource;
-import org.xml.sax.SAXException;
-import org.xml.sax.XMLReader;
 
-public class UimaWrapper {
+public class UimaWrapper extends AbstractWrapper {
 	private CasPool casPool = null;
 	private ResourceManager rm = UIMAFramework.newDefaultResourceManager();
 	// Platform MBean server if one is available (Java 1.5 only)
@@ -91,7 +80,7 @@ public class UimaWrapper {
 		CAS cas = casPool.getCas();
 		try {
 			if (deserializeFromXMI) {
-				deserializeCasFromXmi(serializedTask, cas);
+				super.deserializeCasFromXmi(serializedTask, cas);
 			} else {
 				cas.setDocumentText(serializedTask);
 				cas.setDocumentLanguage("en");
@@ -112,17 +101,6 @@ public class UimaWrapper {
 
 	}
 
-	public void deserializeCasFromXmi(String anXmlStr, CAS aCAS)
-			throws FactoryConfigurationError, SAXException, IOException {
-
-		XMLReader xmlReader = XMLUtils.createXMLReader();
-		Reader reader = new StringReader(anXmlStr);
-		XmiCasDeserializer deser = new XmiCasDeserializer(aCAS.getTypeSystem());
-		ContentHandler handler = deser.getXmiCasHandler(aCAS);
-		xmlReader.setContentHandler(handler);
-		xmlReader.parse(new InputSource(reader));
-	}
-
 	public void stop(ThreadLocal<AnalysisEngine> threadLocal) {
 		AnalysisEngine ae = threadLocal.get();
 		if (ae != null) {