You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/12/10 15:26:54 UTC
svn commit: r1644428 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp:
./ iface/ uima/
Author: cwiklik
Date: Wed Dec 10 14:26:53 2014
New Revision: 1644428
URL: http://svn.apache.org/r1644428
Log:
UIMA-4057 Added support for uima-as and core uima containers
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java (with props)
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java?rev=1644428&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java Wed Dec 10 14:26:53 2014
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.user.jp;
+
+public class ArgsParser {
+ /**
+ * scan args for a particular arg, return the following token or the empty
+ * string if not found
+ *
+ * @param id
+ * the arg to search for
+ * @param args
+ * the array of strings
+ * @return the following token, or a 0 length string if not found
+ */
+ public static String getArg(String id, String[] args) {
+ for (int i = 0; i < args.length; i++) {
+ if (id.equals(args[i]))
+ return (i + 1 < args.length) ? args[i + 1] : "";
+ }
+ return "";
+ }
+
+ /**
+ * scan args for a particular arg, return the following token(s) or the
+ * empty string if not found
+ *
+ * @param id
+ * the arg to search for
+ * @param args
+ * the array of strings
+ * @return the following token, or a 0 length string array if not found
+ */
+ public static String[] getMultipleArg(String id, String[] args) {
+ String[] retr = {};
+ for (int i = 0; i < args.length; i++) {
+ if (id.equals(args[i])) {
+ String[] temp = new String[retr.length + 1];
+ for (int s = 0; s < retr.length; s++) {
+ temp[s] = retr[s];
+ }
+ retr = temp;
+ retr[retr.length - 1] = (i + 1 < args.length) ? args[i + 1]
+ : null;
+ }
+ }
+ return retr;
+ }
+
+ /**
+ * scan args for a particular arg, return the following token(s) or the
+ * empty string if not found
+ *
+ * @param id
+ * the arg to search for
+ * @param args
+ * the array of strings
+ * @return the following token, or a 0 length string array if not found
+ */
+ public static String[] getMultipleArg2(String id, String[] args) {
+ String[] retr = {};
+ for (int i = 0; i < args.length; i++) {
+ if (id.equals(args[i])) {
+ int j = 0;
+ while ((i + 1 + j < args.length)
+ && !args[i + 1 + j].startsWith("-")) {
+ String[] temp = new String[retr.length + 1];
+ for (int s = 0; s < retr.length; s++) {
+ temp[s] = retr[s];
+ }
+ retr = temp;
+ retr[retr.length - 1] = args[i + 1 + j++];
+ }
+ return retr;
+ }
+ }
+ return retr;
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java?rev=1644428&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java Wed Dec 10 14:26:53 2014
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.ducc.user.jp;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UimaAsVersion;
+import org.apache.uima.aae.UimaSerializer;
+import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.collection.EntityProcessStatus;
+import org.apache.uima.ducc.user.jp.iface.IProcessContainer;
+import org.apache.uima.util.Level;
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+/**
+ * {@link IProcessContainer} implementation backed by UIMA-AS. It starts a
+ * colocated ActiveMQ broker, deploys the services described by the deployment
+ * descriptors (DDs) given on the command line, and sends each serialized CAS
+ * to them through a UIMA-AS client shared by all instances of this class.
+ */
+public class UimaASProcessContainer implements IProcessContainer {
+  /** Port tried first for the colocated broker; incremented while the port is in use. */
+  private static final int FIRST_BROKER_PORT = 61626;
+
+  String endpointName;
+  protected int scaleout;
+  String saxonURL = null;
+  String xslTransform = null;
+  String uimaAsDebug = null;
+  /** UIMA-AS client shared by all instances of this class. */
+  static final BaseUIMAAsynchronousEngine_impl uimaASClient = new BaseUIMAAsynchronousEngine_impl();
+  /** Released once the colocated broker has started. */
+  private static final CountDownLatch brokerLatch = new CountDownLatch(1);
+
+  protected Object initializeMonitor = new Object();
+  public volatile boolean initialized = false;
+  // Logger source. This used to name UimaProcessContainer (copy-paste slip).
+  private static final Class<?> CLASS_NAME = UimaASProcessContainer.class;
+  private static final char FS = File.separatorChar;
+  public static BrokerService broker = null;
+  private UimaSerializer uimaSerializer = new UimaSerializer();
+  private String[] deploymentDescriptors = null;
+  private String[] ids = null;
+
+  /**
+   * Parses the command-line arguments and the first deployment descriptor.
+   *
+   * @param args
+   *          command-line arguments; the usage message lists the flags
+   * @return the value of {@code scaleout}, which parsing the first DD sets
+   *         from its {@code scaleout} element
+   * @throws IllegalArgumentException
+   *           if a required argument is missing
+   * @throws Exception
+   *           if the deployment descriptor cannot be parsed
+   */
+  public int initialize(String[] args) throws Exception {
+    // Get DDs and also extract scaleout property from DD
+    deploymentDescriptors = getDescriptors(args);
+    ids = new String[deploymentDescriptors.length];
+
+    return scaleout;
+  }
+
+  /**
+   * Starts the colocated AMQ broker (once per class), deploys every UIMA-AS
+   * service named by the deployment descriptors and initializes the UIMA-AS
+   * client. The original author notes that this method is called via
+   * reflection.
+   *
+   * @throws IllegalStateException
+   *           if {@link #initialize(String[])} has not been called
+   * @throws Exception
+   *           if the broker, a service or the client fails to start
+   */
+  public void deploy() throws Exception {
+    if (deploymentDescriptors == null || ids == null) {
+      throw new IllegalStateException("initialize() must be called before deploy()");
+    }
+    System.out.println("UIMA-AS Version::" + UimaAsVersion.getFullVersionString());
+    startColocatedBroker();
+    brokerLatch.await(); // all threads must wait for the broker to start and init
+    // deploy colocated UIMA-AS services from provided deployment
+    // descriptors
+    int i = 0;
+    for (String dd : deploymentDescriptors) {
+      // keep the container id so that we can un-deploy it when shutting
+      // down. The index must advance, otherwise every id lands in ids[0].
+      ids[i++] = deployService(dd);
+    }
+    // initialize and start UIMA-AS client. This sends GetMeta request to
+    // deployed top level service and waits for a reply
+    initializeUimaAsClient(endpointName);
+  }
+
+  /**
+   * Starts the colocated broker unless one is already running. The static
+   * {@code broker} field is assigned only after a successful start, so a
+   * failed attempt does not leave later callers waiting on a broker that will
+   * never come up.
+   */
+  private static void startColocatedBroker() throws Exception {
+    synchronized (UimaASProcessContainer.class) {
+      if (broker != null) {
+        return; // already started by an earlier call
+      }
+      BrokerService candidate = new BrokerService();
+      candidate.setDedicatedTaskRunner(false);
+      candidate.setPersistent(false);
+      int port = FIRST_BROKER_PORT; // try to start the colocated broker with this port first
+      String brokerURL = "tcp://localhost:";
+      // loop until a valid port is found for the broker
+      while (true) {
+        try {
+          candidate.addConnector(brokerURL + port);
+          candidate.start();
+          candidate.waitUntilStarted();
+          System.setProperty("DefaultBrokerURL", brokerURL + port);
+          break; // got a valid port for the broker
+        } catch (IOException e) {
+          if (e instanceof BindException || e.getCause() instanceof BindException) {
+            port++; // choose the next port and retry
+          } else {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+      broker = candidate;
+      brokerLatch.countDown();
+    }
+  }
+
+  /**
+   * Stops the UIMA-AS client and then the colocated broker, if one was
+   * started. The original author notes that this method is called via
+   * reflection.
+   *
+   * @throws Exception
+   *           if the broker fails to stop
+   */
+  public void stop() throws Exception {
+    System.out.println("Stopping UIMA_AS Client");
+    try {
+      System.setProperty("dontKill", "true");
+      uimaASClient.stop();
+    } catch (Exception e) {
+      // Best effort: still try to stop the broker below.
+      e.printStackTrace();
+    }
+    BrokerService running;
+    synchronized (UimaASProcessContainer.class) {
+      running = broker;
+    }
+    if (running == null) {
+      return; // deploy() never started a broker, nothing to stop
+    }
+    System.out.println("Stopping Broker");
+    running.stop();
+    running.waitUntilStopped();
+  }
+
+  /**
+   * Deserializes the given XMI into a CAS from the client's pool, sends it to
+   * the colocated UIMA-AS service with a synchronous call and returns the
+   * per-component performance metrics. The CAS is always released.
+   *
+   * @param xmi
+   *          the serialized CAS; must be a {@code String}
+   * @return one {@code Properties} per reported component with the keys
+   *         {@code name}, {@code uniqueName}, {@code analysisTime} and
+   *         {@code numProcessed}
+   * @throws Exception
+   *           if deserialization or processing fails
+   */
+  public List<Properties> process(Object xmi) throws Exception {
+    CAS cas = uimaASClient.getCAS(); // fetch a new CAS from the client's Cas Pool
+    try {
+      XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+      // deserialize the CAS
+      uimaSerializer.deserializeCasFromXmi((String) xmi, cas, deserSharedData, true, -1);
+      List<AnalysisEnginePerformanceMetrics> perfMetrics = new ArrayList<AnalysisEnginePerformanceMetrics>();
+      // delegate processing to the UIMA-AS service and wait for a reply
+      uimaASClient.sendAndReceiveCAS(cas, perfMetrics);
+      // convert UIMA-AS metrics into properties so that we can return this
+      // data in a format which doesn't require UIMA-AS to digest
+      List<Properties> metricsList = new ArrayList<Properties>();
+      for (AnalysisEnginePerformanceMetrics metrics : perfMetrics) {
+        Properties p = new Properties();
+        p.setProperty("name", metrics.getName());
+        p.setProperty("uniqueName", metrics.getUniqueName());
+        p.setProperty("analysisTime", String.valueOf(metrics.getAnalysisTime()));
+        p.setProperty("numProcessed", String.valueOf(metrics.getNumProcessed()));
+        metricsList.add(p);
+      }
+      return metricsList;
+    } finally {
+      if (cas != null) {
+        cas.release();
+      }
+    }
+  }
+
+  /**
+   * Configures and initializes the shared UIMA-AS client against the
+   * colocated broker, then blocks until the client reports that
+   * initialization completed.
+   *
+   * @param endpoint
+   *          queue name of the top level service
+   */
+  private void initializeUimaAsClient(String endpoint) throws Exception {
+    String brokerURL = System.getProperty("DefaultBrokerURL");
+    Map<String, Object> appCtx = new HashMap<String, Object>();
+    appCtx.put(UimaAsynchronousEngine.ServerUri, brokerURL);
+    appCtx.put(UimaAsynchronousEngine.ENDPOINT, endpoint);
+    appCtx.put(UimaAsynchronousEngine.CasPoolSize, scaleout);
+    appCtx.put(UimaAsynchronousEngine.Timeout, 0);
+    appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
+    appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+    UimaAsTestCallbackListener listener = new UimaAsTestCallbackListener();
+
+    uimaASClient.addStatusCallbackListener(listener);
+    uimaASClient.initialize(appCtx);
+    // blocks until the client initializes
+    waitUntilInitialized();
+  }
+
+  /** Blocks until the callback listener has set {@code initialized}. */
+  private void waitUntilInitialized() throws Exception {
+    synchronized (initializeMonitor) {
+      while (!initialized) {
+        initializeMonitor.wait();
+      }
+    }
+  }
+
+  /**
+   * Deploys one service with the UIMA-AS client.
+   *
+   * @param aDeploymentDescriptorPath
+   *          path to the deployment descriptor
+   * @return the container id returned by the client
+   * @throws Exception
+   *           on any deployment problem; these are treated as fatal and left
+   *           to propagate
+   */
+  private String deployService(String aDeploymentDescriptorPath) throws Exception {
+    Map<String, Object> appCtx = new HashMap<String, Object>();
+    appCtx.put(UimaAsynchronousEngine.DD2SpringXsltFilePath, xslTransform.replace('/', FS));
+    appCtx.put(UimaAsynchronousEngine.SaxonClasspath, saxonURL.replace('/', FS));
+    appCtx.put(UimaAsynchronousEngine.CasPoolSize, scaleout);
+    // use UIMA-AS client to deploy the service using provided
+    // Deployment Descriptor
+    return uimaASClient.deploy(aDeploymentDescriptorPath, appCtx);
+  }
+
+  /**
+   * Extract descriptors from arg list. Also extract xsl processor and saxon url.
+   * Parse the first DD to fetch the endpoint and scaleout properties.
+   *
+   * @param args
+   *          java argument list
+   * @return a non-empty array of DDs
+   * @throws IllegalArgumentException
+   *           if no descriptor, no {@code -saxonURL} or no {@code -xslt} is
+   *           given; the usage message is printed first
+   * @throws Exception
+   *           if the first DD cannot be parsed
+   */
+  private String[] getDescriptors(String[] args) throws Exception {
+    UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
+        "UIMA-AS version " + UimaAsVersion.getFullVersionString());
+
+    if (args == null) {
+      printUsageMessage();
+      throw new IllegalArgumentException("No arguments provided");
+    }
+    String[] descriptors = ArgsParser.getMultipleArg("-d", args);
+    if (descriptors.length == 0) {
+      // allow multiple args for one key
+      descriptors = ArgsParser.getMultipleArg2("-dd", args);
+    }
+    saxonURL = ArgsParser.getArg("-saxonURL", args);
+    xslTransform = ArgsParser.getArg("-xslt", args);
+    uimaAsDebug = ArgsParser.getArg("-uimaEeDebug", args);
+    endpointName = ArgsParser.getArg("-q", args);
+
+    boolean descriptorsOk = descriptors.length > 0;
+    for (String dd : descriptors) {
+      // a "-d" given as the last token can surface as a null entry
+      if (dd == null || dd.length() == 0) {
+        descriptorsOk = false;
+      }
+    }
+    if (!descriptorsOk || saxonURL.equals("") || xslTransform.equals("")) {
+      printUsageMessage();
+      throw new IllegalArgumentException(
+          "Missing required argument(s): a deployment descriptor (-d), -saxonURL and -xslt are all required");
+    }
+    parseDD(descriptors[0]);
+    return descriptors;
+  }
+
+  /**
+   * Parses given Deployment Descriptor to extract the input queue endpoint
+   * and the scaleout.
+   *
+   * @param ddPath
+   *          path to the DD
+   * @throws Exception
+   *           if the file cannot be read or parsed
+   */
+  public void parseDD(String ddPath) throws Exception {
+    SAXParserFactory parserFactory = SAXParserFactory.newInstance();
+    SAXParser parser = parserFactory.newSAXParser();
+    SAXHandler handler = new SAXHandler();
+    parser.parse(new File(ddPath), handler);
+  }
+
+  /**
+   * SAX handler that copies the {@code endpoint} attribute of
+   * {@code inputQueue} and the {@code numberOfInstances} attribute of
+   * {@code scaleout} into the enclosing container.
+   */
+  class SAXHandler extends DefaultHandler {
+
+    // not read anywhere in this class
+    String content = null;
+
+    @Override
+    // Triggered when the start of tag is found.
+    public void startElement(String uri, String localName, String qName,
+        Attributes attributes) throws SAXException {
+      if (qName.equals("inputQueue")) {
+        endpointName = attributes.getValue("endpoint");
+      } else if (qName.equals("scaleout")) {
+        scaleout = Integer.parseInt(attributes.getValue("numberOfInstances"));
+      }
+    }
+
+    @Override
+    public void endElement(String uri, String localName, String qName)
+        throws SAXException {
+      // nothing to do at the end of an element
+    }
+
+  }
+
+  @Override
+  protected void finalize() {
+    System.err.println(this + " finalized");
+  }
+
+  /** Prints the accepted command-line arguments to standard output. */
+  private static void printUsageMessage() {
+    System.out
+        .println(" Arguments to the program are as follows : \n"
+            + "-d path-to-UIMA-Deployment-Descriptor [-d path-to-UIMA-Deployment-Descriptor ...] \n"
+            + "-saxonURL path-to-saxon.jar \n"
+            + "-q top level service queue name \n"
+            + "-xslt path-to-dd2spring-xslt\n"
+            + " or\n"
+            + "path to Spring XML Configuration File which is the output of running dd2spring\n");
+  }
+
+  /**
+   * Listener registered with the UIMA-AS client. Only
+   * {@link #initializationComplete(EntityProcessStatus)} and
+   * {@link #onUimaAsServiceExit(EventTrigger)} do anything; the other
+   * callbacks are intentionally empty.
+   */
+  protected class UimaAsTestCallbackListener extends
+      UimaAsBaseCallbackListener {
+
+    public void onBeforeProcessCAS(UimaASProcessStatus status,
+        String nodeIP, String pid) {
+      // intentionally empty
+    }
+
+    public synchronized void onBeforeMessageSend(UimaASProcessStatus status) {
+      // intentionally empty
+    }
+
+    public void onUimaAsServiceExit(EventTrigger cause) {
+      System.out
+          .println("runTest: Received onUimaAsServiceExit() Notification With Cause:"
+              + cause.name());
+    }
+
+    public synchronized void entityProcessComplete(CAS aCAS,
+        EntityProcessStatus aProcessStatus,
+        List<AnalysisEnginePerformanceMetrics> componentMetricsList) {
+      // intentionally empty: process() gets its reply from the synchronous
+      // sendAndReceiveCAS() call
+    }
+
+    /**
+     * Callback method which is called by Uima EE client when a reply to
+     * process CAS is received. The reply contains either the CAS or an
+     * exception that occurred while processing the CAS.
+     */
+    public synchronized void entityProcessComplete(CAS aCAS,
+        EntityProcessStatus aProcessStatus) {
+      // intentionally empty: process() gets its reply from the synchronous
+      // sendAndReceiveCAS() call
+    }
+
+    /**
+     * Callback method which is called by Uima EE client when the
+     * initialization of the client is completed successfully. Wakes up
+     * the thread blocked in waitUntilInitialized().
+     */
+    public void initializationComplete(EntityProcessStatus aStatus) {
+      synchronized (initializeMonitor) {
+        initialized = true;
+        initializeMonitor.notifyAll();
+      }
+    }
+
+    /**
+     * Callback method which is called by Uima EE client when a CPC reply is
+     * received OR exception occurred while processing CPC request.
+     */
+    public void collectionProcessComplete(EntityProcessStatus aStatus) {
+      // intentionally empty
+    }
+  }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java?rev=1644428&r1=1644427&r2=1644428&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java Wed Dec 10 14:26:53 2014
@@ -19,399 +19,430 @@
package org.apache.uima.ducc.user.jp;
-import java.io.File;
import java.io.IOException;
-import java.net.BindException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
-import javax.xml.parsers.SAXParser;
-import javax.xml.parsers.SAXParserFactory;
-
-import org.apache.activemq.broker.BrokerService;
import org.apache.uima.UIMAFramework;
-import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
-import org.apache.uima.aae.UimaAsVersion;
+import org.apache.uima.aae.AsynchAECasManager_impl;
+import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.UimaSerializer;
-import org.apache.uima.aae.client.UimaASProcessStatus;
-import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
-import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
-import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
+import org.apache.uima.analysis_engine.AnalysisEngine;
+import org.apache.uima.analysis_engine.AnalysisEngineManagement;
+import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.impl.XmiSerializationSharedData;
-import org.apache.uima.collection.EntityProcessStatus;
+import org.apache.uima.ducc.user.jp.iface.IProcessContainer;
+import org.apache.uima.ducc.user.jp.uima.UimaAnalysisEngineInstancePoolWithThreadAffinity;
+import org.apache.uima.resource.Resource;
import org.apache.uima.resource.ResourceInitializationException;
-import org.apache.uima.util.Level;
-import org.xml.sax.Attributes;
-import org.xml.sax.SAXException;
-import org.xml.sax.helpers.DefaultHandler;
-
-public class UimaProcessContainer {
- String endpointName;
- protected int scaleout;
- String saxonURL = null;
- String xslTransform = null;
- String uimaAsDebug = null;
- static final BaseUIMAAsynchronousEngine_impl uimaASClient = new BaseUIMAAsynchronousEngine_impl();
- protected Object initializeMonitor = new Object();
- public volatile boolean initialized = false;
- private static final Class CLASS_NAME = UimaProcessContainer.class;
- private static final char FS = System.getProperty("file.separator").charAt(
- 0);
- public static BrokerService broker = null;
+import org.apache.uima.resource.ResourceManager;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resource.metadata.Import;
+import org.apache.uima.resource.metadata.impl.Import_impl;
+import org.apache.uima.resourceSpecifier.CasPoolType;
+import org.apache.uima.resourceSpecifier.factory.impl.CasPoolImpl;
+import org.apache.uima.resourceSpecifier.impl.CasPoolTypeImpl;
+import org.apache.uima.util.CasPool;
+import org.apache.uima.util.InvalidXMLException;
+import org.apache.uima.util.XMLInputSource;
+import org.apache.xmlbeans.SchemaType;
+
+public class UimaProcessContainer implements IProcessContainer {
+ public static final String IMPORT_BY_NAME_PREFIX = "*importByName:";
private UimaSerializer uimaSerializer = new UimaSerializer();
- public int deploy(String[] args) throws Exception {
- System.out.println("UIMA-AS Version::"+UimaAsVersion.getFullVersionString());
+ Semaphore sharedInitSemaphore = new Semaphore(1);
+ // This pool enforces thread affinity. It is needed to make sure that the
+ // thread used to initialize an AE is also the one used to call process(),
+ // because some AEs depend on ThreadLocal storage.
+ UimaAnalysisEngineInstancePoolWithThreadAffinity instanceMap = new UimaAnalysisEngineInstancePoolWithThreadAffinity();
+ AnalysisEngineMetaData analysisEngineMetadata;
+ private static CasPool casPool = null;
+ /** Class and Method handles for reflection */
+// private static Class<?> mbeanServerClass;
+//
+// private static Class<?> objectNameClass;
+//
+// private static Constructor<?> objectNameConstructor;
+//
+// private static Method isRegistered;
+//
+// private static Method registerMBean;
+//
+// private static Method unregisterMBean;
+//
+// /**
+// * Set to true if we can find the required JMX classes and methods
+// */
+ private static boolean jmxAvailable;
+//
+ AtomicInteger counter = new AtomicInteger();
+ private int scaleout=1;
+ private String analysisEngineDescriptor=null;
+ private static CountDownLatch latch = new CountDownLatch(1);
+ /**
+ * The platform MBean server if one is available (Java 1.5 only)
+ */
+ private static Object platformMBeanServer;
+
+ /**
+  * Probes, via reflection, for the JMX classes and methods and records the
+  * outcome in {@code jmxAvailable}; then looks up the platform MBean server
+  * and stores it in {@code platformMBeanServer} ({@code null} if the lookup
+  * fails for any reason).
+  */
+ static {
+   boolean jmxFound;
+   try {
+     Class<?> mbeanServerClass = Class.forName("javax.management.MBeanServer");
+     Class<?> objectNameClass = Class.forName("javax.management.ObjectName");
+     // The handles themselves are not kept; each lookup only has to succeed.
+     objectNameClass.getConstructor(String.class);
+     mbeanServerClass.getMethod("isRegistered", objectNameClass);
+     mbeanServerClass.getMethod("registerMBean", Object.class, objectNameClass);
+     mbeanServerClass.getMethod("unregisterMBean", objectNameClass);
+     jmxFound = true;
+   } catch (ClassNotFoundException e) {
+     // JMX not available
+     jmxFound = false;
+   } catch (NoSuchMethodException e) {
+     // JMX not available
+     jmxFound = false;
+   }
+   jmxAvailable = jmxFound;
+
+   // try to get platform MBean Server (available since Java 1.5)
+   Object mbeanServer;
+   try {
+     Class<?> managementFactory = Class.forName("java.lang.management.ManagementFactory");
+     Method getPlatformMBeanServer = managementFactory.getMethod("getPlatformMBeanServer");
+     mbeanServer = getPlatformMBeanServer.invoke(null);
+   } catch (Exception e) {
+     mbeanServer = null;
+   }
+   platformMBeanServer = mbeanServer;
+ }
+ public int initialize(String[] args ) throws Exception {
+ analysisEngineDescriptor = ArgsParser.getArg("-aed", args);
+ scaleout = Integer.valueOf(ArgsParser.getArg("-t", args));
+ return scaleout;
+ }
+ public void deploy() throws Exception {
+
+// String jmxName = "org.apache.uima:type=ee.jms.services,s=" + getComponentName() + " Uima EE Service,";
+
+ ResourceSpecifier rSpecifier = null;
+ HashMap<String,Object> paramsMap =
+ new HashMap<String,Object>();
+ paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, UIMAFramework.newDefaultResourceManager());
+ paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, platformMBeanServer);
- broker = new BrokerService();
- broker.setDedicatedTaskRunner(false);
- broker.setPersistent(false);
- int port = 61626;
- String brokerURL = "tcp://localhost:";
- while (true) {
- try {
- broker.addConnector(brokerURL + port);
- broker.start();
- broker.waitUntilStarted();
- System.setProperty("DefaultBrokerURL", brokerURL + port);
- break;
- } catch (IOException e) {
- if (e.getCause() instanceof BindException) {
- port++;
- } else {
- e.printStackTrace();
- break;
+ try {
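+ // Acquire the single-permit semaphore to serialize instantiation of
+ // AEs. This is done to control access to non-thread-safe structures
+ // in the core.
+ // NOTE(review): sharedInitSemaphore is declared as an instance field,
+ // not a static, so it only serializes threads that share this
+ // container instance. Confirm that this is the intent, or make the
+ // field static.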
+ System.out.println("Available Permits:"+sharedInitSemaphore.availablePermits());
+ sharedInitSemaphore.acquire();
+ // Parse the descriptor in the calling thread.
+ rSpecifier = UimaClassFactory
+ .produceResourceSpecifier(analysisEngineDescriptor);
+ AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier,
+ paramsMap);
+
+ instanceMap.checkin(ae);
+ if (instanceMap.size() == scaleout) {
+ try {
+ analysisEngineMetadata = ae.getAnalysisEngineMetaData();
+ casPool = new CasPool(scaleout, analysisEngineMetadata,
+ UIMAFramework.newDefaultResourceManager());
+ latch.countDown();
+ } catch (Exception e) {
+ throw new ResourceInitializationException(e);
}
-
}
- }
- String[] deploymentDescriptors = initialize(args);
- // deploy colocated UIMA-AS services from provided deployment
- // descriptors
- String[] ids = new String[deploymentDescriptors.length];
- int i = 0;
- for (String dd : deploymentDescriptors) {
- // keep the container id so that we can un-deploy it when shutting
- // down
- ids[i] = deployService(dd);
- }
- // initialize and start UIMA-AS client. This sends GetMeta request to
- // deployed top level service and waits for a reply
- initializeUimaAsClient(endpointName);
- return scaleout;
- }
-
- public void stop() throws Exception {
- System.out.println("Stopping UIMA_AS Client");
- try {
- System.setProperty("dontKill", "true");
- uimaASClient.stop();
-
} catch (Exception e) {
- e.printStackTrace();
+ latch.countDown();
+ } finally {
+ sharedInitSemaphore.release();
+
}
- System.out.println("Stopping Broker");
- broker.stop();
- broker.waitUntilStopped();
+
}
- public void initializeUimaAsClient(String endpoint) throws Exception {
-
- String brokerURL = System.getProperty("DefaultBrokerURL");
- Map<String, Object> appCtx = new HashMap<String, Object>();
- appCtx.put(UimaAsynchronousEngine.ServerUri, brokerURL);
- appCtx.put(UimaAsynchronousEngine.ENDPOINT, endpoint);
- appCtx.put(UimaAsynchronousEngine.CasPoolSize, scaleout);
- appCtx.put(UimaAsynchronousEngine.Timeout, 0);
- appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
- appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
- UimaAsTestCallbackListener listener = new UimaAsTestCallbackListener();
-
- uimaASClient.addStatusCallbackListener(listener);
- uimaASClient.initialize(appCtx);
-
- waitUntilInitialized();
-
- }
+ public void stop() throws Exception {
- protected void waitUntilInitialized() throws Exception {
- synchronized (initializeMonitor) {
- while (!initialized) {
- initializeMonitor.wait();
- }
- }
}
- protected String deployService(String aDeploymentDescriptorPath)
- throws Exception {
-
- Map<String, Object> appCtx = new HashMap<String, Object>();
- appCtx.put(UimaAsynchronousEngine.DD2SpringXsltFilePath,
- xslTransform.replace('/', FS));
- appCtx.put(UimaAsynchronousEngine.SaxonClasspath,
- saxonURL.replace('/', FS));
- String containerId = null;
+ public List<Properties> process(Object xmi) throws Exception { // Deserializes one XMI work item into a pooled CAS, runs it through an AE checked out from instanceMap and returns a (currently empty) metrics list.
+ AnalysisEngine ae = null;
+ latch.await(); // block until the latch opens (presumably once deployment has finished - TODO confirm where it is counted down)
+ CAS cas = casPool.getCas();
+ int num = counter.incrementAndGet(); // running count of process() calls; only used by the trace line below
try {
- containerId = uimaASClient
- .deploy(aDeploymentDescriptorPath, appCtx);
-
- } catch (ResourceInitializationException e) {
-
- System.out.println(">>>>>>>>>>> Exception ---:"
- + e.getClass().getName());
- } catch (Exception e) {
- System.out.println(">>>>>>>>>>> runTest: Exception:"
- + e.getClass().getName());
- throw e;
- }
- return containerId;
- }
-
- public void process(Object xmi) throws Exception {
- CAS cas = uimaASClient.getCAS();
- XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
-
- uimaSerializer.deserializeCasFromXmi((String)xmi, cas, deserSharedData, true,
- -1);
-
- uimaASClient.sendAndReceiveCAS(cas);
- cas.release();
- }
+ XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+ // deserialize the XMI string (xmi is cast to String) into the CAS obtained above
+ uimaSerializer.deserializeCasFromXmi((String) xmi, cas,
+ deserSharedData, true, -1);
+ // check out an AE instance from instanceMap and run the analysis in the calling thread
+ ae = instanceMap.checkout();
+
+// List<AnalysisEnginePerformanceMetrics> beforeAnalysis = getMetrics(ae);
+ ae.process(cas);
+// List<AnalysisEnginePerformanceMetrics> afterAnalysis = getMetrics(ae);
+ // get the delta
+// List<AnalysisEnginePerformanceMetrics> casMetrics = getAEMetricsForCAS(
+// afterAnalysis, beforeAnalysis);
+
+ // convert the per-component metrics into Properties so that the caller
+ // can consume them without depending on UIMA-AS classes.
+ // NOTE: the conversion loop below is commented out, so the returned list is always empty.
+ List<Properties> metricsList = new ArrayList<Properties>();
+ /*
+ for (AnalysisEnginePerformanceMetrics metrics : casMetrics) {
+ Properties p = new Properties();
+ p.setProperty("name", metrics.getName());
+ p.setProperty("uniqueName", metrics.getUniqueName());
+ p.setProperty("analysisTime",
+ String.valueOf(metrics.getAnalysisTime()));
+ p.setProperty("numProcessed",
+ String.valueOf(metrics.getNumProcessed()));
+ metricsList.add(p);
+ }
+ */
+ System.out.println("Thread:"+Thread.currentThread().getId()+" Processed "+num+" CASes");
+ return metricsList;
+ } finally { // runs on success and on failure: hand the AE and the CAS back to their pools
+ if (ae != null) {
+ instanceMap.checkin(ae);
+ }
+ if (cas != null) {
+ casPool.releaseCas(cas);
+// cas.release();
+ }
- public String[] initialize(String[] args) throws Exception {
- UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
- "UIMA-AS version " + UimaAsVersion.getFullVersionString());
-
- int nbrOfArgs = args.length;
- String[] deploymentDescriptors = getMultipleArg("-d", args);
- if (deploymentDescriptors.length == 0) {
- // allow multiple args for one key
- deploymentDescriptors = getMultipleArg2("-dd", args);
}
- saxonURL = getArg("-saxonURL", args);
- xslTransform = getArg("-xslt", args);
- uimaAsDebug = getArg("-uimaEeDebug", args);
- endpointName = getArg("-q", args);
-
- if (nbrOfArgs < 1
- || (deploymentDescriptors.length == 0 || saxonURL.equals("") || xslTransform
- .equals(""))) {
- printUsageMessage();
- return null; // Done here
- }
- parseDD(deploymentDescriptors[0]);
- return deploymentDescriptors;
}
- public void parseDD(String ddPath) throws Exception {
- SAXParserFactory parserFactor = SAXParserFactory.newInstance();
- SAXParser parser = parserFactor.newSAXParser();
- SAXHandler handler = new SAXHandler();
- parser.parse(new File(ddPath), handler);
-
- }
-
- class SAXHandler extends DefaultHandler {
-
- String content = null;
+ private List<AnalysisEnginePerformanceMetrics> getMetrics(AnalysisEngine ae) // Snapshots the performance counters of the given AE (and of its leaf components, if any) as a flat list.
+ throws Exception {
+ List<AnalysisEnginePerformanceMetrics> analysisManagementObjects = new ArrayList<AnalysisEnginePerformanceMetrics>();
+ synchronized(UimaProcessContainer.class) { // class-wide monitor: threads of all container instances take the snapshot one at a time
+ // Fetch the AE's management information, which includes per
+ // component performance stats.
+ // getComponents() exposes them as a Map. If the AE is an
+ // aggregate,
+ // the Map holds an AnalysisEngineManagement instance for each delegate.
+ AnalysisEngineManagement aem = ae.getManagementInterface();
+ if (aem.getComponents().size() > 0) {
+ // Flatten the hierarchy by recursively (this AE is an aggregate)
+ // extracting
+ // each leaf AnalysisEngineManagement instance and placing a copy
+ // in
+ // the analysisManagementObjects List.
+ getLeafManagementObjects(aem, analysisManagementObjects);
+ // System.out.println("-----------------Unique1:"+aem.getUniqueMBeanName());
+ // System.out.println("-----------------Simple1:"+aem.getName());
+ } else { // no components: record a single entry for the AE itself
+ String path = produceUniqueName(aem);
+ System.out.println(Thread.currentThread().getId()+" -----------------Unique2:"+aem.getUniqueMBeanName());
+ System.out.println(Thread.currentThread().getId()+" -----------------Simple2:"+aem.getName());
+ System.out.println(Thread.currentThread().getId()+" -----------------Path:"+path);
+ analysisManagementObjects.add(deepCopyMetrics(aem, path));
- @Override
- // Triggered when the start of tag is found.
- public void startElement(String uri, String localName, String qName,
- Attributes attributes) throws SAXException {
- if (qName.equals("inputQueue")) {
- endpointName = attributes.getValue("endpoint");
- } else if (qName.equals("scaleout")) {
- scaleout = Integer.parseInt(attributes
- .getValue("numberOfInstances"));
}
-
- }
-
- @Override
- public void endElement(String uri, String localName, String qName)
- throws SAXException {
-
+
}
-
+ return analysisManagementObjects;
}
- /**
- * scan args for a particular arg, return the following token or the empty
- * string if not found
- *
- * @param id
- * the arg to search for
- * @param args
- * the array of strings
- * @return the following token, or a 0 length string if not found
- */
- private static String getArg(String id, String[] args) {
- for (int i = 0; i < args.length; i++) {
- if (id.equals(args[i]))
- return (i + 1 < args.length) ? args[i + 1] : "";
- }
- return "";
+ private void getLeafManagementObjects(AnalysisEngineManagement aem, // Convenience overload: starts the recursive walk with an empty ("") parent context.
+ List<AnalysisEnginePerformanceMetrics> result) {
+ getLeafManagementObjects(aem, result, "");
}
/**
- * scan args for a particular arg, return the following token(s) or the
- * empty string if not found
+ * Recursively walks the management tree rooted at <code>aem</code> and
+ * appends a metrics snapshot (see deepCopyMetrics) for every leaf, i.e.
+ * every node without components, to <code>result</code>. A leaf named
+ * "Fixed Flow Controller" is skipped. For a leaf whose unique mbean name
+ * contains "p0=", a trailing instance number on the parent context is moved
+ * to the front as "N Components ".
*
- * @param id
- * the arg to search for
- * @param args
- * the array of strings
- * @return the following token, or a 0 length string array if not found
+ * @param aem
+ *          management node to visit
+ * @param result
+ *          list that receives one entry per visited leaf
+ * @param uimaFullyQualifiedAEContext
+ *          unique-name context of the parent node; may be empty
*/
- private static String[] getMultipleArg(String id, String[] args) {
- String[] retr = {};
- for (int i = 0; i < args.length; i++) {
- if (id.equals(args[i])) {
- String[] temp = new String[retr.length + 1];
- for (int s = 0; s < retr.length; s++) {
- temp[s] = retr[s];
+ private void getLeafManagementObjects(AnalysisEngineManagement aem,
+     List<AnalysisEnginePerformanceMetrics> result,
+     String uimaFullyQualifiedAEContext) {
+
+   if (aem.getComponents().isEmpty()) {
+     // skip Flow Controller
+     if (!aem.getName().equals("Fixed Flow Controller")) {
+       // Is this primitive AE a delegate in an aggregate? If so, the
+       // mbean unique name contains the "p0=" string. An example mbean
+       // name looks like this:
+       // org.apache.uima:type=ee.jms.services,s=Top Level Aggregate
+       // TAE Uima EE Service,p0=Top Level Aggregate TAE
+       // Components,p1=SecondLevelAggregateCM
+       // Components,p2=ThirdLevelAggregateCM
+       // Components,name=Multiplier1
+       if (aem.getUniqueMBeanName().indexOf("p0=") > -1) {
+         // Check if the parent aggregate has been scaled up by looking
+         // at the last char of its name. If it is a digit, strip the
+         // trailing number from the name.
+         int lastSpace = uimaFullyQualifiedAEContext.lastIndexOf(" ");
+         // The length check guards the charAt() call: the two-argument
+         // overload passes "" as context, which used to raise a
+         // StringIndexOutOfBoundsException here.
+         if (uimaFullyQualifiedAEContext.length() > 0
+             && lastSpace > -1
+             && Character.isDigit(uimaFullyQualifiedAEContext
+                 .charAt(uimaFullyQualifiedAEContext.length() - 1))) {
+           String indx = uimaFullyQualifiedAEContext.substring(lastSpace);
+           try {
+             int value = Integer.parseInt(indx.trim());
+             // Prepend "X Components" to the unique name
+             // with X stripped.
+             uimaFullyQualifiedAEContext = value
+                 + " Components "
+                 + uimaFullyQualifiedAEContext.substring(0, lastSpace);
+           } catch (NumberFormatException ex) {
+             // The trailing token is not a plain instance number
+             // (e.g. "v2"): keep the context unchanged.
+           }
+         }
}
- retr = temp;
- retr[retr.length - 1] = (i + 1 < args.length) ? args[i + 1]
- : null;
+       result.add(deepCopyMetrics(aem, uimaFullyQualifiedAEContext));
+     }
+   } else {
+     for (AnalysisEngineManagement child : (Iterable<AnalysisEngineManagement>) aem
+         .getComponents().values()) {
+       getLeafManagementObjects(child, result, produceUniqueName(aem));
}
}
- return retr;
}
- /**
- * scan args for a particular arg, return the following token(s) or the
- * empty string if not found
- *
- * @param id
- * the arg to search for
- * @param args
- * the array of strings
- * @return the following token, or a 0 length string array if not found
- */
- private static String[] getMultipleArg2(String id, String[] args) {
- String[] retr = {};
- for (int i = 0; i < args.length; i++) {
- if (id.equals(args[i])) {
- int j = 0;
- while ((i + 1 + j < args.length)
- && !args[i + 1 + j].startsWith("-")) {
- String[] temp = new String[retr.length + 1];
- for (int s = 0; s < retr.length; s++) {
- temp[s] = retr[s];
+ /**
+  * Builds a "/"-separated path from the unique mbean name of <code>aem</code>.
+  * The name is split on ","; every part that starts with "p" and contains "="
+  * contributes its value up to " Components", and a part starting with
+  * "name=" contributes its value. If the "p0=" value ends in a number, that
+  * number is emitted first as "N Components " and stripped from the value.
+  *
+  * @param aem
+  *          management object whose unique mbean name is parsed
+  * @return the assembled path; empty if no part matched
+  */
+ private String produceUniqueName(AnalysisEngineManagement aem) {
+   String[] parts = aem.getUniqueMBeanName().split(",");
+   StringBuilder sb = new StringBuilder();
+   for (String part : parts) {
+     int pos;
+     if ((pos = part.indexOf("=")) > -1 && part.startsWith("p")) {
+       // A part without the " Components" suffix used to produce
+       // substring(pos + 1, -1) and a StringIndexOutOfBoundsException;
+       // fall back to the whole value in that case.
+       int end = part.indexOf(" Components", pos + 1);
+       String n = part.substring(pos + 1, end > -1 ? end : part.length());
+       if (part.startsWith("p0=")) {
+         int lastSpace = n.lastIndexOf(" ");
+         if (lastSpace > -1) {
+           try {
+             int instanceNumber = Integer.parseInt(n.substring(lastSpace).trim());
+             sb.append(instanceNumber).append(" Components ");
+             n = n.substring(0, lastSpace);
+           } catch (NumberFormatException nfe) {
+             // Last token is not an instance number: keep the value as is.
+           }
}
- retr = temp;
- retr[retr.length - 1] = args[i + 1 + j++];
}
- return retr;
+       sb.append("/").append(n.trim());
+     } else if (part.trim().startsWith("name=")) {
+       sb.append("/").append(
+           part.substring(part.trim().indexOf("=") + 1));
}
}
- return retr;
- }
-
- protected void finalize() {
- System.err.println(this + " finalized");
- }
-
- private static void printUsageMessage() {
- System.out
- .println(" Arguments to the program are as follows : \n"
- + "-d path-to-UIMA-Deployment-Descriptor [-d path-to-UIMA-Deployment-Descriptor ...] \n"
- + "-saxon path-to-saxon.jar \n"
- + "-q top level service queue name \n"
- + "-xslt path-to-dd2spring-xslt\n"
- + " or\n"
- + "path to Spring XML Configuration File which is the output of running dd2spring\n");
+   return sb.toString();
}
- protected class UimaAsTestCallbackListener extends
- UimaAsBaseCallbackListener {
+ private AnalysisEnginePerformanceMetrics deepCopyMetrics( // Copies name, analysis time and CAS count of aem into a new metrics object whose unique name is derived from the given context.
+ AnalysisEngineManagement aem, String uimaFullyQualifiedAEContext) {
+ String index = "";
+
+ // Create a unique name in which the AE names are separated by "/". Prepend
+ // "X Components" where
+ // X is the instance number of a scaled AE. Also, strip the X from the AE
+ // name. The instance number
+ // is added to each scaled up component during initialization of
+ // uima-as. We need to prepend
+ // "X Components" to allow the DUCC JD to parse the unique name correctly
+ // (basically for backwards
+ // compatibility).
+ int pos = aem.getUniqueMBeanName().lastIndexOf("name=");
+ if (pos > -1) {
+ // get the name of the component. In case of a nested component this
+ // will be the KEY from the AE descriptor
+ String tmp = aem.getUniqueMBeanName().substring(pos + 5);
+ // in case this is the top level AE, check if it has been scaled up
+ // by extracting its instance number. For example,
+ // NoOpAnnotator 2.
+ int last = tmp.lastIndexOf(" ");
+ if (last > -1) {
+ // extract the candidate instance number (text after the last space)
+ index = tmp.substring(last); // NOTE(review): index keeps this token even when it is not numeric and is still prepended further down - confirm this is intended
+
+ try {
+ // check if the instance number is a number. If not, silently
+ // handle the exception.
+ Integer.parseInt(index.trim());
+ System.out.println("deepCopyMetrics - context:"+uimaFullyQualifiedAEContext+" last="+last);
+ // strip the instance number from the AE name
+ uimaFullyQualifiedAEContext = uimaFullyQualifiedAEContext // NOTE(review): 'last' is an offset into tmp, yet it is used to cut the context string - confirm
+ .substring(0, last + 1);
+ } catch (NumberFormatException nfe) { // last token is not a number: the context is left untouched
- public void onBeforeProcessCAS(UimaASProcessStatus status,
- String nodeIP, String pid) {
- // System.out
- // .println("runTest: onBeforeProcessCAS() Notification - CAS:"
- // + status.getCasReferenceId()
- // + " is being processed on machine:"
- // + nodeIP
- // + " by process (PID):" + pid);
- }
-
- public synchronized void onBeforeMessageSend(UimaASProcessStatus status) {
- // casSent = status.getCasReferenceId();
- // System.out
- // .println("runTest: Received onBeforeMessageSend() Notification With CAS:"
- // + status.getCasReferenceId());
- }
-
- public void onUimaAsServiceExit(EventTrigger cause) {
- System.out
- .println("runTest: Received onUimaAsServiceExit() Notification With Cause:"
- + cause.name());
- }
-
- public synchronized void entityProcessComplete(CAS aCAS,
- EntityProcessStatus aProcessStatus,
- List<AnalysisEnginePerformanceMetrics> componentMetricsList) {
- String casReferenceId = ((UimaASProcessStatus) aProcessStatus)
- .getCasReferenceId();
-
- // if (aProcessStatus instanceof UimaASProcessStatus) {
- // if (aProcessStatus.isException()) {
- // System.out
- // .println("--------- Got Exception While Processing CAS"
- // + casReferenceId);
- // } else {
- // System.out.println("Client Received Reply - CAS:"
- // + casReferenceId);
- // }
- // }
- }
-
- /**
- * Callback method which is called by Uima EE client when a reply to
- * process CAS is received. The reply contains either the CAS or an
- * exception that occurred while processing the CAS.
- */
- public synchronized void entityProcessComplete(CAS aCAS,
- EntityProcessStatus aProcessStatus) {
- String casReferenceId = ((UimaASProcessStatus) aProcessStatus)
- .getCasReferenceId();
-
- // if (aProcessStatus instanceof UimaASProcessStatus) {
- // if (aProcessStatus.isException()) {
- // System.out
- // .println("--------- Got Exception While Processing CAS"
- // + casReferenceId);
- // } else {
- // System.out.println("Client Received Reply - CAS:"
- // + casReferenceId);
- // }
- // }
+ } catch( Exception e) { // any other failure (e.g. an out-of-range substring above) is only traced, never rethrown
+ System.out.println(Thread.currentThread().getId()+" deepCopyMetrics - context:"+uimaFullyQualifiedAEContext+" last="+last);
+ }
+ } else { // component name contains no space: append it to the context unless the context already ends with it
- }
+ if (!uimaFullyQualifiedAEContext.endsWith(tmp)) {
+ uimaFullyQualifiedAEContext += "/" + tmp;
+ }
+ }
+ }
+ // A primitive AE will not have the "X Components" prefix, but the DUCC JD
+ // requires it to be there. Prepend it to the unique name.
+ if (uimaFullyQualifiedAEContext.indexOf(" Components ") == -1) {
+ uimaFullyQualifiedAEContext = index + " Components "
+ + uimaFullyQualifiedAEContext;
+ }
+ return new AnalysisEnginePerformanceMetrics(aem.getName(),
+ uimaFullyQualifiedAEContext, aem.getAnalysisTime(),
+ aem.getNumberOfCASesProcessed());
+
+
+ }
- /**
- * Callback method which is called by Uima EE client when the
- * initialization of the client is completed successfully.
- */
- public void initializationComplete(EntityProcessStatus aStatus) {
- synchronized (initializeMonitor) {
- initialized = true;
- initializeMonitor.notifyAll();
+ private List<AnalysisEnginePerformanceMetrics> getAEMetricsForCAS( // Pairs the two snapshots by unique name and returns, per component, the analysis time spent between them.
+ List<AnalysisEnginePerformanceMetrics> afterAnalysisManagementObjects,
+ List<AnalysisEnginePerformanceMetrics> beforeAnalysisManagementObjects)
+ throws Exception {
+ // Create a List to hold the per CAS analysisTime and the total number of
+ // CASes processed
+ // by each AE. (Original note: this list is serialized and sent to the client.)
+ List<AnalysisEnginePerformanceMetrics> performanceList = new ArrayList<AnalysisEnginePerformanceMetrics>();
+ // Diff the before-process() performance metrics with the post-process()
+ // performance
+ // metrics. An "after" entry without a matching "before" entry is dropped.
+ for (AnalysisEnginePerformanceMetrics after : afterAnalysisManagementObjects) {
+ for (AnalysisEnginePerformanceMetrics before : beforeAnalysisManagementObjects) {
+ if (before.getUniqueName().equals(after.getUniqueName())) {
+
+ AnalysisEnginePerformanceMetrics metrics = new AnalysisEnginePerformanceMetrics(
+ after.getName(), after.getUniqueName(),
+ after.getAnalysisTime() - before.getAnalysisTime(),
+ after.getNumProcessed()); // numProcessed is taken from the "after" snapshot as is, not as a delta
+ // System.out.println("********************"+metrics.getUniqueName());
+ // System.out.println("********************"+metrics.getName());
+ performanceList.add(metrics);
+ break;
+ }
}
}
+ return performanceList;
- /**
- * Callback method which is called by Uima EE client when a CPC reply is
- * received OR exception occured while processing CPC request.
- */
- public void collectionProcessComplete(EntityProcessStatus aStatus) {
- }
}
}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java?rev=1644428&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java Wed Dec 10 14:26:53 2014
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.ducc.user.jp.iface;
+
+import java.util.List;
+import java.util.Properties;
+
+public interface IProcessContainer { // Lifecycle contract of a process container: initialize, deploy, process work items, stop.
+ public int initialize(String[] args) throws Exception; // Configures the container from the given arguments; the meaning of the int result is not visible here (presumably the scaleout - TODO confirm against the implementations).
+ public void deploy() throws Exception; // Brings the container into service; NOTE(review): expected to be called after initialize() - confirm.
+ public void stop() throws Exception; // Shuts the container down.
+ public List<Properties> process(Object xmi) throws Exception; // Processes one work item and returns one Properties object per reported metric; UimaProcessContainer.process in this commit casts xmi to String (serialized XMI).
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java?rev=1644428&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java Wed Dec 10 14:26:53 2014
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.user.jp.uima;
+
+public class CasManager { // Placeholder added by this commit: the class declares no members yet.
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java?rev=1644428&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java Wed Dec 10 14:26:53 2014
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.user.jp.uima;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+import org.apache.uima.analysis_engine.AnalysisEngine;
+
+
+public class UimaAnalysisEngineInstancePoolWithThreadAffinity {
+ private static final Class<?> CLASS_NAME = UimaAnalysisEngineInstancePoolWithThreadAffinity.class;
+
+ private volatile boolean destroyAEInstanceIfFree=false;
+ private Semaphore lock = new Semaphore(1);
+
+ private Map<Long, AnalysisEngine> aeInstanceMap = new HashMap<Long,AnalysisEngine>();
+
+ public int size() {
+ return aeInstanceMap.size();
+ }
+
+ public void checkin(AnalysisEngine anAnalysisEngine) throws Exception {
+ try {
+ lock.acquireUninterruptibly();
+ // Call destroy() on AE on checkin if the process is in quiesce mode
+ if ( destroyAEInstanceIfFree ) {
+ anAnalysisEngine.destroy();
+ } else {
+ aeInstanceMap.put(Thread.currentThread().getId(), anAnalysisEngine);
+ }
+ } catch( Exception e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ lock.release();
+ }
+ }
+
+ public boolean exists() {
+ return aeInstanceMap.containsKey(Thread.currentThread().getId());
+ }
+
+ /**
+ * Pins each process thread to a specific and dedicated AE instance. All AE instances are managed
+ * in a HashMap with thread name as a key. AE instance is not removed from the HashMap before it
+ * is returned to the client.
+ *
+ * @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#checkout()
+ **/
+ public AnalysisEngine checkout() throws Exception {
+ try {
+ lock.acquireUninterruptibly();
+ if ( !exists() ) {
+ throw new RuntimeException("AE instance not found in AE pool. Most likely due to service quiescing");
+ }
+ // AEs are instantiated and initialized in the the main thread and placed in the temporary list.
+ // First time in the process() method, each thread will remove AE instance from the temporary
+ // list
+ // and place it in the permanent instanceMap. The key to the instanceMap is the thread name.
+ // Each
+ // thread will always process a CAS using its own and dedicated AE instance.
+ return (AnalysisEngine) aeInstanceMap.remove(Thread.currentThread().getId());
+
+ } catch( Exception e) {
+ throw e;
+ } finally {
+ lock.release();
+ }
+
+
+
+ }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#destroy()
+ */
+ public void destroy() throws Exception {
+ // set the flag so that any AE instance returned from PrimitiveController
+ // will be destroyed.
+ destroyAEInstanceIfFree = true;
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java
------------------------------------------------------------------------------
svn:mime-type = text/plain