You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/06/17 15:41:55 UTC
svn commit: r1861516 - in /uima/uima-ducc/trunk:
uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/
uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/
Author: cwiklik
Date: Mon Jun 17 15:41:54 2019
New Revision: 1861516
URL: http://svn.apache.org/viewvc?rev=1861516&view=rev
Log:
UIMA-6040 added support for claspath isolation to Uima As based JPs
Added:
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/AbstractWrapper.java
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaAsWrapper.java
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaWrapper.java
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java?rev=1861516&r1=1861515&r2=1861516&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java Mon Jun 17 15:41:54 2019
@@ -21,21 +21,17 @@ package org.apache.uima.ducc.ps.service.
import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Method;
-import java.net.BindException;
import java.net.InetAddress;
-import java.net.SocketException;
+import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.uima.UIMAFramework;
-import org.apache.uima.cas.CAS;
import org.apache.uima.ducc.ps.service.IScaleable;
import org.apache.uima.ducc.ps.service.IServiceState;
import org.apache.uima.ducc.ps.service.ServiceConfiguration;
@@ -55,8 +51,6 @@ import org.apache.uima.ducc.ps.service.p
import org.apache.uima.ducc.ps.service.processor.uima.utils.UimaResultDefaultSerializer;
import org.apache.uima.ducc.ps.service.utils.UimaSerializer;
import org.apache.uima.ducc.ps.service.utils.Utils;
-import org.apache.uima.resource.metadata.impl.TypeSystemDescription_impl;
-import org.apache.uima.util.CasCreationUtils;
import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
@@ -66,7 +60,6 @@ public class UimaAsServiceProcessor exte
Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class);
// Map to store DuccUimaSerializer instances. Each has affinity to a thread
private static Object brokerInstance = null;
- private UimaAsClientWrapper uimaASClient = null;
private String saxonURL = null;
private String xslTransform = null;
protected Object initializeMonitor = new Object();
@@ -91,6 +84,21 @@ public class UimaAsServiceProcessor exte
private String duccHome = null;
boolean enablePerformanceBreakdownReporting = false;
private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
+
+ public static final String AE_NAME = "AeName";
+ public static final String AE_CONTEXT = "AeContext";
+ public static final String AE_ANALYSIS_TIME = "AeAnalysisTime";
+ public static final String AE_CAS_PROCESSED = "AeProcessedCasCount";
+
+
+ private static String M_PROCESS="process";
+ private static String M_STOP="stop";
+ private static String M_INITIALIZE="initialize";
+
+ private Method processMethod;
+ private Method stopMethod;
+ private Object processorInstance;
+
static {
// try to get platform MBean Server (Java 1.5 only)
try {
@@ -150,8 +158,25 @@ public class UimaAsServiceProcessor exte
monitor =
new RemoteStateObserver(serviceConfiguration, logger);
}
+
+ public void dump(ClassLoader cl, int numLevels) {
+ int n = 0;
+ for (URLClassLoader ucl = (URLClassLoader) cl; ucl != null
+ && ++n <= numLevels; ucl = (URLClassLoader) ucl.getParent()) {
+ System.out.println("Class-loader " + n + " has "
+ + ucl.getURLs().length + " urls:");
+ for (URL u : ucl.getURLs()) {
+ System.out.println(" " + u);
+ }
+ }
+ }
+
@Override
public void initialize() throws ServiceInitializationException {
+ // Save current context cl and inject System classloader as
+ // a context cl before calling user code.
+ ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
+
try {
duccHome = System.getProperty("DUCC_HOME");
String pid = Utils.getPID("Queue");
@@ -189,17 +214,45 @@ public class UimaAsServiceProcessor exte
argList.add("-t");
argList.add(System.getProperty("ducc.deploy.JpThreadCount"));
}
-
- enableMetricsIfNewUimaAs();
+
resultSerializer = new UimaResultDefaultSerializer();
- uimaASClient = new UimaAsClientWrapper();
+
+ Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+ dump(ClassLoader.getSystemClassLoader(), 3);
+ enableMetricsIfNewUimaAs();
+
+ // load proxy class from uima-ducc-user.jar to access uima classes. The UimaWrapper is a convenience/wrapper
+ // class so that we dont have to use reflection on Uima classes.
+ Class<?> classToLaunch =
+ ClassLoader.getSystemClassLoader().loadClass("org.apache.uima.ducc.user.common.main.UimaAsWrapper");
+
+ processorInstance = classToLaunch.newInstance();
+
+ Method initMethod = processorInstance.getClass().getMethod(M_INITIALIZE, String.class,String.class,String.class, int.class, boolean.class);
+
+ processMethod = processorInstance.getClass().getMethod(M_PROCESS, new Class[] {String.class});
+
+ stopMethod = processorInstance.getClass().getMethod(M_STOP);
+ //public void initialize(String analysisEngineDescriptor, String xslTransform, String saxonURL, int scaleout, boolean deserialize) throws Exception {
+
+
+
+
+
+
+
+
+ // uimaASClient = new UimaAsClientWrapper();
scaleout = generateDescriptorsAndGetScaleout(argList.toArray(new String[argList.size()])); // Also converts the DD if necessary
if ( scaleout == 0 ) {
scaleout = 1;
}
+ // initialize AE via UimaWrapper
+ initMethod.invoke(processorInstance, deploymentDescriptors[0], xsltPath, saxonPath, scaleout, (serviceConfiguration.getJpType() != null));
+
initialized = true;
}
- doDeploy();
+ //doDeploy();
}
if ( numberOfInitializedThreads.incrementAndGet() == scaleout ) {
super.delay(logger, DEFAULT_INIT_DELAY);
@@ -211,6 +264,9 @@ public class UimaAsServiceProcessor exte
logger.log(Level.WARNING, null, e);
monitor.onStateChange(IServiceState.State.FailedInitialization.toString(), new Properties());
throw new ServiceInitializationException("",e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(savedCL);
+
}
}
@@ -239,74 +295,43 @@ public class UimaAsServiceProcessor exte
matcher.appendTail(sb);
return sb.toString();
}
-
- private CAS getCAS(String serializedTask) throws Exception {
- CAS cas = uimaASClient.getCAS();
- // DUCC JP services are given a serialized CAS ... others just the doc-text for
- // a CAS
- if (serviceConfiguration.getJpType() != null) {
- // Use thread dedicated UimaSerializer to de-serialize the CAS
- getUimaSerializer().deserializeCasFromXmi(serializedTask, cas);
- } else {
- cas.setDocumentText(serializedTask);
- cas.setDocumentLanguage("en");
- }
- return cas;
- }
-
@Override
public IProcessResult process(String serializedTask) {
- CAS cas = null;
+ // CAS cas = null;
IProcessResult result;
+ // Save current context cl and inject System classloader as
+ // a context cl before calling user code.
+ ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
+
+ Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+
try {
- cas = getCAS(serializedTask);
List<PerformanceMetrics> casMetrics = new ArrayList<>();
if (enablePerformanceBreakdownReporting) {
- List<?> perfMetrics = new ArrayList<>();
-
try {
- uimaASClient.sendAndReceive(cas, perfMetrics);
- successCount.incrementAndGet();
- errorCountSinceLastSuccess.set(0);
-
- } catch (Exception t) {
- logger.log(Level.WARNING, "", t);
- IWindowStats stats =
- new ProcessWindowStats(errorCount.incrementAndGet(),
- successCount.get(),
- errorCountSinceLastSuccess.incrementAndGet());
- Action action =
- errorHandler.handleProcessError(t, this, stats);
- result = new UimaProcessResult(t, action);
- return result;
- }
-
- for (Object metrics : perfMetrics) {
- Method nameMethod = metrics.getClass().getDeclaredMethod("getName");
- String name = (String) nameMethod.invoke(metrics);
- Method uniqueNameMethod = metrics.getClass().getDeclaredMethod("getUniqueName");
- String uniqueName = (String) uniqueNameMethod.invoke(metrics);
- Method analysisTimeMethod = metrics.getClass().getDeclaredMethod("getAnalysisTime");
- long analysisTime = (long) analysisTimeMethod.invoke(metrics);
- Method currentTaskCountMethod = metrics.getClass().getDeclaredMethod("getNumProcessed");
- long currentTaskCount = (long)currentTaskCountMethod.invoke(metrics);
- boolean aggregate = uniqueName.startsWith("/" + name);
- int pos = uniqueName.indexOf("/", 1);
- if (pos > -1 && scaleout > 1 && name != null && aggregate) {
- String st = uniqueName.substring(pos);
- uniqueName = "/" + name + st;
+ // Use basic data structures for returning performance metrics from
+ // a processor process(). The PerformanceMetrics class is not
+ // visible in the code packaged in uima-ducc-user.jar so return
+ // metrics in Properties object for each AE.
+ List<Properties> metrics;
+
+ // *****************************************************
+ // PROCESS
+ // *****************************************************
+ metrics = (List<Properties>) processMethod.invoke(processorInstance, serializedTask);
+ for( Properties p : metrics ) {
+ // there is Properties object for each AE, so create an
+ // instance of PerformanceMetrics and initialize it
+ PerformanceMetrics pm =
+ new PerformanceMetrics(p.getProperty(AE_NAME),
+ p.getProperty(AE_CONTEXT),
+ Long.valueOf(p.getProperty(AE_ANALYSIS_TIME)),
+ Long.valueOf(p.getProperty(AE_CAS_PROCESSED)));
+ casMetrics.add(pm);
}
- PerformanceMetrics pm = new PerformanceMetrics(name, uniqueName, analysisTime, currentTaskCount);
- casMetrics.add(pm);
- }
- } else {
- // delegate processing to the UIMA-AS service and wait for a reply
- try {
-
- uimaASClient.sendAndReceive(cas);
successCount.incrementAndGet();
errorCountSinceLastSuccess.set(0);
@@ -318,14 +343,9 @@ public class UimaAsServiceProcessor exte
errorCountSinceLastSuccess.incrementAndGet());
Action action =
errorHandler.handleProcessError(t, this, stats);
-
result = new UimaProcessResult(t, action);
return result;
- }
- PerformanceMetrics pm = new PerformanceMetrics(
- "Performance Metrics Not Supported For DD Jobs and UIMA-AS <= v2.6.0",
- "Performance Metrics Not Supported For DD Jobs and UIMA-AS <= v2.6.0 ", 0, 0);
- casMetrics.add(pm);
+ }
}
return new UimaProcessResult(resultSerializer.serialize(casMetrics));
@@ -335,38 +355,10 @@ public class UimaAsServiceProcessor exte
result = new UimaProcessResult(t, Action.TERMINATE);
return result;
} finally {
- if (cas != null) {
- cas.release();
- }
+ Thread.currentThread().setContextClassLoader(savedCL);
}
}
- private void doDeploy() throws Exception {
- // deploy singleUIMA-AS Version instance of embedded broker
- try {
- // below code runs once to create broker, uima-as client and
- // uima-as service
- if (brokerInstance == null) {
- deployBroker(duccHome);
- // Broker is running
- brokerRunning = true;
-
- int i = 0;
- // Deploy UIMA-AS services
- for (String dd : deploymentDescriptors) {
- // Deploy UIMA-AS service. Keep the deployment id so
- // that we can undeploy uima-as service on stop.
- ids[i] = uimaASClient.deployService(dd);
- }
- // send GetMeta to UIMA-AS service and wait for a reply
- uimaASClient.initialize();
- }
- } catch (Throwable e) {
- logger.log(Level.WARNING, "UimaAsServiceProcesser", e);
- throw new RuntimeException(e);
-
- }
- }
private Properties loadDuccProperties() throws Exception {
Properties duccProperties = new Properties();
duccProperties.load(new FileInputStream(System.getProperty("ducc.deploy.configuration")));
@@ -451,116 +443,32 @@ public class UimaAsServiceProcessor exte
@Override
public void stop() {
synchronized (UimaAsServiceProcessor.class) {
- if (brokerRunning) {
- logger.log(Level.INFO, "Stopping UIMA-AS Client");
- System.out.println("Stopping UIMA-AS Client");
- try {
- // Prevent UIMA-AS from exiting
- System.setProperty("dontKill", "true");
- uimaASClient.stop();
- System.out.println("UIMA-AS Client Stopped");
- Method brokerStopMethod = classToLaunch.getMethod("stop");
- brokerStopMethod.invoke(brokerInstance);
-
- Method waitMethod = classToLaunch.getMethod("waitUntilStopped");
- waitMethod.invoke(brokerInstance);
- brokerRunning = false;
- System.out.println("Internal Broker Stopped");
- super.stop();
-
- } catch (Exception e) {
- logger.log(Level.WARNING, "stop", e);
- }
-
- }
- }
-
- }
-
- private void deployBroker(String duccHome) throws Exception {
- // Save current context class loader. When done loading the broker jars
- // this class loader will be restored
- ClassLoader currentCL = Thread.currentThread().getContextClassLoader();
- HashMap<String, String> savedPropsMap = null;
-
- try {
- // setup a classpath for Ducc broker
- String[] brokerClasspath = new String[] {
- duccHome + File.separator + "apache-uima" + File.separator + "apache-activemq" + File.separator
- + "lib" + File.separator + "*",
- duccHome + File.separator + "apache-uima" + File.separator + "apache-activemq" + File.separator
- + "lib" + File.separator + "optional" + File.separator + "*" };
-
- // isolate broker in its own Class loader
- URLClassLoader ucl = Utils.create(brokerClasspath);
- Thread.currentThread().setContextClassLoader(ucl);
- savedPropsMap = Utils.hideLoggingProperties(); // Ensure DUCC doesn't try to use the user's logging setup
-
- classToLaunch = ucl.loadClass("org.apache.activemq.broker.BrokerService");
- if (System.getProperty("ducc.debug") != null) {
- Utils.dump(ucl, 4);
+
+ logger.log(Level.INFO,this.getClass().getName()+" stop() called");
+ // save current context cl and inject System classloader as
+ // a context cl before calling user code. This is done in
+ // user code needs to load resources
+ ClassLoader savedCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+ if ( logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE,"",">>>> stop():::Context Classloader Switch - Executing code from System Classloader");
}
- brokerInstance = classToLaunch.newInstance();
-
- Method setDedicatedTaskRunnerMethod = classToLaunch.getMethod("setDedicatedTaskRunner", boolean.class);
- setDedicatedTaskRunnerMethod.invoke(brokerInstance, false);
-
- Method setPersistentMethod = classToLaunch.getMethod("setPersistent", boolean.class);
- setPersistentMethod.invoke(brokerInstance, false);
-
- int port = 61626; // try to start the colocated broker with this port first
- String brokerURL = "tcp://localhost:";
- // loop until a valid port is found for the broker
- while (true) {
- try {
- Method addConnectorMethod = classToLaunch.getMethod("addConnector", String.class);
- addConnectorMethod.invoke(brokerInstance, brokerURL + port);
-
- Method startMethod = classToLaunch.getMethod("start");
- startMethod.invoke(brokerInstance);
-
- Method waitUntilStartedMethod = classToLaunch.getMethod("waitUntilStarted");
- waitUntilStartedMethod.invoke(brokerInstance);
- System.setProperty("DefaultBrokerURL", brokerURL + port);
- System.setProperty("BrokerURI", brokerURL + port);
- // Needed to resolve ${broker.name} placeholder in DD generated by DUCC
- System.setProperty(brokerPropertyName, brokerURL + port);
-
- break; // got a valid port for the broker
- } catch (Exception e) {
- if (isBindException(e)) {
- port++;
- } else {
- throw new RuntimeException(e);
- }
+ try {
+ System.setProperty("dontKill", "true");
+ stopMethod.invoke(processorInstance);
+ super.stop();
+
+ } catch( Exception e) {
+ logger.log(Level.WARNING, "stop", e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(savedCL);
+ if ( logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE,"",">>>> stop():::Context Classloader Switch - Restored Ducc Classloader");
}
}
- } catch (Exception e) {
- throw e;
- } finally {
- // restore context class loader
- Thread.currentThread().setContextClassLoader(currentCL);
- brokerLatch.countDown();
- Utils.restoreLoggingProperties(savedPropsMap); // May not be necessary as user's logger has been established
- }
-
- }
-
- private boolean isBindException(Throwable e) {
- if (e == null) {
- return false;
}
- if (e instanceof BindException) {
- return true;
- } else if (e instanceof SocketException && "Address already in use".equals(e.getMessage())) {
- return true;
- } else if (e.getCause() != null) {
- return isBindException(e.getCause());
- } else {
- return false;
- }
}
private static void printUsageMessage() {
@@ -570,111 +478,16 @@ public class UimaAsServiceProcessor exte
+ "-xslt path-to-dd2spring-xslt\n" + " or\n"
+ "path to Spring XML Configuration File which is the output of running dd2spring\n");
}
- public static void main(String[] args) {
- try {
- UimaAsServiceProcessor processor =
- new UimaAsServiceProcessor(args, null);
- processor.initialize();
- CAS cas = CasCreationUtils.createCas(new TypeSystemDescription_impl(), null, null);
- cas.setDocumentLanguage("en");
- cas.setDocumentText("Test");
- UimaSerializer serializer =
- new UimaSerializer();
- String serializedCas = serializer.serializeCasToXmi(cas);
-
- IProcessResult result =
- processor.process(serializedCas);
- System.out.println("Client Received Result - Success:"+(result.getResult()!=null));
- processor.stop();
- } catch( Exception e) {
- e.printStackTrace();
- }
-
- }
- private class UimaAsClientWrapper {
- private Object uimaASClient;
- private Class<?> clientClz;
-
- public UimaAsClientWrapper() throws Exception {
- clientClz = Class
- .forName("org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl");
- uimaASClient = clientClz.newInstance();
-
- }
-
- public String deployService(String aDeploymentDescriptorPath) throws Exception {
- Map<String, Object> appCtx = new HashMap<>();
-
- Class<?> clz = Class.forName("org.apache.uima.aae.client.UimaAsynchronousEngine");
-
- appCtx.put((String) clz.getField("DD2SpringXsltFilePath").get(uimaASClient), xslTransform.replace('/', FS));
- appCtx.put((String) clz.getField("SaxonClasspath").get(uimaASClient), saxonURL.replace('/', FS));
- appCtx.put((String) clz.getField("CasPoolSize").get(uimaASClient), scaleout);
-
- String containerId = null;
- // use UIMA-AS client to deploy the service using provided
- // Deployment Descriptor
- ClassLoader duccCl = Thread.currentThread().getContextClassLoader();
- ClassLoader cl = this.getClass().getClassLoader();
- Thread.currentThread().setContextClassLoader(cl);
- Method deployMethod = uimaASClient.getClass().getDeclaredMethod("deploy", String.class, Map.class);
- containerId = (String) deployMethod.invoke(uimaASClient,
- new Object[] { aDeploymentDescriptorPath, appCtx });
- Thread.currentThread().setContextClassLoader(duccCl);
- return containerId;
- }
-
- private void initialize() throws Exception {
-
- String endpoint = System.getProperty(queuePropertyName);
- String brokerURL = System.getProperty(brokerPropertyName);
- Map<String, Object> appCtx = new HashMap<>();
- Class<?> clz = Class.forName("org.apache.uima.aae.client.UimaAsynchronousEngine");
-
- appCtx.put((String) clz.getField("ServerUri").get(uimaASClient), brokerURL);
- appCtx.put((String) clz.getField("ENDPOINT").get(uimaASClient), endpoint);
- appCtx.put((String) clz.getField("CasPoolSize").get(uimaASClient), scaleout);
- appCtx.put((String) clz.getField("Timeout").get(uimaASClient), 0);
- appCtx.put((String) clz.getField("GetMetaTimeout").get(uimaASClient), 0);
- appCtx.put((String) clz.getField("CpcTimeout").get(uimaASClient), 1100);
- Method initMethod = uimaASClient.getClass().getMethod("initialize", Map.class);
- initMethod.invoke(uimaASClient, new Object[] { appCtx });
-
- // blocks until the client initializes
- Method getMetaMethod = uimaASClient.getClass().getMethod("getMetaData");
- Object meta = getMetaMethod.invoke(uimaASClient);
-
- Method nameMethod = meta.getClass().getMethod("getName");
- aeName = (String) nameMethod.invoke(meta);
- }
- public CAS getCAS() throws Exception {
- Method getCasMethod = uimaASClient.getClass().getMethod("getCAS");
- return (CAS) getCasMethod.invoke(uimaASClient);
- }
- public void sendAndReceive(CAS cas, List<?> perfMetrics) throws Exception {
- Method sendMethod = uimaASClient.getClass().getMethod("sendAndReceiveCAS", CAS.class,
- List.class);
- sendMethod.invoke(uimaASClient, new Object[] { cas, perfMetrics });
- }
- public void sendAndReceive(CAS cas) throws Exception {
- Method sendMethod = uimaASClient.getClass().getDeclaredMethod("sendAndReceiveCAS", CAS.class);
- sendMethod.invoke(uimaASClient, new Object[] { cas });
- }
- public void stop() throws Exception {
- Method clientStopMethod = uimaASClient.getClass().getDeclaredMethod("stop");
- clientStopMethod.invoke(uimaASClient);
- }
-
-
- }
private class UimaAsVersionWrapper {
Class<?> clz = null;
Method m = null;
public UimaAsVersionWrapper() throws Exception {
- clz = Class.forName("org.apache.uima.aae.UimaAsVersion");
+ clz =
+ Thread.currentThread().getContextClassLoader().loadClass("org.apache.uima.aae.UimaAsVersion");
+ // Class.forName("org.apache.uima.aae.UimaAsVersion");
}
public String getFullVersion() throws Exception {
Added: uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/AbstractWrapper.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/AbstractWrapper.java?rev=1861516&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/AbstractWrapper.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/AbstractWrapper.java Mon Jun 17 15:41:54 2019
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.user.common.main;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+import javax.xml.parsers.FactoryConfigurationError;
+
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.impl.XmiCasDeserializer;
+import org.apache.uima.internal.util.XMLUtils;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+
+public class AbstractWrapper {
+
+ protected void deserializeCasFromXmi(String anXmlStr, CAS aCAS)
+ throws FactoryConfigurationError, SAXException, IOException {
+
+ Reader reader = new StringReader(anXmlStr);
+ XMLReader xmlReader = XMLUtils.createXMLReader();
+ XmiCasDeserializer deser = new XmiCasDeserializer(aCAS.getTypeSystem());
+ ContentHandler handler = deser.getXmiCasHandler(aCAS);
+ xmlReader.setContentHandler(handler);
+ xmlReader.parse(new InputSource(reader));
+ }
+
+
+}
Added: uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaAsWrapper.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaAsWrapper.java?rev=1861516&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaAsWrapper.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaAsWrapper.java Mon Jun 17 15:41:54 2019
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.user.common.main;
+
+import java.net.BindException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.ducc.user.common.BasicUimaMetricsGenerator;
+import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
+
+public class UimaAsWrapper extends AbstractWrapper {
+ public static final String brokerPropertyName = "ducc.broker.name";
+ public static final String queuePropertyName = "ducc.queue.name";
+ public static final String duccNodeName = "DUCC_NODENAME";
+ private static final char FS = System.getProperty("file.separator").charAt(0);
+
+ private BrokerService broker = null;
+ private UimaAsynchronousEngine client = null;
+ private String serviceId = null;
+ private String aeName = null;
+ private boolean deserializeFromXMI;
+
+ public void initialize(String analysisEngineDescriptor, String xslTransform, String saxonURL, int scaleout, boolean deserialize) throws Exception {
+ synchronized (UimaAsWrapper.class) {
+ deserializeFromXMI = deserialize;
+ // A single thread should deploy a broker and a service.
+ synchronized(UimaAsWrapper.class ) {
+ if ( broker == null ) {
+ deployBroker();
+ Map<String, Object> appCtx = new HashMap<>();
+ appCtx.put(UimaAsynchronousEngine.DD2SpringXsltFilePath, xslTransform.replace('/', FS));
+ appCtx.put(UimaAsynchronousEngine.SaxonClasspath, saxonURL.replace('/', FS));
+ appCtx.put(UimaAsynchronousEngine.CasPoolSize, scaleout);
+
+ client = new BaseUIMAAsynchronousEngine_impl();
+
+ serviceId = deployService(analysisEngineDescriptor, appCtx);
+ ProcessingResourceMetaData meta = client.getMetaData();
+ aeName = meta.getName();
+ }
+ }
+ }
+
+ }
+ public synchronized List<Properties> process(String serializedTask) throws Exception {
+
+ CAS cas = client.getCAS();
+ cas.reset();
+ try {
+ if (deserializeFromXMI) {
+ super.deserializeCasFromXmi(serializedTask, cas);
+ } else {
+ cas.setDocumentText(serializedTask);
+ cas.setDocumentLanguage("en");
+ }
+
+ List<AnalysisEnginePerformanceMetrics> metrics = new ArrayList<>();
+
+ client.sendAndReceiveCAS(cas,metrics);
+ List<Properties> analysisManagementObjects = new ArrayList<>();
+ for( AnalysisEnginePerformanceMetrics aeMetrics : metrics ) {
+ Properties p = new Properties();
+ p.setProperty(BasicUimaMetricsGenerator.AE_NAME, aeMetrics.getName());
+ p.setProperty(BasicUimaMetricsGenerator.AE_CONTEXT, aeMetrics.getUniqueName());
+ p.setProperty(BasicUimaMetricsGenerator.AE_ANALYSIS_TIME, String.valueOf(aeMetrics.getAnalysisTime()));
+ p.setProperty(BasicUimaMetricsGenerator.AE_CAS_PROCESSED, String.valueOf(aeMetrics.getNumProcessed()));
+ analysisManagementObjects.add(p);
+ }
+ return analysisManagementObjects;
+
+ } finally {
+ cas.release();
+ }
+
+ }
+ public void stop() throws Exception {
+ try {
+ if ( Objects.nonNull(client)) {
+ client.stop();
+ }
+ } finally {
+ if ( Objects.nonNull(broker)) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ }
+ }
+ private HashMap<String,String> hideLoggingProperties() {
+ String[] propsToSave = { "log4j.configuration",
+ "java.util.logging.config.file",
+ "java.util.logging.config.class",
+ "org.apache.uima.logger.class"};
+ HashMap<String, String> savedPropsMap = new HashMap<String,String>();
+ for (String prop : propsToSave) {
+ String val = System.getProperty(prop);
+ if (val != null) {
+ savedPropsMap.put(prop, val);
+ System.getProperties().remove(prop);
+ //System.out.println("!!!! Saved prop " + prop + " = " + val);
+ }
+ }
+ return savedPropsMap;
+ }
+
+ private void restoreLoggingProperties(HashMap<String,String> savedPropsMap) {
+ for (String prop : savedPropsMap.keySet()) {
+ System.setProperty(prop, savedPropsMap.get(prop));
+ }
+ }
+
+ private void deployBroker() throws Exception {
+ HashMap<String, String> savedPropsMap = null;
+
+ savedPropsMap = hideLoggingProperties(); // Ensure DUCC doesn't try to use the user's logging setup
+ try {
+ broker = new BrokerService();
+ broker.setDedicatedTaskRunner(false);
+ broker.setPersistent(false);
+ // try to first start the embedded broker on port 616226. If not
+ // available loop until a valid port is found
+ startBroker("tcp://localhost:", 61626);
+
+ } finally {
+ restoreLoggingProperties(savedPropsMap); // May not be necessary as user's logger has been established
+ }
+ }
+ private String deployService(String analysisEngineDescriptor, Map<String, Object> appCtx) throws Exception {
+
+ String containerId = client.deploy(analysisEngineDescriptor, appCtx);
+
+ String endpoint = System.getProperty(queuePropertyName);
+ String brokerURL = System.getProperty(brokerPropertyName);
+
+ appCtx.put(UimaAsynchronousEngine.ServerUri, brokerURL);
+ appCtx.put(UimaAsynchronousEngine.ENDPOINT, endpoint);
+ appCtx.put(UimaAsynchronousEngine.Timeout, 0);
+ appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
+ appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+
+ client.initialize(appCtx);
+
+ return containerId;
+ }
+ private void startBroker(String brokerURL, int startPort) throws Exception {
+ // loop until a valid port is found for the broker
+ while (true) {
+
+ try {
+ broker.addConnector(brokerURL+startPort);
+ broker.start();
+ broker.waitUntilStarted();
+ System.setProperty("DefaultBrokerURL", brokerURL + startPort);
+ System.setProperty("BrokerURI", brokerURL + startPort);
+ // Needed to resolve ${broker.name} placeholder in DD generated by DUCC
+ System.setProperty(brokerPropertyName, brokerURL + startPort);
+
+ break; // got a valid port for the broker
+
+ } catch (Exception e) {
+ if (isBindException(e)) {
+ startPort++;
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ private boolean isBindException(Throwable e) {
+ if (e == null) {
+ return false;
+ }
+
+ if (e instanceof BindException) {
+ return true;
+ } else if (e instanceof SocketException && "Address already in use".equals(e.getMessage())) {
+ return true;
+ } else if (e.getCause() != null) {
+ return isBindException(e.getCause());
+ } else {
+ return false;
+ }
+ }
+}
Modified: uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaWrapper.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaWrapper.java?rev=1861516&r1=1861515&r2=1861516&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaWrapper.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/UimaWrapper.java Mon Jun 17 15:41:54 2019
@@ -18,35 +18,24 @@
*/
package org.apache.uima.ducc.user.common.main;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
-import javax.xml.parsers.FactoryConfigurationError;
-
import org.apache.uima.UIMAFramework;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
-import org.apache.uima.cas.impl.XmiCasDeserializer;
import org.apache.uima.ducc.user.common.BasicUimaMetricsGenerator;
import org.apache.uima.ducc.user.common.UimaUtils;
-import org.apache.uima.internal.util.XMLUtils;
import org.apache.uima.resource.Resource;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceManager;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.util.CasPool;
-import org.xml.sax.ContentHandler;
-import org.xml.sax.InputSource;
-import org.xml.sax.SAXException;
-import org.xml.sax.XMLReader;
-public class UimaWrapper {
+public class UimaWrapper extends AbstractWrapper {
private CasPool casPool = null;
private ResourceManager rm = UIMAFramework.newDefaultResourceManager();
// Platform MBean server if one is available (Java 1.5 only)
@@ -91,7 +80,7 @@ public class UimaWrapper {
CAS cas = casPool.getCas();
try {
if (deserializeFromXMI) {
- deserializeCasFromXmi(serializedTask, cas);
+ super.deserializeCasFromXmi(serializedTask, cas);
} else {
cas.setDocumentText(serializedTask);
cas.setDocumentLanguage("en");
@@ -112,17 +101,6 @@ public class UimaWrapper {
}
- public void deserializeCasFromXmi(String anXmlStr, CAS aCAS)
- throws FactoryConfigurationError, SAXException, IOException {
-
- XMLReader xmlReader = XMLUtils.createXMLReader();
- Reader reader = new StringReader(anXmlStr);
- XmiCasDeserializer deser = new XmiCasDeserializer(aCAS.getTypeSystem());
- ContentHandler handler = deser.getXmiCasHandler(aCAS);
- xmlReader.setContentHandler(handler);
- xmlReader.parse(new InputSource(reader));
- }
-
public void stop(ThreadLocal<AnalysisEngine> threadLocal) {
AnalysisEngine ae = threadLocal.get();
if (ae != null) {