You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2015/03/03 20:47:51 UTC
svn commit: r1663769 - in /uima/sandbox/uima-ducc/trunk:
uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/
uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/
uima-ducc-transport/src/main/java/org/apache/ui...
Author: cwiklik
Date: Tue Mar 3 19:47:51 2015
New Revision: 1663769
URL: http://svn.apache.org/r1663769
Log:
UIMA-4066 Added support for investment reset.
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java (with props)
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccService.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/DuccAbstractProcessContainer.java
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccService.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccService.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccService.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccService.java Tue Mar 3 19:47:51 2015
@@ -18,6 +18,7 @@
*/
package org.apache.uima.ducc.common.main;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.uima.ducc.common.compo
import org.apache.uima.ducc.common.exception.DuccComponentInitializationException;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.user.common.investment.Investment;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@@ -55,11 +57,14 @@ import org.springframework.context.annot
public class DuccService extends AbstractDuccComponent {
public static final String DUCC_PROPERTY_FILE="ducc.deploy.configuration";
public static final String DUCC_DEPLOY_COMPONENTS="ducc.deploy.components";
-
+ //private Investment investment = null;
private Main main;
private static DuccLogger globalLogger = null;
private ApplicationContext context;
Map<String,AbstractDuccComponent> duccComponents = null;
+
+ private Object investmentInstance;
+
private String[] args = null;
public DuccService() {
super("");
@@ -190,6 +195,24 @@ public class DuccService extends Abstrac
return null;
}
/**
+ * This method returns an instance of IJobProcessor which would only exist
+ * in a JP and UIMA-based AP.
+ *
+ * @return - IJobProcessor instance
+ */
+ public IJobProcessor getJobProcessorComponent() {
+   // Ask the Spring container for every bean that is a Ducc component
+   Map<String,AbstractDuccComponent> beans =
+     context.getBeansOfType(AbstractDuccComponent.class);
+   // Hand back the first component that also implements IJobProcessor
+   for(AbstractDuccComponent candidate: beans.values()) {
+     if ( candidate instanceof IJobProcessor) {
+       return (IJobProcessor)candidate;
+     }
+   }
+   return null;
+ }
+ /**
* This method is only called when launching a JP.
* @param instanceType
* @return
@@ -311,4 +334,12 @@ public class DuccService extends Abstrac
}
return "";
}
+ /**
+  * Remembers the object that registerInvestmentResetCallback() later
+  * forwards the reset callback to.
+  *
+  * @param instance - object exposing setJobComponent(Object, Method)
+  */
+ public void registerInvestmentInstance(Object instance) {
+   investmentInstance = instance;
+ }
+ /**
+  * Hands the reset callback (target object plus method) to the registered
+  * investment instance by reflectively calling its
+  * setJobComponent(Object, Method).
+  *
+  * @param o - object on which the reset method will be invoked
+  * @param m - method to invoke when an investment reset is requested
+  * @throws IllegalStateException if no investment instance was registered
+  * @throws Exception if setJobComponent cannot be found or invoked
+  */
+ public void registerInvestmentResetCallback(Object o, Method m) throws Exception {
+   if ( investmentInstance == null ) {
+     // Fail with a clear message instead of a bare NullPointerException
+     throw new IllegalStateException(
+       "Investment instance not registered - call registerInvestmentInstance() first");
+   }
+   Method investmentInstanceMethod =
+     investmentInstance.getClass().getDeclaredMethod("setJobComponent", Object.class, Method.class);
+   investmentInstanceMethod.invoke(investmentInstance, o,m);
+ }
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Tue Mar 3 19:47:51 2015
@@ -25,7 +25,9 @@ import java.lang.reflect.InvocationTarge
import java.lang.reflect.Method;
import java.net.SocketTimeoutException;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -52,14 +54,18 @@ public class HttpWorkerThread implements
private Object processorInstance = null;
private static AtomicInteger IdGenerator =
new AtomicInteger();
+ private Map<String, IMetaCasTransaction> transactionMap =
+ new ConcurrentHashMap<String, IMetaCasTransaction>();
+
public HttpWorkerThread(JobProcessComponent component, DuccHttpClient httpClient,
Object processorInstance, CountDownLatch workerThreadCount,
- CountDownLatch threadReadyCount) {
+ CountDownLatch threadReadyCount, Map<String, IMetaCasTransaction> transactionMap) {
this.duccComponent = component;
this.httpClient = httpClient;
this.processorInstance = processorInstance;
this.workerThreadCount = workerThreadCount;
this.threadReadyCount = threadReadyCount;
+ this.transactionMap = transactionMap;
}
@SuppressWarnings("unchecked")
public void run() {
@@ -67,6 +73,7 @@ public class HttpWorkerThread implements
PostMethod postMethod = null;
logger.info("HttpWorkerThread.run()", null, "Starting JP Process Thread Id:"+Thread.currentThread().getId());
Method processMethod = null;
+ Method getKeyMethod = null;
boolean error=false;
// ***** DEPLOY ANALYTICS ***********
// First, deploy analytics in a provided process container. Use java reflection to call
@@ -74,6 +81,7 @@ public class HttpWorkerThread implements
// loaded from ducc-user jar provided in system classpath
try {
processMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("process", Object.class);
+ getKeyMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("getKey", String.class);
synchronized(HttpWorkerThread.class) {
Method deployMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("deploy");
@@ -191,17 +199,36 @@ public class HttpWorkerThread implements
// the Agent will wait for 1 minute (default) before killing
// this process via kill -9
try {
+ // To support investment reset we need to store transaction
+ // object under a known key. This key is stored in the CAS.
+ // In order to get to it, we need to deserialize the CAS
+ // in the user container. When an asynchronous investment
+ // reset call is made from the user code, it will contain
+ // that key to allow us to look up original transaction so that
+ // we can send reset request to the JD.
+ String key = (String)
+ getKeyMethod.invoke(processorInstance, transaction.getMetaCas().getUserSpaceCas());
+ if ( key != null ) {
+ // add transaction under the key extracted from the CAS
+ transactionMap.put(key, transaction);
+ }
+
// ********** PROCESS() **************
- // using java reflection, call process to analyze the CAS
+ // using java reflection, call process to analyze the CAS. While
+ // we are blocking, user code may issue investment reset asynchronously.
List<Properties> metrics = (List<Properties>)processMethod.
invoke(processorInstance, transaction.getMetaCas().getUserSpaceCas());
// ***********************************
-
+ if ( key != null ) {
+ // process ended we no longer expect investment reset from user
+ // so remove transaction from the map
+ transactionMap.remove(key);
+ }
+
logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed");
IPerformanceMetrics metricsWrapper =
new PerformanceMetrics();
metricsWrapper.set(metrics);
-
transaction.getMetaCas().setPerformanceMetrics(metricsWrapper);
} catch( InvocationTargetException ee) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Tue Mar 3 19:47:51 2015
@@ -20,7 +20,10 @@
package org.apache.uima.ducc.transport.configuration.jp;
import java.lang.reflect.Method;
+import java.net.SocketTimeoutException;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -29,11 +32,15 @@ import java.util.concurrent.ScheduledThr
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.uima.ducc.common.component.AbstractDuccComponent;
import org.apache.uima.ducc.common.component.IJobProcessor;
import org.apache.uima.ducc.common.container.FlagsHelper;
import org.apache.uima.ducc.common.main.DuccService;
import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
public class JobProcessComponent extends AbstractDuccComponent
@@ -56,10 +63,12 @@ implements IJobProcessor{
ScheduledThreadPoolExecutor executor = null;
ExecutorService tpe = null;
private volatile boolean uimaASJob=false;
+ Map<String, IMetaCasTransaction> transactionMap =
+ new ConcurrentHashMap<String, IMetaCasTransaction>();
+
private DuccHttpClient httpClient = null;
private Object processorInstance=null;
-// private String[] args = null;
public JobProcessComponent(String componentName, CamelContext ctx,JobProcessConfiguration jpc) {
super(componentName,ctx);
this.configuration = jpc;
@@ -115,6 +124,37 @@ implements IJobProcessor{
public int getTimeout() {
return this.timeout;
}
+
+ /**
+  * Sends an Investment Reset request to the JD for the transaction stored
+  * under the given key. Does nothing when there is no http client, the key
+  * is null, or no transaction is currently mapped to the key.
+  *
+  * @param key - user key under which a worker thread stored the transaction
+  * @throws RuntimeException if the request fails for any reason other than
+  *         a socket timeout (timeouts are retried while the component runs)
+  * @throws Exception declared for callers invoking this method reflectively
+  */
+ public void resetInvestment(String key) throws Exception {
+   if ( httpClient == null || key == null ) {
+     return;
+   }
+   // Single lookup: a worker thread removes the entry as soon as process()
+   // returns, so a containsKey()/get() pair could observe the key and then
+   // read null.
+   IMetaCasTransaction transaction = transactionMap.get(key);
+   if ( transaction == null ) {
+     return;
+   }
+   PostMethod postMethod = new PostMethod(httpClient.getJdUrl());
+   // Dont return serialized CAS to reduce the msg size
+   transaction.getMetaCas().setUserSpaceCas(null);
+   transaction.setType(Type.InvestmentReset);
+
+   // Set request timeout
+   postMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, getTimeout());
+   // Retry on timeouts; any other failure is reported as a RuntimeException.
+   // Don't rethrow the original Exception as it may contain classes that are
+   // not loaded into the user container from which this call originated.
+   while( isRunning() ) {
+     try {
+       logger.info("resetInvestment", null, "User Requested Investment Reset - sending request to JD - WI:"+transaction.getMetaCas().getSystemKey()+" user key:"+key);
+       httpClient.execute(transaction, postMethod);
+       break;
+     } catch(SocketTimeoutException e) {
+       logger.info("resetInvestment", null, "Timeout while waiting for Investment Reset response from JD - retrying - WI:"+transaction.getMetaCas().getSystemKey());
+     } catch(Exception e) {
+       logger.info("resetInvestment", null, "Error while trying send Investment Reset request to JD. Returning to the caller (no retries) WI:"+transaction.getMetaCas().getSystemKey());
+       logger.info("resetInvestment", null, e);
+       // getCause() is null when the exception wraps nothing; fall back to
+       // the exception itself so the real failure is not masked by an NPE.
+       Throwable reason = e.getCause() != null ? e.getCause() : e;
+       throw new RuntimeException("Unable to deliver Investment Reset request to JD due to "+reason.getMessage());
+     }
+   }
+ }
/**
* This method is called by super during ducc framework boot
* sequence. It creates all the internal components and worker threads
@@ -123,12 +163,17 @@ implements IJobProcessor{
*/
public void start(DuccService service, String[] args) throws Exception {
super.start(service, args);
-
+
try {
if ( args == null || args.length ==0 || args[0] == null || args[0].trim().length() == 0) {
logger.warn("start", null, "Missing Deployment Descriptor - the JP Requires argument. Add DD for UIMA-AS job or AE descriptor for UIMA jobs");
throw new RuntimeException("Missing Deployment Descriptor - the JP Requires argument. Add DD for UIMA-AS job or AE descriptor for UIMA jobs");
}
+ // this class implements resetInvestment method
+ Method m = this.getClass().getDeclaredMethod("resetInvestment", String.class);
+ // register this class and its method to handle investment reset
+ service.registerInvestmentResetCallback(this, m);
+
String processJmxUrl = super.getProcessJmxUrl();
// tell the agent that this process is initializing
agent.notify(ProcessState.Initializing, processJmxUrl);
@@ -210,7 +255,7 @@ implements IJobProcessor{
// Create and start worker threads that pull Work Items from the JD
Future<?>[] threadHandles = new Future<?>[scaleout];
for (int j = 0; j < scaleout; j++) {
- threadHandles[j] = tpe.submit(new HttpWorkerThread(this, httpClient, processorInstance, workerThreadCount, threadReadyCount));
+ threadHandles[j] = tpe.submit(new HttpWorkerThread(this, httpClient, processorInstance, workerThreadCount, threadReadyCount, transactionMap));
}
// wait until all process threads initialize
threadReadyCount.await();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceComponent.java Tue Mar 3 19:47:51 2015
@@ -247,4 +247,9 @@ public class ServiceComponent extends Ab
}
}
+ public void resetInvestment(String key) throws Exception {
+ // Empty body: this component takes no action on an investment reset request.
+
+ }
+
}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java?rev=1663769&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java Tue Mar 3 19:47:51 2015
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.user.common.investment;
+
+import java.lang.reflect.Method;
+
+public class Investment {
+ static Object instance;
+ static Method method;
+ /**
+ * This is called by Ducc framework JobProcessComponent to register a method
+ * to call when a user wants to reset investment.
+ *
+ * @param Object o - instance of JobProcessComponent
+ * @param Method m - JobProcessComponent method to call to reset
+ * investment
+ *
+ */
+ public void setJobComponent( Object o, Method m) {
+ instance = o;
+ method = m;
+ }
+ /**
+ * This method is called from the user code to request investment reset.
+ *
+ * @param key - unique key identifying a Work Item. This comes from
+ * a CAS.
+ * @throws Exception
+ */
+ public static void reset(String key) throws Exception {
+ method.invoke(instance, key);
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/investment/Investment.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java Tue Mar 3 19:47:51 2015
@@ -25,6 +25,7 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import org.apache.uima.ducc.user.common.investment.Investment;
import org.apache.uima.ducc.user.jp.iface.IProcessContainer;
/**
@@ -33,6 +34,8 @@ import org.apache.uima.ducc.user.jp.ifac
*/
public class DuccJobService {
boolean DEBUG = false;
+ private Investment investment = null;
+
public static URLClassLoader create(String classPath)
throws MalformedURLException {
return create(classPath.split(":"));
@@ -91,7 +94,8 @@ public class DuccJobService {
System.setProperty("ducc.user.log4j.saved.configuration",log4jConfigurationFile);
System.getProperties().remove("log4j.configuration");
}
-
+ investment = new Investment();
+
// cache current context classloader
ClassLoader sysCL = Thread.currentThread().getContextClassLoader();
// Fetch a classpath for the fenced Ducc container
@@ -114,7 +118,11 @@ public class DuccJobService {
// initialize Ducc fenced container. It calls component's Configuration class
Method bootMethod = classToLaunch.getMethod("boot", String[].class);
- bootMethod.invoke(duccContainerInstance, (Object) args);
+ try {
+ bootMethod.invoke(duccContainerInstance, (Object) args);
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
// below property is set by component's Configuration class. It can also
// be provided on the command line in case a custom processor is needed.
@@ -139,6 +147,11 @@ public class DuccJobService {
Method setProcessorMethod = classToLaunch.getMethod("setProcessor",
Object.class, String[].class);
setProcessorMethod.invoke(duccContainerInstance, pc, args);
+
+ Method registerInvestmentInstanceMethod = classToLaunch.getMethod("registerInvestmentInstance",
+ Object.class);
+ registerInvestmentInstanceMethod.invoke(duccContainerInstance, investment);
+
// Call DuccService.start() to initialize the process
// and begin processing
Method startMethod = classToLaunch.getMethod("start");// ,
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/DuccAbstractProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/DuccAbstractProcessContainer.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/DuccAbstractProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/DuccAbstractProcessContainer.java Tue Mar 3 19:47:51 2015
@@ -25,8 +25,17 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.ducc.CasHelper;
import org.apache.uima.ducc.user.common.DuccUimaSerializer;
import org.apache.uima.ducc.user.jp.iface.IProcessContainer;
+import org.apache.uima.resource.metadata.FsIndexDescription;
+import org.apache.uima.resource.metadata.TypePriorities;
+import org.apache.uima.resource.metadata.TypeSystemDescription;
+import org.apache.uima.util.CasCreationUtils;
public abstract class DuccAbstractProcessContainer implements IProcessContainer{
// Container implementation must implement the following methods
@@ -34,6 +43,7 @@ public abstract class DuccAbstractProces
protected abstract int doInitialize(Properties p, String[] arg) throws Exception;
protected abstract void doStop() throws Exception;
protected abstract List<Properties> doProcess(Object subject) throws Exception;
+ protected AnalysisEngineMetaData analysisEngineMetadata;
protected Throwable lastError = null;
protected int scaleout=1;
@@ -41,6 +51,35 @@ public abstract class DuccAbstractProces
protected static Map<Long, DuccUimaSerializer> serializerMap =
new HashMap<Long, DuccUimaSerializer>();
+ /**
+  * This method is called to fetch a WorkItem ID from a given CAS which
+  * is required to support investment reset. A temporary CAS is created from
+  * the analysis engine metadata, the XMI is deserialized into it, and the
+  * CAS is released before returning.
+  *
+  * @param xmi - CAS serialized as XMI
+  * @return the id returned by CasHelper.getId(), or null when no analysis
+  *         engine metadata is available
+  * @throws Exception if the CAS cannot be created or deserialized
+  */
+ public String getKey(String xmi) throws Exception {
+   if ( analysisEngineMetadata == null ) {
+     // WorkItem ID (key) is only supported for pieces 'n parts
+     return null;
+   }
+   Properties props = new Properties();
+   props.setProperty(UIMAFramework.CAS_INITIAL_HEAP_SIZE, "1000");
+
+   TypeSystemDescription tsd = analysisEngineMetadata.getTypeSystem();
+   TypePriorities tp = analysisEngineMetadata.getTypePriorities();
+   FsIndexDescription[] fsid = analysisEngineMetadata.getFsIndexes();
+   CAS cas;
+   synchronized( CasCreationUtils.class) {
+     cas = CasCreationUtils.createCas(tsd, tp, fsid, props);
+   }
+   try {
+     // deserialize the CAS
+     XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+     getUimaSerializer().
+       deserializeCasFromXmi(xmi, cas, deserSharedData, true,-1);
+
+     return CasHelper.getId(cas);
+   } finally {
+     // Release the temporary CAS even when deserialization or the id
+     // lookup throws, so it is not leaked on the error path.
+     cas.release();
+   }
+ }
public int getScaleout( ){
return scaleout;
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java Tue Mar 3 19:47:51 2015
@@ -58,7 +58,6 @@ public class UimaProcessContainer extend
// sure that a thread used to initialized the AE is used to call process().
// Some AEs depend on ThreadLocal storage.
UimaAnalysisEngineInstancePoolWithThreadAffinity instanceMap = new UimaAnalysisEngineInstancePoolWithThreadAffinity();
- AnalysisEngineMetaData analysisEngineMetadata;
private static CasPool casPool = null;
AtomicInteger counter = new AtomicInteger();
@@ -147,6 +146,9 @@ public class UimaProcessContainer extend
instanceMap.checkin(ae);
if (instanceMap.size() == scaleout) {
try {
+ Properties props = new Properties();
+ props.setProperty(UIMAFramework.CAS_INITIAL_HEAP_SIZE, "1000");
+
analysisEngineMetadata = ae.getAnalysisEngineMetaData();
casPool = new CasPool(scaleout, analysisEngineMetadata,rm);
latch.countDown();
@@ -189,8 +191,6 @@ public class UimaProcessContainer extend
lastError = null;
XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
// deserialize the CAS
-// uimaSerializer.deserializeCasFromXmi((String) xmi, cas,
-// deserSharedData, true, -1);
super.getUimaSerializer().
deserializeCasFromXmi((String)xmi, cas, deserSharedData, true,-1);
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java?rev=1663769&r1=1663768&r2=1663769&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/iface/IProcessContainer.java Tue Mar 3 19:47:51 2015
@@ -70,4 +70,12 @@ public interface IProcessContainer {
* @return
*/
public boolean useThreadAffinity();
+
+ /**
+ * Returns the key associated with the given cargo.
+ * NOTE(review): DuccAbstractProcessContainer implements this by deserializing
+ * the cargo as an XMI CAS and returning the Work Item id found in it - confirm
+ * other implementations follow the same contract.
+ *
+ * @param cargo - serialized CAS as received by the worker thread
+ * @return key for the cargo; may be null (the worker thread treats null as
+ * "no key" and skips investment reset bookkeeping)
+ * @throws Exception if the key cannot be extracted
+ */
+ public String getKey(String cargo) throws Exception;
+
}