You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/05/18 11:20:04 UTC
svn commit: r1831853 -
/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
Author: cwiklik
Date: Fri May 18 11:20:04 2018
New Revision: 1831853
URL: http://svn.apache.org/viewvc?rev=1831853&view=rev
Log:
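UIMA-5756: Modified UimaServiceProcessor to improve service state change updates. The Initializing state is now sent once, by the first process thread that starts initializing its AE, and the Running state is sent once the number of initialized threads equals the configured scaleout (previously Running was sent on the first call to process()).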
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java?rev=1831853&r1=1831852&r2=1831853&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java Fri May 18 11:20:04 2018
@@ -23,20 +23,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+//import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.uima.UIMAFramework;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
-import org.apache.uima.ducc.ps.service.IScaleable;
import org.apache.uima.ducc.ps.service.IServiceState;
import org.apache.uima.ducc.ps.service.ServiceConfiguration;
+//import org.apache.uima.ducc.ps.service.dgen.DeployableGeneration;
import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
-import org.apache.uima.ducc.ps.service.jmx.JmxAEProcessInitMonitor;
+//import org.apache.uima.ducc.ps.service.jmx.JmxAEProcessInitMonitor;
import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor;
import org.apache.uima.ducc.ps.service.monitor.builtin.RemoteStateObserver;
import org.apache.uima.ducc.ps.service.processor.IProcessResult;
@@ -56,7 +56,7 @@ import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
import org.apache.uima.util.XMLInputSource;
-public class UimaServiceProcessor implements IServiceProcessor, IScaleable {
+public class UimaServiceProcessor implements IServiceProcessor {
public static final String IMPORT_BY_NAME_PREFIX = "*importByName:";
Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class);
// Map to store DuccUimaSerializer instances. Each has affinity to a thread
@@ -66,20 +66,20 @@ public class UimaServiceProcessor implem
// stores AE instance pinned to a thread
private ThreadLocal<AnalysisEngine> threadLocal =
new ThreadLocal<> ();
- private ReentrantLock initStateShutdownLock = new ReentrantLock();
+ private ReentrantLock initStateLock = new ReentrantLock();
+ private boolean sendInitializingState = true;
private ResourceManager rm =
UIMAFramework.newDefaultResourceManager();;
private CasPool casPool = null;
private int scaleout=1;
- private JmxAEProcessInitMonitor initStateMonitor;
private String analysisEngineDescriptor;
private AnalysisEngineMetaData analysisEngineMetadata;
// Platform MBean server if one is available (Java 1.5 only)
private static Object platformMBeanServer;
private ServiceConfiguration serviceConfiguration;
- private ScheduledThreadPoolExecutor executor = null;
private IServiceMonitor monitor;
- private volatile boolean notifyOnRunning = true;
+ private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
+
static {
// try to get platform MBean Server (Java 1.5 only)
try {
@@ -108,20 +108,6 @@ public class UimaServiceProcessor implem
private void launchStateInitializationCollector() {
monitor =
new RemoteStateObserver(serviceConfiguration, logger);
-
-// executor = new ScheduledThreadPoolExecutor(1);
-// executor.prestartAllCoreThreads();
- // Instantiate a UIMA AS jmx monitor to poll for status of the AE.
- // This monitor checks if the AE is initializing or ready.
-// initStateMonitor =
-// new JmxAEProcessInitMonitor(monitor, logger);
- /*
- * This will run UimaAEJmxMonitor every 30
- * seconds with an initial delay of 20 seconds. This monitor polls
- * initialization status of AE.
- */
-// executor.scheduleAtFixedRate(initStateMonitor, 20, 30, TimeUnit.SECONDS);
- monitor.onStateChange(IServiceState.State.Initializing.toString(), new Properties());
}
public void setScaleout(int howManyThreads) {
this.scaleout = howManyThreads;
@@ -136,7 +122,20 @@ public class UimaServiceProcessor implem
logger.log(Level.FINE, "Process Thread:"+ Thread.currentThread().getName()+" Initializing AE");
}
-
+ try {
+ // multiple threads may call this method. Send initializing state once
+ initStateLock.lockInterruptibly();
+ if ( sendInitializingState ) {
+ sendInitializingState = false; // send init state once
+ monitor.onStateChange(IServiceState.State.Initializing.toString(), new Properties());
+ }
+ } catch( Exception e) {
+
+ } finally {
+ initStateLock.unlock();
+ }
+
+
HashMap<String,Object> paramsMap = new HashMap<>();
paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, rm);
paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, platformMBeanServer);
@@ -147,7 +146,7 @@ public class UimaServiceProcessor implem
UimaUtils.getXMLInputSource(analysisEngineDescriptor);
String aed = is.getURL().toString();
ResourceSpecifier rSpecifier =
- UimaUtils.getResourceSpecifier(aed); //analysisEngineDescriptor);
+ UimaUtils.getResourceSpecifier(aed);
AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier,
paramsMap);
@@ -172,6 +171,9 @@ public class UimaServiceProcessor implem
logger.log(Level.INFO, "Process Thread:"+ Thread.currentThread().getName()+" Done Initializing AE");
}
+ if ( numberOfInitializedThreads.incrementAndGet() == scaleout ) {
+ monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
+ }
}
private void initializeCasPool(AnalysisEngineMetaData aeMeta) throws ResourceInitializationException {
@@ -189,37 +191,7 @@ public class UimaServiceProcessor implem
@Override
public IProcessResult process(String serializedTask) {
AnalysisEngine ae = null;
- // Dont publish AE initialization state. We are in running state if
- // process is being called
- try {
- initStateShutdownLock.lockInterruptibly();
- if ( notifyOnRunning ) {
- notifyOnRunning = false;
- monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
- }
- } catch( InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- initStateShutdownLock.unlock();
- }
- /*
- try {
- initStateShutdownLock.lockInterruptibly();
- if ( !executor.isTerminating() && !executor.isTerminated() && !executor.isShutdown() ) {
- // send final AE initialization report before we stop the collecting thread
- initStateMonitor.updateAgentWhenRunning();
- executor.shutdown();
- executor.awaitTermination(0, TimeUnit.SECONDS);
- //monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
- }
-
- } catch( InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- initStateShutdownLock.unlock();
- }
-*/
CAS cas = casPool.getCas();
IProcessResult result;
@@ -287,6 +259,28 @@ public class UimaServiceProcessor implem
logger.log(Level.WARNING, "stop", e);
}
}
-
-
+ /*
+ // Build just an AE from parts and return the filename
+ // (DD's are converted in UimaAsProcessContainer.parseDD)
+ protected String buildDeployable(ServiceConfiguration serviceConfiguration) {
+ try {
+ String jpType = serviceConfiguration.getJpType();//getPropertyString("ducc.deploy.JpType");
+ if(jpType == null) {
+ jpType = "uima";
+ }
+ if(jpType.equalsIgnoreCase("uima-as")) {
+ logger.log(Level.WARNING,"ERROR - should not be called for type="+jpType);
+ }
+ else {
+ DeployableGeneration dg = new DeployableGeneration(serviceConfiguration);
+ return dg.generate(true);
+ }
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ logger.log(Level.WARNING,"buildDeployable",e);
+ }
+ return null;
+ }
+*/
}