You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2014/12/10 15:26:54 UTC

svn commit: r1644428 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp: ./ iface/ uima/

Author: cwiklik
Date: Wed Dec 10 14:26:53 2014
New Revision: 1644428

URL: http://svn.apache.org/r1644428
Log:
UIMA-4057 Added support for uima-as and core uima containers

Added:
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java   (with props)
Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java?rev=1644428&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java Wed Dec 10 14:26:53 2014
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.user.jp;
+
+public class ArgsParser {
+	/**
+	 * scan args for a particular arg, return the following token or the empty
+	 * string if not found
+	 * 
+	 * @param id
+	 *            the arg to search for
+	 * @param args
+	 *            the array of strings
+	 * @return the following token, or a 0 length string if not found
+	 */
+	public static String getArg(String id, String[] args) {
+		for (int i = 0; i < args.length; i++) {
+			if (id.equals(args[i]))
+				return (i + 1 < args.length) ? args[i + 1] : "";
+		}
+		return "";
+	}
+
+	/**
+	 * scan args for a particular arg, return the following token(s) or the
+	 * empty string if not found
+	 * 
+	 * @param id
+	 *            the arg to search for
+	 * @param args
+	 *            the array of strings
+	 * @return the following token, or a 0 length string array if not found
+	 */
+	public static String[] getMultipleArg(String id, String[] args) {
+		String[] retr = {};
+		for (int i = 0; i < args.length; i++) {
+			if (id.equals(args[i])) {
+				String[] temp = new String[retr.length + 1];
+				for (int s = 0; s < retr.length; s++) {
+					temp[s] = retr[s];
+				}
+				retr = temp;
+				retr[retr.length - 1] = (i + 1 < args.length) ? args[i + 1]
+						: null;
+			}
+		}
+		return retr;
+	}
+
+	/**
+	 * scan args for a particular arg, return the following token(s) or the
+	 * empty string if not found
+	 * 
+	 * @param id
+	 *            the arg to search for
+	 * @param args
+	 *            the array of strings
+	 * @return the following token, or a 0 length string array if not found
+	 */
+	public static String[] getMultipleArg2(String id, String[] args) {
+		String[] retr = {};
+		for (int i = 0; i < args.length; i++) {
+			if (id.equals(args[i])) {
+				int j = 0;
+				while ((i + 1 + j < args.length)
+						&& !args[i + 1 + j].startsWith("-")) {
+					String[] temp = new String[retr.length + 1];
+					for (int s = 0; s < retr.length; s++) {
+						temp[s] = retr[s];
+					}
+					retr = temp;
+					retr[retr.length - 1] = args[i + 1 + j++];
+				}
+				return retr;
+			}
+		}
+		return retr;
+	}
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/ArgsParser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java?rev=1644428&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java Wed Dec 10 14:26:53 2014
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.ducc.user.jp;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UimaAsVersion;
+import org.apache.uima.aae.UimaSerializer;
+import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.collection.EntityProcessStatus;
+import org.apache.uima.ducc.user.jp.iface.IProcessContainer;
+import org.apache.uima.util.Level;
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+public class UimaASProcessContainer implements IProcessContainer {
+	String endpointName;
+	protected int scaleout;
+	String saxonURL = null;
+	String xslTransform = null;
+	String uimaAsDebug = null;
+	static final BaseUIMAAsynchronousEngine_impl uimaASClient = new BaseUIMAAsynchronousEngine_impl();
+	private static final CountDownLatch brokerLatch = new CountDownLatch(1);
+	
+	protected Object initializeMonitor = new Object();
+	public volatile boolean initialized = false;
+	private static final Class<?> CLASS_NAME = UimaProcessContainer.class;
+	private static final char FS = System.getProperty("file.separator").charAt(
+			0);
+	public static BrokerService broker = null;
+	private UimaSerializer uimaSerializer = new UimaSerializer();
+    private String[] deploymentDescriptors = null;
+	private String[] ids = null;
+    
+	public int initialize(String[] args) throws Exception {
+		// Get DDs and also extract scaleout property from DD
+		deploymentDescriptors = getDescriptors(args);
+		ids = new String[deploymentDescriptors.length];
+	    
+		return scaleout;
+	}
+
+	/**
+	 * This method is called via reflection and deploys both the colocated AMQ broker
+	 * and UIMA-AS service.
+	 * 
+	 * @param args - command line args
+	 * @return
+	 * @throws Exception
+	 */
+//	public void deploy(String[] args) throws Exception {
+	public void deploy() throws Exception {
+		System.out.println("UIMA-AS Version::"+UimaAsVersion.getFullVersionString());
+		synchronized( UimaASProcessContainer.class) {
+			if ( broker == null ) {
+				broker = new BrokerService();
+				broker.setDedicatedTaskRunner(false);
+				broker.setPersistent(false);
+				int port = 61626;  // try to start the colocated broker with this port first
+				String brokerURL = "tcp://localhost:";
+				// loop until a valid port is found for the broker
+				while (true) {
+					try {
+						broker.addConnector(brokerURL + port);
+						broker.start();
+						broker.waitUntilStarted();
+						System.setProperty("DefaultBrokerURL", brokerURL + port);
+						break;   // got a valid port for the broker
+					} catch (IOException e) {
+						if (e.getCause() instanceof BindException) {
+							port++;   // choose the next port and retry
+						} else {
+							throw new RuntimeException(e);
+						}
+
+					}
+
+				}
+				brokerLatch.countDown();
+			}
+		}
+		brokerLatch.await();   // all threads must wait for the broker to start and init
+		// deploy colocated UIMA-AS services from provided deployment
+		// descriptors
+		int i = 0;
+		for (String dd : deploymentDescriptors) {
+			// keep the container id so that we can un-deploy it when shutting
+			// down
+			ids[i] = deployService(dd);
+		}
+		initializeUimaAsClient(endpointName);
+/*
+		// Get DDs and also extract scaleout property from DD
+		String[] deploymentDescriptors = getDescriptors(args);
+		// deploy colocated UIMA-AS services from provided deployment
+		// descriptors
+		String[] ids = new String[deploymentDescriptors.length];
+		int i = 0;
+		for (String dd : deploymentDescriptors) {
+			// keep the container id so that we can un-deploy it when shutting
+			// down
+			ids[i] = deployService(dd);
+		}
+		// initialize and start UIMA-AS client. This sends GetMeta request to
+		// deployed top level service and waits for a reply
+		initializeUimaAsClient(endpointName);
+	
+		return scaleout;
+*/
+	}
+
+	/** 
+	 * This method is called via reflection and stops the UIMA-AS service,
+	 * the client, and the colocated broker.
+	 * 
+	 * @throws Exception
+	 */
+	public void stop() throws Exception {
+		System.out.println("Stopping UIMA_AS Client");
+		try {
+			System.setProperty("dontKill", "true");
+			uimaASClient.stop();
+
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+		System.out.println("Stopping Broker");
+		broker.stop();
+		broker.waitUntilStopped();
+	}
+	/**
+	 * This method is called via reflection and delegates processing to the colocated
+	 * UIMA-AS service via synchronous call to sendAndReceive()
+	 * 
+	 * @param xmi - serialized CAS
+	 * @throws Exception
+	 */
+	public List<Properties> process(Object xmi) throws Exception {
+		CAS cas = uimaASClient.getCAS();   // fetch a new CAS from the client's Cas Pool
+		try {
+			XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+			// deserialize the CAS
+			uimaSerializer.deserializeCasFromXmi((String)xmi, cas, deserSharedData, true,
+					-1);
+			List<AnalysisEnginePerformanceMetrics> perfMetrics = new ArrayList<AnalysisEnginePerformanceMetrics>();
+			// delegate processing to the UIMA-AS service and wait for a reply
+			uimaASClient.sendAndReceiveCAS(cas, perfMetrics);
+			// convert UIMA-AS metrics into properties so that we can return this
+			// data in a format which doesnt require UIMA-AS to digest
+			List<Properties> metricsList = new ArrayList<Properties>(); 
+			for( AnalysisEnginePerformanceMetrics metrics : perfMetrics ) {
+				Properties p = new Properties();
+				p.setProperty("name", metrics.getName());
+				p.setProperty("uniqueName", metrics.getUniqueName());
+				p.setProperty("analysisTime",String.valueOf(metrics.getAnalysisTime()) );
+				p.setProperty("numProcessed",String.valueOf(metrics.getNumProcessed()) );
+				metricsList.add(p);
+			}
+			return metricsList;
+		} finally {
+			if ( cas != null) {
+				cas.release();
+			}
+		}
+	}
+    
+
+	private void initializeUimaAsClient(String endpoint) throws Exception {
+
+		String brokerURL = System.getProperty("DefaultBrokerURL");
+		Map<String, Object> appCtx = new HashMap<String, Object>();
+		appCtx.put(UimaAsynchronousEngine.ServerUri, brokerURL);
+		appCtx.put(UimaAsynchronousEngine.ENDPOINT, endpoint);
+		appCtx.put(UimaAsynchronousEngine.CasPoolSize, scaleout);
+		appCtx.put(UimaAsynchronousEngine.Timeout, 0);
+		appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
+		appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+		UimaAsTestCallbackListener listener = new UimaAsTestCallbackListener();
+
+		uimaASClient.addStatusCallbackListener(listener);
+		uimaASClient.initialize(appCtx);
+        // blocks until the client initializes
+		waitUntilInitialized();
+
+	}
+
+	private void waitUntilInitialized() throws Exception {
+		synchronized (initializeMonitor) {
+			while (!initialized) {
+				initializeMonitor.wait();
+			}
+		}
+	}
+
+	private String deployService(String aDeploymentDescriptorPath)
+			throws Exception {
+
+		Map<String, Object> appCtx = new HashMap<String, Object>();
+		appCtx.put(UimaAsynchronousEngine.DD2SpringXsltFilePath,
+				xslTransform.replace('/', FS));
+		appCtx.put(UimaAsynchronousEngine.SaxonClasspath,
+				saxonURL.replace('/', FS));
+		appCtx.put(UimaAsynchronousEngine.CasPoolSize, scaleout);
+		
+		String containerId = null;
+		try {
+			// use UIMA-AS client to deploy the service using provided
+			// Deployment Descriptor
+			containerId = uimaASClient
+					.deploy(aDeploymentDescriptorPath, appCtx);
+
+		} catch (Exception e) {
+			// Any problem here should be fatal
+			throw e;
+		}
+		return containerId;
+	}
+	/**
+	 * Extract descriptors from arg list. Also extract xsl processor and saxon url.
+	 * Parse the DD to fetch scaleout property.
+	 * 
+	 * @param args - java argument list 
+	 * @return - an array of DDs
+	 * 
+	 * @throws Exception
+	 */
+	private String[] getDescriptors(String[] args) throws Exception {
+		UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
+				"UIMA-AS version " + UimaAsVersion.getFullVersionString());
+
+		int nbrOfArgs = args.length;
+		String[] deploymentDescriptors = ArgsParser.getMultipleArg("-d", args);
+		if (deploymentDescriptors.length == 0) {
+			// allow multiple args for one key
+			deploymentDescriptors = ArgsParser.getMultipleArg2("-dd", args);
+		}
+		saxonURL = ArgsParser.getArg("-saxonURL", args);
+		xslTransform = ArgsParser.getArg("-xslt", args);
+		uimaAsDebug = ArgsParser.getArg("-uimaEeDebug", args);
+		endpointName = ArgsParser.getArg("-q", args);
+
+		if (nbrOfArgs < 1
+				|| (deploymentDescriptors.length == 0 || saxonURL.equals("") || xslTransform
+						.equals(""))) {
+			printUsageMessage();
+			return null; // Done here
+		}
+		parseDD(deploymentDescriptors[0]);
+		return deploymentDescriptors;
+	}
+    /**
+     * Parses given Deployment Descriptor to extract scaleout
+     * 
+     * @param ddPath - path to the DD
+     * @throws Exception
+     */
+	public void parseDD(String ddPath) throws Exception {
+		SAXParserFactory parserFactor = SAXParserFactory.newInstance();
+		SAXParser parser = parserFactor.newSAXParser();
+		SAXHandler handler = new SAXHandler();
+		parser.parse(new File(ddPath), handler);
+
+	}
+
+	class SAXHandler extends DefaultHandler {
+
+		String content = null;
+
+		@Override
+		// Triggered when the start of tag is found.
+		public void startElement(String uri, String localName, String qName,
+				Attributes attributes) throws SAXException {
+			if (qName.equals("inputQueue")) {
+				endpointName = attributes.getValue("endpoint");
+			} else if (qName.equals("scaleout")) {
+				scaleout = Integer.parseInt(attributes
+						.getValue("numberOfInstances"));
+			}
+
+		}
+
+		@Override
+		public void endElement(String uri, String localName, String qName)
+				throws SAXException {
+
+		}
+
+	}
+
+
+	protected void finalize() {
+		System.err.println(this + " finalized");
+	}
+
+	private static void printUsageMessage() {
+		System.out
+				.println(" Arguments to the program are as follows : \n"
+						+ "-d path-to-UIMA-Deployment-Descriptor [-d path-to-UIMA-Deployment-Descriptor ...] \n"
+						+ "-saxon path-to-saxon.jar \n"
+						+ "-q top level service queue name \n"
+						+ "-xslt path-to-dd2spring-xslt\n"
+						+ "   or\n"
+						+ "path to Spring XML Configuration File which is the output of running dd2spring\n");
+	}
+
+	protected class UimaAsTestCallbackListener extends
+			UimaAsBaseCallbackListener {
+
+		public void onBeforeProcessCAS(UimaASProcessStatus status,
+				String nodeIP, String pid) {
+			// System.out
+			// .println("runTest: onBeforeProcessCAS() Notification - CAS:"
+			// + status.getCasReferenceId()
+			// + " is being processed on machine:"
+			// + nodeIP
+			// + " by process (PID):" + pid);
+		}
+
+		public synchronized void onBeforeMessageSend(UimaASProcessStatus status) {
+			// casSent = status.getCasReferenceId();
+			// System.out
+			// .println("runTest: Received onBeforeMessageSend() Notification With CAS:"
+			// + status.getCasReferenceId());
+		}
+
+		public void onUimaAsServiceExit(EventTrigger cause) {
+			System.out
+					.println("runTest: Received onUimaAsServiceExit() Notification With Cause:"
+							+ cause.name());
+		}
+
+		public synchronized void entityProcessComplete(CAS aCAS,
+				EntityProcessStatus aProcessStatus,
+				List<AnalysisEnginePerformanceMetrics> componentMetricsList) {
+			String casReferenceId = ((UimaASProcessStatus) aProcessStatus)
+					.getCasReferenceId();
+
+			// if (aProcessStatus instanceof UimaASProcessStatus) {
+			// if (aProcessStatus.isException()) {
+			// System.out
+			// .println("--------- Got Exception While Processing CAS"
+			// + casReferenceId);
+			// } else {
+			// System.out.println("Client Received Reply - CAS:"
+			// + casReferenceId);
+			// }
+			// }
+		}
+
+		/**
+		 * Callback method which is called by Uima EE client when a reply to
+		 * process CAS is received. The reply contains either the CAS or an
+		 * exception that occurred while processing the CAS.
+		 */
+		public synchronized void entityProcessComplete(CAS aCAS,
+				EntityProcessStatus aProcessStatus) {
+			String casReferenceId = ((UimaASProcessStatus) aProcessStatus)
+					.getCasReferenceId();
+
+			// if (aProcessStatus instanceof UimaASProcessStatus) {
+			// if (aProcessStatus.isException()) {
+			// System.out
+			// .println("--------- Got Exception While Processing CAS"
+			// + casReferenceId);
+			// } else {
+			// System.out.println("Client Received Reply - CAS:"
+			// + casReferenceId);
+			// }
+			// }
+
+		}
+
+		/**
+		 * Callback method which is called by Uima EE client when the
+		 * initialization of the client is completed successfully.
+		 */
+		public void initializationComplete(EntityProcessStatus aStatus) {
+			synchronized (initializeMonitor) {
+				initialized = true;
+				initializeMonitor.notifyAll();
+			}
+		}
+
+		/**
+		 * Callback method which is called by Uima EE client when a CPC reply is
+		 * received OR exception occured while processing CPC request.
+		 */
+		public void collectionProcessComplete(EntityProcessStatus aStatus) {
+		}
+	}
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaASProcessContainer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java?rev=1644428&r1=1644427&r2=1644428&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java Wed Dec 10 14:26:53 2014
@@ -19,399 +19,430 @@
 
 package org.apache.uima.ducc.user.jp;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.BindException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.xml.parsers.SAXParser;
-import javax.xml.parsers.SAXParserFactory;
-
-import org.apache.activemq.broker.BrokerService;
 import org.apache.uima.UIMAFramework;
-import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
-import org.apache.uima.aae.UimaAsVersion;
+import org.apache.uima.aae.AsynchAECasManager_impl;
+import org.apache.uima.aae.UimaClassFactory;
 import org.apache.uima.aae.UimaSerializer;
-import org.apache.uima.aae.client.UimaASProcessStatus;
-import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
-import org.apache.uima.aae.client.UimaAsynchronousEngine;
 import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
-import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
+import org.apache.uima.analysis_engine.AnalysisEngine;
+import org.apache.uima.analysis_engine.AnalysisEngineManagement;
+import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.impl.XmiSerializationSharedData;
-import org.apache.uima.collection.EntityProcessStatus;
+import org.apache.uima.ducc.user.jp.iface.IProcessContainer;
+import org.apache.uima.ducc.user.jp.uima.UimaAnalysisEngineInstancePoolWithThreadAffinity;
+import org.apache.uima.resource.Resource;
 import org.apache.uima.resource.ResourceInitializationException;
-import org.apache.uima.util.Level;
-import org.xml.sax.Attributes;
-import org.xml.sax.SAXException;
-import org.xml.sax.helpers.DefaultHandler;
-
-public class UimaProcessContainer {
-	String endpointName;
-	protected int scaleout;
-	String saxonURL = null;
-	String xslTransform = null;
-	String uimaAsDebug = null;
-	static final BaseUIMAAsynchronousEngine_impl uimaASClient = new BaseUIMAAsynchronousEngine_impl();
-	protected Object initializeMonitor = new Object();
-	public volatile boolean initialized = false;
-	private static final Class CLASS_NAME = UimaProcessContainer.class;
-	private static final char FS = System.getProperty("file.separator").charAt(
-			0);
-	public static BrokerService broker = null;
+import org.apache.uima.resource.ResourceManager;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resource.metadata.Import;
+import org.apache.uima.resource.metadata.impl.Import_impl;
+import org.apache.uima.resourceSpecifier.CasPoolType;
+import org.apache.uima.resourceSpecifier.factory.impl.CasPoolImpl;
+import org.apache.uima.resourceSpecifier.impl.CasPoolTypeImpl;
+import org.apache.uima.util.CasPool;
+import org.apache.uima.util.InvalidXMLException;
+import org.apache.uima.util.XMLInputSource;
+import org.apache.xmlbeans.SchemaType;
+
+public class UimaProcessContainer implements IProcessContainer {
+	public static final String IMPORT_BY_NAME_PREFIX = "*importByName:";
 	private UimaSerializer uimaSerializer = new UimaSerializer();
 
-	public int deploy(String[] args) throws Exception {
-		System.out.println("UIMA-AS Version::"+UimaAsVersion.getFullVersionString());
+	Semaphore sharedInitSemaphore = new Semaphore(1);
+	// this map enforces thread affinity to specific thread. Needed to make
+	// sure that a thread used to initialized the AE is used to call process().
+	// Some AEs depend on ThreadLocal storage.
+	UimaAnalysisEngineInstancePoolWithThreadAffinity instanceMap = new UimaAnalysisEngineInstancePoolWithThreadAffinity();
+	AnalysisEngineMetaData analysisEngineMetadata;
+	private static CasPool casPool = null;
+	 /** Class and Method handles for reflection */
+//	  private static Class<?> mbeanServerClass;
+//
+//	  private static Class<?> objectNameClass;
+//
+//	  private static Constructor<?> objectNameConstructor;
+//
+//	  private static Method isRegistered;
+//
+//	  private static Method registerMBean;
+//
+//	  private static Method unregisterMBean;
+//
+//	  /**
+//	   * Set to true if we can find the required JMX classes and methods
+//	   */
+	  private static boolean jmxAvailable;
+//
+	  AtomicInteger counter = new AtomicInteger();
+	  private int scaleout=1;
+	  private String analysisEngineDescriptor=null;
+	  private static CountDownLatch latch = new CountDownLatch(1);
+	  /**
+	   * The platform MBean server if one is available (Java 1.5 only)
+	   */
+	  private static Object platformMBeanServer;
+
+	  /** Get class/method handles */
+	  static {
+	    try {
+	      Class<?> mbeanServerClass = Class.forName("javax.management.MBeanServer");
+	      Class<?> objectNameClass = Class.forName("javax.management.ObjectName");
+	      Constructor<?> objectNameConstructor = objectNameClass.getConstructor(new Class[] { String.class });
+	      Method isRegistered = mbeanServerClass.getMethod("isRegistered", new Class[] { objectNameClass });
+	      Method registerMBean = mbeanServerClass.getMethod("registerMBean", new Class[] { Object.class,
+	          objectNameClass });
+	      Method unregisterMBean = mbeanServerClass.getMethod("unregisterMBean",
+	              new Class[] { objectNameClass });
+	      jmxAvailable = true;
+	    } catch (ClassNotFoundException e) {
+	      // JMX not available
+	      jmxAvailable = false;
+	    } catch (NoSuchMethodException e) {
+	      // JMX not available
+	      jmxAvailable = false;
+	    }
+
+	    // try to get platform MBean Server (Java 1.5 only)
+	    try {
+	      Class<?> managementFactory = Class.forName("java.lang.management.ManagementFactory");
+	      Method getPlatformMBeanServer = managementFactory.getMethod("getPlatformMBeanServer",
+	              new Class[0]);
+	      platformMBeanServer = getPlatformMBeanServer.invoke(null, (Object[]) null);
+	    } catch (Exception e) {
+	      platformMBeanServer = null;
+	    }
+	  }
+	  public int initialize(String[] args ) throws Exception {
+			analysisEngineDescriptor = ArgsParser.getArg("-aed", args);
+			scaleout = Integer.valueOf(ArgsParser.getArg("-t", args));
+         return scaleout;		  
+	  }
+	public void deploy() throws Exception {
+	    
+//		String jmxName = "org.apache.uima:type=ee.jms.services,s=" + getComponentName() + " Uima EE Service,";
+
+		ResourceSpecifier rSpecifier = null;
+	    HashMap<String,Object> paramsMap = 
+				new HashMap<String,Object>();
+	     paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, UIMAFramework.newDefaultResourceManager());
+	     paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, platformMBeanServer);
 
-		broker = new BrokerService();
-		broker.setDedicatedTaskRunner(false);
-		broker.setPersistent(false);
-		int port = 61626;
-		String brokerURL = "tcp://localhost:";
-		while (true) {
-			try {
-				broker.addConnector(brokerURL + port);
-				broker.start();
-				broker.waitUntilStarted();
-				System.setProperty("DefaultBrokerURL", brokerURL + port);
-				break;
-			} catch (IOException e) {
-				if (e.getCause() instanceof BindException) {
-					port++;
-				} else {
-					e.printStackTrace();
-					break;
+		try {
+			// Acquire single-permit semaphore to serialize instantiation of
+			// AEs.
+			// This is done to control access to non-thread safe structures in
+			// the
+			// core. The sharedInitSemaphore is a static and is shared by all
+			// instances
+			// of this class.
+			System.out.println("Available Permits:"+sharedInitSemaphore.availablePermits());
+			sharedInitSemaphore.acquire();
+			// Parse the descriptor in the calling thread.
+			rSpecifier = UimaClassFactory
+					.produceResourceSpecifier(analysisEngineDescriptor);
+			AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier,
+					paramsMap);
+
+			instanceMap.checkin(ae);
+			if (instanceMap.size() == scaleout) {
+				try {
+					analysisEngineMetadata = ae.getAnalysisEngineMetaData();
+					casPool = new CasPool(scaleout, analysisEngineMetadata,
+							UIMAFramework.newDefaultResourceManager());
+					latch.countDown();
+				} catch (Exception e) {
+					throw new ResourceInitializationException(e);
 				}
-
 			}
 
-		}
-		String[] deploymentDescriptors = initialize(args);
-		// deploy colocated UIMA-AS services from provided deployment
-		// descriptors
-		String[] ids = new String[deploymentDescriptors.length];
-		int i = 0;
-		for (String dd : deploymentDescriptors) {
-			// keep the container id so that we can un-deploy it when shutting
-			// down
-			ids[i] = deployService(dd);
-		}
-		// initialize and start UIMA-AS client. This sends GetMeta request to
-		// deployed top level service and waits for a reply
-		initializeUimaAsClient(endpointName);
-		return scaleout;
-	}
-
-	public void stop() throws Exception {
-		System.out.println("Stopping UIMA_AS Client");
-		try {
-			System.setProperty("dontKill", "true");
-			uimaASClient.stop();
-
 		} catch (Exception e) {
-			e.printStackTrace();
+			latch.countDown();
+		} finally {
+			sharedInitSemaphore.release();
+			
 		}
-		System.out.println("Stopping Broker");
-		broker.stop();
-		broker.waitUntilStopped();
+		
 	}
 
-	public void initializeUimaAsClient(String endpoint) throws Exception {
-
-		String brokerURL = System.getProperty("DefaultBrokerURL");
-		Map<String, Object> appCtx = new HashMap<String, Object>();
-		appCtx.put(UimaAsynchronousEngine.ServerUri, brokerURL);
-		appCtx.put(UimaAsynchronousEngine.ENDPOINT, endpoint);
-		appCtx.put(UimaAsynchronousEngine.CasPoolSize, scaleout);
-		appCtx.put(UimaAsynchronousEngine.Timeout, 0);
-		appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
-		appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
-		UimaAsTestCallbackListener listener = new UimaAsTestCallbackListener();
-
-		uimaASClient.addStatusCallbackListener(listener);
-		uimaASClient.initialize(appCtx);
-
-		waitUntilInitialized();
-
-	}
+	public void stop() throws Exception {
 
-	protected void waitUntilInitialized() throws Exception {
-		synchronized (initializeMonitor) {
-			while (!initialized) {
-				initializeMonitor.wait();
-			}
-		}
 	}
 
-	protected String deployService(String aDeploymentDescriptorPath)
-			throws Exception {
-
-		Map<String, Object> appCtx = new HashMap<String, Object>();
-		appCtx.put(UimaAsynchronousEngine.DD2SpringXsltFilePath,
-				xslTransform.replace('/', FS));
-		appCtx.put(UimaAsynchronousEngine.SaxonClasspath,
-				saxonURL.replace('/', FS));
-		String containerId = null;
+	public List<Properties> process(Object xmi) throws Exception {
+		AnalysisEngine ae = null;
+		latch.await();
+		CAS cas = casPool.getCas();
+		int num = counter.incrementAndGet();
 		try {
-			containerId = uimaASClient
-					.deploy(aDeploymentDescriptorPath, appCtx);
-
-		} catch (ResourceInitializationException e) {
-
-			System.out.println(">>>>>>>>>>> Exception ---:"
-					+ e.getClass().getName());
-		} catch (Exception e) {
-			System.out.println(">>>>>>>>>>> runTest: Exception:"
-					+ e.getClass().getName());
-			throw e;
-		}
-		return containerId;
-	}
-
-	public void process(Object xmi) throws Exception {
-		CAS cas = uimaASClient.getCAS();
-		XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
-
-		uimaSerializer.deserializeCasFromXmi((String)xmi, cas, deserSharedData, true,
-				-1);
-
-		uimaASClient.sendAndReceiveCAS(cas);
-		cas.release();
-	}
+			XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+			// deserialize the CAS
+			uimaSerializer.deserializeCasFromXmi((String) xmi, cas,
+					deserSharedData, true, -1);
+			// delegate processing to the UIMA-AS service and wait for a reply
+			ae = instanceMap.checkout();
+
+//			List<AnalysisEnginePerformanceMetrics> beforeAnalysis = getMetrics(ae);
+			ae.process(cas);
+//			List<AnalysisEnginePerformanceMetrics> afterAnalysis = getMetrics(ae);
+			// get the delta
+//			List<AnalysisEnginePerformanceMetrics> casMetrics = getAEMetricsForCAS(
+//					 afterAnalysis, beforeAnalysis);
+
+			// convert UIMA-AS metrics into properties so that we can return
+			// this
+			// data in a format which doesnt require UIMA-AS to digest
+			List<Properties> metricsList = new ArrayList<Properties>();
+			/*
+			for (AnalysisEnginePerformanceMetrics metrics : casMetrics) {
+				Properties p = new Properties();
+				p.setProperty("name", metrics.getName());
+				p.setProperty("uniqueName", metrics.getUniqueName());
+				p.setProperty("analysisTime",
+						String.valueOf(metrics.getAnalysisTime()));
+				p.setProperty("numProcessed",
+						String.valueOf(metrics.getNumProcessed()));
+				metricsList.add(p);
+			}
+			*/
+			System.out.println("Thread:"+Thread.currentThread().getId()+" Processed "+num+" CASes");
+			return metricsList;
+		} finally {
+			if (ae != null) {
+				instanceMap.checkin(ae);
+			}
+			if (cas != null) {
+				casPool.releaseCas(cas);
+//				cas.release();
+			}
 
-	public String[] initialize(String[] args) throws Exception {
-		UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
-				"UIMA-AS version " + UimaAsVersion.getFullVersionString());
-
-		int nbrOfArgs = args.length;
-		String[] deploymentDescriptors = getMultipleArg("-d", args);
-		if (deploymentDescriptors.length == 0) {
-			// allow multiple args for one key
-			deploymentDescriptors = getMultipleArg2("-dd", args);
 		}
-		saxonURL = getArg("-saxonURL", args);
-		xslTransform = getArg("-xslt", args);
-		uimaAsDebug = getArg("-uimaEeDebug", args);
-		endpointName = getArg("-q", args);
-
-		if (nbrOfArgs < 1
-				|| (deploymentDescriptors.length == 0 || saxonURL.equals("") || xslTransform
-						.equals(""))) {
-			printUsageMessage();
-			return null; // Done here
-		}
-		parseDD(deploymentDescriptors[0]);
-		return deploymentDescriptors;
 	}
 
-	public void parseDD(String ddPath) throws Exception {
-		SAXParserFactory parserFactor = SAXParserFactory.newInstance();
-		SAXParser parser = parserFactor.newSAXParser();
-		SAXHandler handler = new SAXHandler();
-		parser.parse(new File(ddPath), handler);
-
-	}
-
-	class SAXHandler extends DefaultHandler {
-
-		String content = null;
+	private List<AnalysisEnginePerformanceMetrics> getMetrics(AnalysisEngine ae)
+			throws Exception {
+		List<AnalysisEnginePerformanceMetrics> analysisManagementObjects = new ArrayList<AnalysisEnginePerformanceMetrics>();
+		synchronized(UimaProcessContainer.class) {
+			// Fetch AE's management information that includes per component
+			// performance stats
+			// These stats are internally maintained in a Map. If the AE is an
+			// aggregate
+			// the Map will contain AnalysisEngineManagement instance for each AE.
+			AnalysisEngineManagement aem = ae.getManagementInterface();
+			if (aem.getComponents().size() > 0) {
+				// Flatten the hierarchy by recursively (if this AE is an aggregate)
+				// extracting
+				// primitive AE's AnalysisEngineManagement instance and placing it
+				// in
+				// afterAnalysisManagementObjects List.
+				getLeafManagementObjects(aem, analysisManagementObjects);
+				// System.out.println("-----------------Unique1:"+aem.getUniqueMBeanName());
+				// System.out.println("-----------------Simple1:"+aem.getName());
+			} else {
+				String path = produceUniqueName(aem);
+				 System.out.println(Thread.currentThread().getId()+" -----------------Unique2:"+aem.getUniqueMBeanName());
+				 System.out.println(Thread.currentThread().getId()+" -----------------Simple2:"+aem.getName());
+				 System.out.println(Thread.currentThread().getId()+" -----------------Path:"+path);
+				analysisManagementObjects.add(deepCopyMetrics(aem, path));
 
-		@Override
-		// Triggered when the start of tag is found.
-		public void startElement(String uri, String localName, String qName,
-				Attributes attributes) throws SAXException {
-			if (qName.equals("inputQueue")) {
-				endpointName = attributes.getValue("endpoint");
-			} else if (qName.equals("scaleout")) {
-				scaleout = Integer.parseInt(attributes
-						.getValue("numberOfInstances"));
 			}
-
-		}
-
-		@Override
-		public void endElement(String uri, String localName, String qName)
-				throws SAXException {
-
+			
 		}
-
+		return analysisManagementObjects;
 	}
 
-	/**
-	 * scan args for a particular arg, return the following token or the empty
-	 * string if not found
-	 * 
-	 * @param id
-	 *            the arg to search for
-	 * @param args
-	 *            the array of strings
-	 * @return the following token, or a 0 length string if not found
-	 */
-	private static String getArg(String id, String[] args) {
-		for (int i = 0; i < args.length; i++) {
-			if (id.equals(args[i]))
-				return (i + 1 < args.length) ? args[i + 1] : "";
-		}
-		return "";
+	private void getLeafManagementObjects(AnalysisEngineManagement aem,
+			List<AnalysisEnginePerformanceMetrics> result) {
+		getLeafManagementObjects(aem, result, "");
 	}
 
 	/**
-	 * scan args for a particular arg, return the following token(s) or the
-	 * empty string if not found
+	 * Recursively
 	 * 
-	 * @param id
-	 *            the arg to search for
-	 * @param args
-	 *            the array of strings
-	 * @return the following token, or a 0 length string array if not found
+	 * @param aem
+	 * @param result
+	 * @param uimaFullyQualifiedAEContext
 	 */
-	private static String[] getMultipleArg(String id, String[] args) {
-		String[] retr = {};
-		for (int i = 0; i < args.length; i++) {
-			if (id.equals(args[i])) {
-				String[] temp = new String[retr.length + 1];
-				for (int s = 0; s < retr.length; s++) {
-					temp[s] = retr[s];
+	private void getLeafManagementObjects(AnalysisEngineManagement aem,
+			List<AnalysisEnginePerformanceMetrics> result,
+			String uimaFullyQualifiedAEContext) {
+
+		if (aem.getComponents().isEmpty()) {
+			// skip Flow Controller
+			if (!aem.getName().equals("Fixed Flow Controller")) {
+				// is this primitive AE delegate in an aggregate. If so the
+				// mbean unique name will have "p0=" string. An examples mbean
+				// name looks like this:
+				// org.apache.uima:type=ee.jms.services,s=Top Level Aggregate
+				// TAE Uima EE Service,p0=Top Level Aggregate TAE
+				// Components,p1=SecondLevelAggregateCM
+				// Components,p2=ThirdLevelAggregateCM
+				// Components,name=Multiplier1
+				if (aem.getUniqueMBeanName().indexOf("p0=") > -1) {
+					// check id the parent aggregate has been scaled up by
+					// looking at the last char in its name. If it is a number
+					// strip it from the name
+					if (Character.isDigit(uimaFullyQualifiedAEContext
+							.charAt(uimaFullyQualifiedAEContext.length() - 1))
+							&& uimaFullyQualifiedAEContext.lastIndexOf(" ") > -1) {
+						String indx = uimaFullyQualifiedAEContext
+								.substring(uimaFullyQualifiedAEContext
+										.lastIndexOf(" "));
+						if (indx != null) {
+							int value = -1;
+							try {
+								value = Integer.parseInt(indx.trim());
+								// Prepend "X Components" to the unique name
+								// with X stripped.
+								uimaFullyQualifiedAEContext = value
+										+ " Components "
+										+ uimaFullyQualifiedAEContext
+												.substring(
+														0,
+														uimaFullyQualifiedAEContext
+																.lastIndexOf(" "));
+							} catch (NumberFormatException ex) {
+
+							}
+						}
+					}
 				}
-				retr = temp;
-				retr[retr.length - 1] = (i + 1 < args.length) ? args[i + 1]
-						: null;
+				result.add(deepCopyMetrics(aem, uimaFullyQualifiedAEContext));
+			}
+		} else {
+			for (AnalysisEngineManagement child : (Iterable<AnalysisEngineManagement>) aem
+					.getComponents().values()) {
+				getLeafManagementObjects(child, result, produceUniqueName(aem));
 			}
 		}
-		return retr;
 	}
 
-	/**
-	 * scan args for a particular arg, return the following token(s) or the
-	 * empty string if not found
-	 * 
-	 * @param id
-	 *            the arg to search for
-	 * @param args
-	 *            the array of strings
-	 * @return the following token, or a 0 length string array if not found
-	 */
-	private static String[] getMultipleArg2(String id, String[] args) {
-		String[] retr = {};
-		for (int i = 0; i < args.length; i++) {
-			if (id.equals(args[i])) {
-				int j = 0;
-				while ((i + 1 + j < args.length)
-						&& !args[i + 1 + j].startsWith("-")) {
-					String[] temp = new String[retr.length + 1];
-					for (int s = 0; s < retr.length; s++) {
-						temp[s] = retr[s];
+	private String produceUniqueName(AnalysisEngineManagement aem) {
+		String[] parts = aem.getUniqueMBeanName().split(",");
+		StringBuffer sb = new StringBuffer();
+		for (String part : parts) {
+			int pos;
+			if ((pos = part.indexOf("=")) > -1 && part.startsWith("p")) {
+				String n = part.substring(pos + 1, part.indexOf(" Components"));
+				if (part.startsWith("p0=") && n.indexOf(" ") > -1) {
+					String indx = n.substring(n.lastIndexOf(" "));
+					if (indx != null) {
+						int instanceNumber = -1;
+						try {
+							instanceNumber = Integer.parseInt(indx.trim());
+							sb.append(instanceNumber).append(" Components ");
+							n = n.substring(0, n.lastIndexOf(" "));
+						} catch (NumberFormatException nfe) {
+						}
 					}
-					retr = temp;
-					retr[retr.length - 1] = args[i + 1 + j++];
 				}
-				return retr;
+				sb.append("/").append(n.trim());
+			} else if (part.trim().startsWith("name=")) {
+				sb.append("/").append(
+						part.substring(part.trim().indexOf("=") + 1));
 			}
 		}
-		return retr;
-	}
-
-	protected void finalize() {
-		System.err.println(this + " finalized");
-	}
-
-	private static void printUsageMessage() {
-		System.out
-				.println(" Arguments to the program are as follows : \n"
-						+ "-d path-to-UIMA-Deployment-Descriptor [-d path-to-UIMA-Deployment-Descriptor ...] \n"
-						+ "-saxon path-to-saxon.jar \n"
-						+ "-q top level service queue name \n"
-						+ "-xslt path-to-dd2spring-xslt\n"
-						+ "   or\n"
-						+ "path to Spring XML Configuration File which is the output of running dd2spring\n");
+		return sb.toString();
 	}
 
-	protected class UimaAsTestCallbackListener extends
-			UimaAsBaseCallbackListener {
+	private AnalysisEnginePerformanceMetrics deepCopyMetrics(
+			AnalysisEngineManagement aem, String uimaFullyQualifiedAEContext) {
+		String index = "";
+		
+			// Create a unique name with each AE name is separated with "/". Prepend
+			// "X Components" where
+			// X is a instance number of a scaled AE. Also, strip the X from the AE
+			// name. The instance number
+			// is added to each scaled up component during initialization of the
+			// uima-as. We need to prepend
+			// "X Components" to allow DUCC JD to parse the unique name correctly (
+			// basically for backwards
+			// compatibility.
+			int pos = aem.getUniqueMBeanName().lastIndexOf("name=");
+			if (pos > -1) {
+				// get the name of the component. In case of nested component this
+				// will be the KEY from AE descriptor
+				String tmp = aem.getUniqueMBeanName().substring(pos + 5);
+				// in case this is the top level AE, check if it has been scaled up
+				// by extracting its instance number.For example,
+				// NoOpAnnotator 2.
+				int last = tmp.lastIndexOf(" ");
+				if (last > -1) {
+					// extract instance number
+					index = tmp.substring(last);
+
+					try {
+						// check if the instance number is a number. If not silently
+						// handle the exception.
+						Integer.parseInt(index.trim());
+						System.out.println("deepCopyMetrics - context:"+uimaFullyQualifiedAEContext+" last="+last);
+						// strip the instance number from the AE name
+						uimaFullyQualifiedAEContext = uimaFullyQualifiedAEContext
+								.substring(0, last + 1);
+					} catch (NumberFormatException nfe) {
 
-		public void onBeforeProcessCAS(UimaASProcessStatus status,
-				String nodeIP, String pid) {
-			// System.out
-			// .println("runTest: onBeforeProcessCAS() Notification - CAS:"
-			// + status.getCasReferenceId()
-			// + " is being processed on machine:"
-			// + nodeIP
-			// + " by process (PID):" + pid);
-		}
-
-		public synchronized void onBeforeMessageSend(UimaASProcessStatus status) {
-			// casSent = status.getCasReferenceId();
-			// System.out
-			// .println("runTest: Received onBeforeMessageSend() Notification With CAS:"
-			// + status.getCasReferenceId());
-		}
-
-		public void onUimaAsServiceExit(EventTrigger cause) {
-			System.out
-					.println("runTest: Received onUimaAsServiceExit() Notification With Cause:"
-							+ cause.name());
-		}
-
-		public synchronized void entityProcessComplete(CAS aCAS,
-				EntityProcessStatus aProcessStatus,
-				List<AnalysisEnginePerformanceMetrics> componentMetricsList) {
-			String casReferenceId = ((UimaASProcessStatus) aProcessStatus)
-					.getCasReferenceId();
-
-			// if (aProcessStatus instanceof UimaASProcessStatus) {
-			// if (aProcessStatus.isException()) {
-			// System.out
-			// .println("--------- Got Exception While Processing CAS"
-			// + casReferenceId);
-			// } else {
-			// System.out.println("Client Received Reply - CAS:"
-			// + casReferenceId);
-			// }
-			// }
-		}
-
-		/**
-		 * Callback method which is called by Uima EE client when a reply to
-		 * process CAS is received. The reply contains either the CAS or an
-		 * exception that occurred while processing the CAS.
-		 */
-		public synchronized void entityProcessComplete(CAS aCAS,
-				EntityProcessStatus aProcessStatus) {
-			String casReferenceId = ((UimaASProcessStatus) aProcessStatus)
-					.getCasReferenceId();
-
-			// if (aProcessStatus instanceof UimaASProcessStatus) {
-			// if (aProcessStatus.isException()) {
-			// System.out
-			// .println("--------- Got Exception While Processing CAS"
-			// + casReferenceId);
-			// } else {
-			// System.out.println("Client Received Reply - CAS:"
-			// + casReferenceId);
-			// }
-			// }
+					} catch( Exception e) {
+						System.out.println(Thread.currentThread().getId()+" deepCopyMetrics - context:"+uimaFullyQualifiedAEContext+" last="+last);
+					}
+				} else {
 
-		}
+					if (!uimaFullyQualifiedAEContext.endsWith(tmp)) {
+						uimaFullyQualifiedAEContext += "/" + tmp;
+					}
+				}
+			}
+			// Primitive AE will not have "X Components" prefix, but it is required
+			// by the DUCC JD to be there. Prepend it to the unique name.
+			if (uimaFullyQualifiedAEContext.indexOf(" Components ") == -1) {
+				uimaFullyQualifiedAEContext = index + " Components "
+						+ uimaFullyQualifiedAEContext;
+			}
+			return new AnalysisEnginePerformanceMetrics(aem.getName(),
+					uimaFullyQualifiedAEContext, aem.getAnalysisTime(),
+					aem.getNumberOfCASesProcessed());
+			
+		
+	}
 
-		/**
-		 * Callback method which is called by Uima EE client when the
-		 * initialization of the client is completed successfully.
-		 */
-		public void initializationComplete(EntityProcessStatus aStatus) {
-			synchronized (initializeMonitor) {
-				initialized = true;
-				initializeMonitor.notifyAll();
+	private List<AnalysisEnginePerformanceMetrics> getAEMetricsForCAS(
+			List<AnalysisEnginePerformanceMetrics> afterAnalysisManagementObjects,
+			List<AnalysisEnginePerformanceMetrics> beforeAnalysisManagementObjects)
+			throws Exception {
+		// Create a List to hold per CAS analysisTime and total number of CASes
+		// processed
+		// by each AE. This list will be serialized and sent to the client
+		List<AnalysisEnginePerformanceMetrics> performanceList = new ArrayList<AnalysisEnginePerformanceMetrics>();
+		// Diff the before process() performance metrics with post process
+		// performance
+		// metrics
+		for (AnalysisEnginePerformanceMetrics after : afterAnalysisManagementObjects) {
+			for (AnalysisEnginePerformanceMetrics before : beforeAnalysisManagementObjects) {
+				if (before.getUniqueName().equals(after.getUniqueName())) {
+
+					AnalysisEnginePerformanceMetrics metrics = new AnalysisEnginePerformanceMetrics(
+							after.getName(), after.getUniqueName(),
+							after.getAnalysisTime() - before.getAnalysisTime(),
+							after.getNumProcessed());
+					// System.out.println("********************"+metrics.getUniqueName());
+					// System.out.println("********************"+metrics.getName());
+					performanceList.add(metrics);
+					break;
+				}
 			}
 		}
+		return performanceList;
 
-		/**
-		 * Callback method which is called by Uima EE client when a CPC reply is
-		 * received OR exception occured while processing CPC request.
-		 */
-		public void collectionProcessComplete(EntityProcessStatus aStatus) {
-		}
 	}
 }

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java?rev=1644428&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java Wed Dec 10 14:26:53 2014
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.ducc.user.jp.iface;
+
+import java.util.List;
+import java.util.Properties;
+
+public interface IProcessContainer {
+	public int initialize(String[] args) throws Exception;
+	public void deploy() throws Exception;
+	public void stop() throws Exception;
+	public List<Properties> process(Object xmi) throws Exception;
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java?rev=1644428&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java Wed Dec 10 14:26:53 2014
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.user.jp.uima;
+
+public class CasManager {
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/CasManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java?rev=1644428&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java Wed Dec 10 14:26:53 2014
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.user.jp.uima;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+import org.apache.uima.analysis_engine.AnalysisEngine;
+
+
+public class UimaAnalysisEngineInstancePoolWithThreadAffinity {
+	  private static final Class<?> CLASS_NAME = UimaAnalysisEngineInstancePoolWithThreadAffinity.class;
+
+	  private volatile boolean destroyAEInstanceIfFree=false;
+	  private Semaphore lock = new Semaphore(1);
+	  
+	  private Map<Long, AnalysisEngine> aeInstanceMap = new HashMap<Long,AnalysisEngine>();
+
+	  public int size() {
+	    return aeInstanceMap.size();
+	  }
+
+	  public void checkin(AnalysisEngine anAnalysisEngine) throws Exception {
+		  try {
+			  lock.acquireUninterruptibly();
+			  // Call destroy() on AE on checkin if the process is in quiesce mode  
+			  if ( destroyAEInstanceIfFree ) {
+			    anAnalysisEngine.destroy();
+			  } else {
+		      aeInstanceMap.put(Thread.currentThread().getId(), anAnalysisEngine);
+			  }
+	 	  } catch( Exception e) {
+			  e.printStackTrace();
+			  throw e;
+		  } finally {
+			  lock.release();
+		  }
+	  }
+
+	  public boolean exists() {
+	    return aeInstanceMap.containsKey(Thread.currentThread().getId());
+	  }
+
+	  /**
+	   * Pins each process thread to a specific and dedicated AE instance. All AE instances are managed
+	   * in a HashMap with thread name as a key. AE instance is not removed from the HashMap before it
+	   * is returned to the client.
+	   * 
+	   * @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#checkout()
+	   **/
+	  public AnalysisEngine checkout() throws Exception {
+		  try {
+			  lock.acquireUninterruptibly();
+			  if ( !exists() ) {
+				  throw new RuntimeException("AE instance not found in AE pool. Most likely due to service quiescing");
+			  }
+		    // AEs are instantiated and initialized in the the main thread and placed in the temporary list.
+		    // First time in the process() method, each thread will remove AE instance from the temporary
+		    // list
+		    // and place it in the permanent instanceMap. The key to the instanceMap is the thread name.
+		    // Each
+		    // thread will always process a CAS using its own and dedicated AE instance.
+		    return (AnalysisEngine) aeInstanceMap.remove(Thread.currentThread().getId());
+
+		  } catch( Exception e) {
+			  throw e;
+		  } finally {
+			  lock.release();
+		  }
+
+		  
+		  
+	  }
+	  /*
+	   * (non-Javadoc)
+	   * 
+	   * @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#destroy()
+	   */
+	  public void destroy() throws Exception {
+		  //	set the flag so that any AE instance returned from PrimitiveController
+		  //    will be destroyed. 
+		  destroyAEInstanceIfFree = true;
+	  }
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/uima/UimaAnalysisEngineInstancePoolWithThreadAffinity.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain