You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2015/03/03 20:47:51 UTC

svn commit: r1663769 - in /uima/sandbox/uima-ducc/trunk: uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/ uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/ uima-ducc-transport/src/main/java/org/apache/ui...

Author: cwiklik
Date: Tue Mar  3 19:47:51 2015
New Revision: 1663769

URL: http://svn.apache.org/r1663769
Log:
UIMA-4066 Added support for investment reset. 

Added:
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java   (with props)
Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccService.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/DuccAbstractProcessContainer.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccService.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccService.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccService.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccService.java Tue Mar  3 19:47:51 2015
@@ -18,6 +18,7 @@
 */
 package org.apache.uima.ducc.common.main;
 
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.uima.ducc.common.compo
 import org.apache.uima.ducc.common.exception.DuccComponentInitializationException;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.user.common.investment.Investment;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.AnnotationConfigApplicationContext;
 
@@ -55,11 +57,14 @@ import org.springframework.context.annot
 public class DuccService extends AbstractDuccComponent {
 	public static final String DUCC_PROPERTY_FILE="ducc.deploy.configuration";
 	public static final String DUCC_DEPLOY_COMPONENTS="ducc.deploy.components";
-
+    //private Investment investment = null;
 	private Main main;
     private static DuccLogger globalLogger = null;
     private ApplicationContext context;
     Map<String,AbstractDuccComponent> duccComponents = null;
+    
+    private Object investmentInstance;
+    
     private String[] args = null;
 	public DuccService() {
 		super("");
@@ -190,6 +195,24 @@ public class DuccService extends Abstrac
 	  return null;
 	}
 	/**
+	 * This method returns an instance of IJobProcessor which would only exist
+	 * in a JP and UIMA-based AP.
+	 * 
+	 * @return - IJobProcessor instance
+	 */
+	public IJobProcessor getJobProcessorComponent() {
+	    //  Extract all Ducc components from Spring container
+	    Map<String,AbstractDuccComponent> duccComponents = 
+	      context.getBeansOfType(AbstractDuccComponent.class);
+	    // scan for component which implements IJobProcessor interface.
+	    for(Map.Entry<String, AbstractDuccComponent> duccComponent: duccComponents.entrySet()) {
+	      if ( duccComponent.getValue() instanceof IJobProcessor) {
+	        return (IJobProcessor)duccComponent.getValue();
+	      }
+	    }
+		return null;
+	}
+	/**
 	 * This method is only called when launching a JP.
 	 * @param instanceType
 	 * @return
@@ -311,4 +334,12 @@ public class DuccService extends Abstrac
         }
 		return "";
 	}
+	public void registerInvestmentInstance(Object instance) {
+		this.investmentInstance = instance;
+	}
+	public void registerInvestmentResetCallback(Object o, Method m) throws Exception {
+		Method investmentInstanceMethod = 
+				investmentInstance.getClass().getDeclaredMethod("setJobComponent", Object.class, Method.class);
+		investmentInstanceMethod.invoke(investmentInstance, o,m);
+	}
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Tue Mar  3 19:47:51 2015
@@ -25,7 +25,9 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.net.SocketTimeoutException;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -52,14 +54,18 @@ public class HttpWorkerThread implements
 	private Object processorInstance = null;
     private static AtomicInteger IdGenerator =
     		new AtomicInteger();
+    private Map<String, IMetaCasTransaction> transactionMap =
+    		new ConcurrentHashMap<String, IMetaCasTransaction>();
+
 	public HttpWorkerThread(JobProcessComponent component, DuccHttpClient httpClient,
 			Object processorInstance, CountDownLatch workerThreadCount,
-			CountDownLatch threadReadyCount) {
+			CountDownLatch threadReadyCount, Map<String, IMetaCasTransaction> transactionMap) {
 		this.duccComponent = component;
 		this.httpClient = httpClient;
 		this.processorInstance = processorInstance;
 		this.workerThreadCount = workerThreadCount;
 		this.threadReadyCount = threadReadyCount;
+		this.transactionMap = transactionMap;
 	}
 	@SuppressWarnings("unchecked")
 	public void run() {
@@ -67,6 +73,7 @@ public class HttpWorkerThread implements
 		PostMethod postMethod = null;
 	    logger.info("HttpWorkerThread.run()", null, "Starting JP Process Thread Id:"+Thread.currentThread().getId());
 	    Method processMethod = null;
+	    Method getKeyMethod = null;
 	    boolean error=false;
 	    // ***** DEPLOY ANALYTICS ***********
 	    // First, deploy analytics in a provided process container. Use java reflection to call
@@ -74,6 +81,7 @@ public class HttpWorkerThread implements
 	    // loaded from ducc-user jar provided in system classpath
 	    try {
 			processMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("process", Object.class);	
+			getKeyMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("getKey", String.class);	
 			
 			synchronized(HttpWorkerThread.class) {
 				Method deployMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("deploy");
@@ -191,17 +199,36 @@ public class HttpWorkerThread implements
 						// the Agent will wait for 1 minute (default) before killing
 						// this process via kill -9
 						try {
+							// To support investment reset we need to store transaction
+							// object under a known key. This key is stored in the CAS.
+							// In order to get to it, we need to deserialize the CAS 
+							// in the user container. When an asynchronous investment
+							// reset call is made from the user code, it will contain
+							// that key to allow us to look up original transaction so that
+							// we can send reset request to the JD.
+							String key = (String)
+									getKeyMethod.invoke(processorInstance, transaction.getMetaCas().getUserSpaceCas());
+							if ( key != null ) {
+								// add transaction under th
+								transactionMap.put(key, transaction);
+							}
+							
 							//    ********** PROCESS() **************
-							// using java reflection, call process to analyze the CAS
+							// using java reflection, call process to analyze the CAS. While 
+							// we are blocking, user code may issue investment reset asynchronously.
 							 List<Properties> metrics = (List<Properties>)processMethod.
 							   invoke(processorInstance, transaction.getMetaCas().getUserSpaceCas());
 							//    ***********************************
-							 
+							if ( key != null ) {
+                                // process ended we no longer expect investment reset from user
+								// so remove transaction from the map
+								transactionMap.remove(key);
+							}
+							
 		                    logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed");
 							IPerformanceMetrics metricsWrapper =
 									new PerformanceMetrics();
 							metricsWrapper.set(metrics);
-							
 							transaction.getMetaCas().setPerformanceMetrics(metricsWrapper);
 							
 						}  catch( InvocationTargetException ee) {

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Tue Mar  3 19:47:51 2015
@@ -20,7 +20,10 @@
 package org.apache.uima.ducc.transport.configuration.jp;
 
 import java.lang.reflect.Method;
+import java.net.SocketTimeoutException;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -29,11 +32,15 @@ import java.util.concurrent.ScheduledThr
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.uima.ducc.common.component.AbstractDuccComponent;
 import org.apache.uima.ducc.common.component.IJobProcessor;
 import org.apache.uima.ducc.common.container.FlagsHelper;
 import org.apache.uima.ducc.common.main.DuccService;
 import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 
 public class JobProcessComponent extends AbstractDuccComponent 
@@ -56,10 +63,12 @@ implements IJobProcessor{
 	ScheduledThreadPoolExecutor executor = null;
 	ExecutorService tpe = null;
     private volatile boolean uimaASJob=false;
+    Map<String, IMetaCasTransaction> transactionMap =
+    		new ConcurrentHashMap<String, IMetaCasTransaction>();
+    
     
 	private DuccHttpClient httpClient = null;
     private Object processorInstance=null;
-//    private String[] args = null;
 	public JobProcessComponent(String componentName, CamelContext ctx,JobProcessConfiguration jpc) {
 		super(componentName,ctx);
 		this.configuration = jpc;
@@ -115,6 +124,37 @@ implements IJobProcessor{
 	public int getTimeout() {
 		return this.timeout;
 	}
+	
+	public void resetInvestment(String key) throws Exception {
+		if ( httpClient != null && transactionMap.containsKey(key) ) {
+			// Fetch a transaction object associated with a WI id (key)
+			IMetaCasTransaction transaction = transactionMap.get(key);
+			PostMethod postMethod = new PostMethod(httpClient.getJdUrl());
+			// Dont return serialized CAS to reduce the msg size
+			transaction.getMetaCas().setUserSpaceCas(null);
+			transaction.setType(Type.InvestmentReset);
+			
+			// Set request timeout
+			postMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, getTimeout());
+ 			// Retries timeouts, otherwise throws RuntimeException. Don't rethrow
+			// the original Exception as it may contain classes that are not
+			// loaded into the user container from which this call originated.
+			while( isRunning() ) {
+    			try {
+    				logger.info("resetInvestment", null, "User Requested Investment Reset - sending request to JD - WI:"+transaction.getMetaCas().getSystemKey()+" user key:"+key);
+        			httpClient.execute(transaction, postMethod);
+        			break;
+    			} catch(SocketTimeoutException  e) {
+    				logger.info("resetInvestment", null, "Timeout while waiting for Investment Reset response from JD - retrying - WI:"+transaction.getMetaCas().getSystemKey());
+    			} catch(Exception e) {
+    				logger.info("resetInvestment", null, "Error while trying send Investment Reset request to JD. Returning to the caller (no retries) WI:"+transaction.getMetaCas().getSystemKey());
+    				logger.info("resetInvestment", null, e);
+    				throw new RuntimeException("Unable to deliver Investment Reset request to JD due to "+e.getCause().getMessage());
+    			}
+            	
+            }
+		}  
+	}
 	/**
 	 * This method is called by super during ducc framework boot
 	 * sequence. It creates all the internal components and worker threads
@@ -123,12 +163,17 @@ implements IJobProcessor{
 	 */
 	public void start(DuccService service, String[] args) throws Exception {
 		super.start(service, args);
-		
+        
 		try {
 			if ( args == null || args.length ==0 || args[0] == null || args[0].trim().length() == 0) {
 				logger.warn("start", null, "Missing Deployment Descriptor - the JP Requires argument. Add DD for UIMA-AS job or AE descriptor for UIMA jobs");
                 throw new RuntimeException("Missing Deployment Descriptor - the JP Requires argument. Add DD for UIMA-AS job or AE descriptor for UIMA jobs");
 			}
+			// this class implements resetInvestment method
+			Method m = this.getClass().getDeclaredMethod("resetInvestment", String.class);
+			// register this class and its method to handle investment reset
+			service.registerInvestmentResetCallback(this, m);
+
 			String processJmxUrl = super.getProcessJmxUrl();
 			// tell the agent that this process is initializing
 			agent.notify(ProcessState.Initializing, processJmxUrl);
@@ -210,7 +255,7 @@ implements IJobProcessor{
 		    	// Create and start worker threads that pull Work Items from the JD
 		    	Future<?>[] threadHandles = new Future<?>[scaleout];
 				for (int j = 0; j < scaleout; j++) {
-					threadHandles[j] = tpe.submit(new HttpWorkerThread(this, httpClient, processorInstance, workerThreadCount, threadReadyCount));
+					threadHandles[j] = tpe.submit(new HttpWorkerThread(this, httpClient, processorInstance, workerThreadCount, threadReadyCount, transactionMap));
 				}
 				// wait until all process threads initialize
 				threadReadyCount.await();

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java Tue Mar  3 19:47:51 2015
@@ -247,4 +247,9 @@ public class ServiceComponent extends Ab
 		}
 	}
 
+	public void resetInvestment(String key) throws Exception {
+		// TODO Auto-generated method stub
+		
+	}
+
 }

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java?rev=1663769&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java Tue Mar  3 19:47:51 2015
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.user.common.investment;
+
+import java.lang.reflect.Method;
+
+public class Investment {
+	static Object instance;
+	static Method method;
+	/**
+	 * This is called by Ducc framework JobProcessComponent to register a method
+	 * to call when a user wants to reset investment.
+	 * 
+	 * @param Object o - instance of JobProcessComponent
+	 * @param Method m - JobProcessComponent method to call to reset 
+	 *                   investment
+	 * 
+	 */
+	public void setJobComponent( Object o, Method m) {
+		instance = o;
+		method = m;
+	}
+	/**
+	 * This method is called from the user code to request investment reset.
+	 * 
+	 * @param key - unique key identifying a Work Item. This comes from 
+	 *              a CAS.
+	 * @throws Exception
+	 */
+	public static void reset(String key) throws Exception {
+		method.invoke(instance, key);
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java Tue Mar  3 19:47:51 2015
@@ -25,6 +25,7 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 
+import org.apache.uima.ducc.user.common.investment.Investment;
 import org.apache.uima.ducc.user.jp.iface.IProcessContainer;
 
 /**
@@ -33,6 +34,8 @@ import org.apache.uima.ducc.user.jp.ifac
  */
 public class DuccJobService {
 	boolean DEBUG = false;
+	private Investment investment = null;
+	
 	public static URLClassLoader create(String classPath)
 			throws MalformedURLException {
 		return create(classPath.split(":"));
@@ -91,7 +94,8 @@ public class DuccJobService {
 			System.setProperty("ducc.user.log4j.saved.configuration",log4jConfigurationFile);
 			System.getProperties().remove("log4j.configuration");
 		}
-
+        investment = new Investment();
+        
         // cache current context classloader
 		ClassLoader sysCL = Thread.currentThread().getContextClassLoader();
 		// Fetch a classpath for the fenced Ducc container
@@ -114,7 +118,11 @@ public class DuccJobService {
 
 		// initialize Ducc fenced container. It calls component's Configuration class
 		Method bootMethod = classToLaunch.getMethod("boot", String[].class);
-		bootMethod.invoke(duccContainerInstance, (Object) args);
+		try {
+			bootMethod.invoke(duccContainerInstance, (Object) args);
+		} catch( Exception e) {
+			e.printStackTrace();
+		}
 
 		// below property is set by component's Configuration class. It can also
 		// be provided on the command line in case a custom processor is needed.
@@ -139,6 +147,11 @@ public class DuccJobService {
 		Method setProcessorMethod = classToLaunch.getMethod("setProcessor",
 				Object.class, String[].class);
 		setProcessorMethod.invoke(duccContainerInstance, pc, args);
+
+		Method registerInvestmentInstanceMethod = classToLaunch.getMethod("registerInvestmentInstance",
+				Object.class);
+		registerInvestmentInstanceMethod.invoke(duccContainerInstance, investment);
+		
         // Call DuccService.start() to initialize the process
 		// and begin processing
 		Method startMethod = classToLaunch.getMethod("start");// ,

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/DuccAbstractProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/DuccAbstractProcessContainer.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/DuccAbstractProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/DuccAbstractProcessContainer.java Tue Mar  3 19:47:51 2015
@@ -25,8 +25,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.ducc.CasHelper;
 import org.apache.uima.ducc.user.common.DuccUimaSerializer;
 import org.apache.uima.ducc.user.jp.iface.IProcessContainer;
+import org.apache.uima.resource.metadata.FsIndexDescription;
+import org.apache.uima.resource.metadata.TypePriorities;
+import org.apache.uima.resource.metadata.TypeSystemDescription;
+import org.apache.uima.util.CasCreationUtils;
 
 public abstract class DuccAbstractProcessContainer implements IProcessContainer{
 	// Container implementation must implement the following methods
@@ -34,6 +43,7 @@ public abstract class DuccAbstractProces
     protected abstract int doInitialize(Properties p, String[] arg) throws Exception;
     protected abstract void doStop() throws Exception;
     protected abstract List<Properties>  doProcess(Object subject) throws Exception;
+    protected 	AnalysisEngineMetaData analysisEngineMetadata;
 
 	protected Throwable lastError = null;
     protected int scaleout=1;
@@ -41,6 +51,35 @@ public abstract class DuccAbstractProces
 	protected static Map<Long, DuccUimaSerializer> serializerMap =
 			new HashMap<Long, DuccUimaSerializer>();
 
+	/**
+	 * This method is called to fetch a WorkItem ID from a given CAS which
+	 * is required to support investment reset. 
+	 *
+	 */
+	public String getKey(String xmi) throws Exception {
+		if ( analysisEngineMetadata == null ) {
+			// WorkItem ID (key) is only supported for pieces 'n parts 
+			return null;
+		} 
+		Properties props = new Properties();
+        props.setProperty(UIMAFramework.CAS_INITIAL_HEAP_SIZE, "1000");
+
+		TypeSystemDescription tsd = analysisEngineMetadata.getTypeSystem();
+		TypePriorities tp = analysisEngineMetadata.getTypePriorities();
+		FsIndexDescription[] fsid = analysisEngineMetadata.getFsIndexes();
+		CAS cas;
+		synchronized( CasCreationUtils.class) {
+			cas = CasCreationUtils.createCas(tsd, tp, fsid, props);
+		}
+		// deserialize the CAS
+		XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+		getUimaSerializer().
+		    deserializeCasFromXmi((String)xmi, cas, deserSharedData, true,-1);
+		
+		String key = CasHelper.getId(cas);
+		cas.release();
+		return key;
+	}
     public int getScaleout( ){
 		return scaleout;
 	}

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java Tue Mar  3 19:47:51 2015
@@ -58,7 +58,6 @@ public class UimaProcessContainer extend
 	// sure that a thread used to initialized the AE is used to call process().
 	// Some AEs depend on ThreadLocal storage.
 	UimaAnalysisEngineInstancePoolWithThreadAffinity instanceMap = new UimaAnalysisEngineInstancePoolWithThreadAffinity();
-	AnalysisEngineMetaData analysisEngineMetadata;
    
 	private static CasPool casPool = null;
 	  AtomicInteger counter = new AtomicInteger();
@@ -147,6 +146,9 @@ public class UimaProcessContainer extend
 			instanceMap.checkin(ae);
 			if (instanceMap.size() == scaleout) {
 				try {
+					Properties props = new Properties();
+			        props.setProperty(UIMAFramework.CAS_INITIAL_HEAP_SIZE, "1000");
+
 					analysisEngineMetadata = ae.getAnalysisEngineMetaData();
 					casPool = new CasPool(scaleout, analysisEngineMetadata,rm);
 					latch.countDown();
@@ -189,8 +191,6 @@ public class UimaProcessContainer extend
 			lastError = null;
 			XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
 			// deserialize the CAS
-//			uimaSerializer.deserializeCasFromXmi((String) xmi, cas,
-//					deserSharedData, true, -1);
 			super.getUimaSerializer().
 			    deserializeCasFromXmi((String)xmi, cas, deserSharedData, true,-1);
 

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java Tue Mar  3 19:47:51 2015
@@ -70,4 +70,12 @@ public interface IProcessContainer {
 	 * @return
 	 */
 	public boolean useThreadAffinity();
+	
+	/**
+	 * 
+	 * @return
+	 * @throws Exception
+	 */
+	public String getKey(String cargo) throws Exception;
+	
 }