You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/05/18 11:20:04 UTC

svn commit: r1831853 - /uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java

Author: cwiklik
Date: Fri May 18 11:20:04 2018
New Revision: 1831853

URL: http://svn.apache.org/viewvc?rev=1831853&view=rev
Log:
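UIMA-5756 Modified UimaServiceProcessor to improve service state change updates: the Initializing state is now sent once during AE initialization, and the Running state is sent once all scaleout threads have finished initializing (instead of on the first call to process()).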

Modified:
    uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java?rev=1831853&r1=1831852&r2=1831853&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java Fri May 18 11:20:04 2018
@@ -23,20 +23,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+//import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.analysis_engine.AnalysisEngine;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.cas.CAS;
-import org.apache.uima.ducc.ps.service.IScaleable;
 import org.apache.uima.ducc.ps.service.IServiceState;
 import org.apache.uima.ducc.ps.service.ServiceConfiguration;
+//import org.apache.uima.ducc.ps.service.dgen.DeployableGeneration;
 import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
 import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
-import org.apache.uima.ducc.ps.service.jmx.JmxAEProcessInitMonitor;
+//import org.apache.uima.ducc.ps.service.jmx.JmxAEProcessInitMonitor;
 import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor;
 import org.apache.uima.ducc.ps.service.monitor.builtin.RemoteStateObserver;
 import org.apache.uima.ducc.ps.service.processor.IProcessResult;
@@ -56,7 +56,7 @@ import org.apache.uima.util.Level;
 import org.apache.uima.util.Logger;
 import org.apache.uima.util.XMLInputSource;
 
-public class UimaServiceProcessor implements IServiceProcessor, IScaleable {
+public class UimaServiceProcessor implements IServiceProcessor { 
 	public static final String IMPORT_BY_NAME_PREFIX = "*importByName:";
 	Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class);
    // Map to store DuccUimaSerializer instances. Each has affinity to a thread
@@ -66,20 +66,20 @@ public class UimaServiceProcessor implem
 	// stores AE instance pinned to a thread
 	private ThreadLocal<AnalysisEngine> threadLocal = 
 			new ThreadLocal<> ();
-    private ReentrantLock initStateShutdownLock = new ReentrantLock();
+    private ReentrantLock initStateLock = new ReentrantLock();
+    private boolean sendInitializingState = true;
 	private ResourceManager rm = 
 			UIMAFramework.newDefaultResourceManager();;
     private CasPool casPool = null;
 	private int scaleout=1;
-	private JmxAEProcessInitMonitor initStateMonitor;
     private String analysisEngineDescriptor;
     private AnalysisEngineMetaData analysisEngineMetadata;
 	// Platform MBean server if one is available (Java 1.5 only)
 	private static Object platformMBeanServer;
 	private ServiceConfiguration serviceConfiguration;
-	private ScheduledThreadPoolExecutor executor = null;
 	private IServiceMonitor monitor;
-	private volatile boolean notifyOnRunning = true;
+	private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
+	
 	static {
 		// try to get platform MBean Server (Java 1.5 only)
 		try {
@@ -108,20 +108,6 @@ public class UimaServiceProcessor implem
 	private void launchStateInitializationCollector() {
 		monitor =
 				new RemoteStateObserver(serviceConfiguration, logger);
-		
-//		executor = new ScheduledThreadPoolExecutor(1);
-//		executor.prestartAllCoreThreads();
-		// Instantiate a UIMA AS jmx monitor to poll for status of the AE.
-		// This monitor checks if the AE is initializing or ready.
-//		initStateMonitor = 
-//				new JmxAEProcessInitMonitor(monitor, logger);
-		/*
-		 * This will run UimaAEJmxMonitor every 30
-		 * seconds with an initial delay of 20 seconds. This monitor polls
-		 * initialization status of AE.
-		 */
-//		executor.scheduleAtFixedRate(initStateMonitor, 20, 30, TimeUnit.SECONDS);
-		monitor.onStateChange(IServiceState.State.Initializing.toString(), new Properties());
 	}
 	public void setScaleout(int howManyThreads) {
 		this.scaleout = howManyThreads;
@@ -136,7 +122,20 @@ public class UimaServiceProcessor implem
 			logger.log(Level.FINE, "Process Thread:"+ Thread.currentThread().getName()+" Initializing AE");
 			
 		}
-		
+		try {
+			// multiple threads may call this method. Send initializing state once
+			initStateLock.lockInterruptibly();
+			if ( sendInitializingState ) {
+				sendInitializingState = false; // send init state once
+				monitor.onStateChange(IServiceState.State.Initializing.toString(), new Properties());
+			}
+		} catch( Exception e) {
+			
+		} finally {
+			initStateLock.unlock();
+		}
+
+
 	    HashMap<String,Object> paramsMap = new HashMap<>();
         paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, rm);
 	    paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, platformMBeanServer);
@@ -147,7 +146,7 @@ public class UimaServiceProcessor implem
 					UimaUtils.getXMLInputSource(analysisEngineDescriptor);
 			String aed = is.getURL().toString();
 			ResourceSpecifier rSpecifier =
-			    UimaUtils.getResourceSpecifier(aed); //analysisEngineDescriptor);
+			    UimaUtils.getResourceSpecifier(aed);
 			
 			AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier,
 					paramsMap);
@@ -172,6 +171,9 @@ public class UimaServiceProcessor implem
 			logger.log(Level.INFO, "Process Thread:"+ Thread.currentThread().getName()+" Done Initializing AE");
 			
 		}
+		if ( numberOfInitializedThreads.incrementAndGet() == scaleout ) {
+			monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
+		}
 	}
 
 	private void initializeCasPool(AnalysisEngineMetaData aeMeta) throws ResourceInitializationException {
@@ -189,37 +191,7 @@ public class UimaServiceProcessor implem
 	@Override
 	public IProcessResult process(String serializedTask) {
 		AnalysisEngine ae = null;
-		// Dont publish AE initialization state. We are in running state if
-		// process is being called
-		try {
-			initStateShutdownLock.lockInterruptibly();
-			if ( notifyOnRunning ) {
-				notifyOnRunning = false;
-				monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
-			}
 
-		} catch( InterruptedException e) {
-			Thread.currentThread().interrupt();
-		} finally {
-			initStateShutdownLock.unlock();
-		}
-		/*
-		try {
-			initStateShutdownLock.lockInterruptibly();
-			if ( !executor.isTerminating() && !executor.isTerminated() && !executor.isShutdown() ) {
-				// send final AE initialization report before we stop the collecting thread
-				initStateMonitor.updateAgentWhenRunning();
-				executor.shutdown();
-				executor.awaitTermination(0, TimeUnit.SECONDS);
-		//monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
-			}
-
-		} catch( InterruptedException e) {
-			Thread.currentThread().interrupt();
-		} finally {
-			initStateShutdownLock.unlock();
-		}
-*/		
 		CAS cas = casPool.getCas();
 		IProcessResult result;
 		
@@ -287,6 +259,28 @@ public class UimaServiceProcessor implem
 			logger.log(Level.WARNING, "stop", e);
 		} 
 	}
-	
-
+	/*
+	  // Build just an AE from parts and return the filename
+	  // (DD's are converted in UimaAsProcessContainer.parseDD)
+	  protected String buildDeployable(ServiceConfiguration serviceConfiguration) {
+		  try {
+			  String jpType = serviceConfiguration.getJpType();//getPropertyString("ducc.deploy.JpType");
+			  if(jpType == null) {
+				  jpType = "uima";
+			  }
+			  if(jpType.equalsIgnoreCase("uima-as")) {
+				  logger.log(Level.WARNING,"ERROR - should not be called for type="+jpType);
+			  }
+			  else {
+				  DeployableGeneration dg = new DeployableGeneration(serviceConfiguration);
+				  return dg.generate(true);
+			  }
+		  }
+		  catch(Exception e) {
+			  e.printStackTrace();
+			  logger.log(Level.WARNING,"buildDeployable",e);
+		  }
+		  return null;
+	  }
+*/
 }