You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2015/06/05 21:06:58 UTC
svn commit: r1683847 - in /uima/uima-as/trunk:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-core/src/main/java/org/apache/uima/aae/
uimaj-as-core/src/main/java/org/apache/uima/aae/controller/
uimaj-as-core/src/main/jav...
Author: cwiklik
Date: Fri Jun 5 19:06:57 2015
New Revision: 1683847
URL: http://svn.apache.org/r1683847
Log:
UIMA-4265 Implemented support for warming up the pipeline right after the initialization and before opening jms listeners
Added:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java (with props)
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Jun 5 19:06:57 2015
@@ -547,7 +547,9 @@ public class UimaDefaultMessageListenerC
failed = true;
}
}
-
+ public Endpoint getEndpoint() {
+ return endpoint;
+ }
private void terminate(Throwable t) {
// ****************************************
// terminate the service
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Fri Jun 5 19:06:57 2015
@@ -525,7 +525,12 @@ public class InProcessCache implements I
public static class CacheEntry {
public static final int FINAL_STATE = 1;
- private CAS cas;
+ private volatile boolean warmUp = false;
+
+ private volatile boolean failed = false;;
+
+
+ private CAS cas;
// the following is set to true if the CAS has been created by CAS Multiplier
// This flag is used to determine if the CAS should be output to client.
@@ -615,10 +620,25 @@ public class InProcessCache implements I
public Semaphore getThreadCompletionSemaphore() {
return threadCompletionSemaphore;
}
+ public boolean isFailed() {
+ return failed;
+ }
+
+ public void setFailed(boolean failed) {
+ this.failed = failed;
+ Thread.dumpStack();
+ }
public void setThreadCompletionSemaphore(Semaphore threadCompletionSemaphore) {
this.threadCompletionSemaphore = threadCompletionSemaphore;
}
+ public boolean isWarmUp() {
+ return warmUp;
+ }
+
+ public void setWarmUp(boolean warmUp) {
+ this.warmUp = warmUp;
+ }
// never called 5/2013 was for XCAS
// protected CacheEntry(CAS aCas, String aCasReferenceId, MessageContext aMessageAccessor,
// OutOfTypeSystemData aotsd) {
Added: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java?rev=1683847&view=auto
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java (added)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java Fri Jun 5 19:06:57 2015
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.TypeSystem;
+import org.apache.uima.cas.impl.Serialization;
+import org.apache.uima.cas.impl.XmiCasDeserializer;
+import org.apache.uima.resource.metadata.TypeSystemDescription;
+import org.apache.uima.util.CasCreationUtils;
+import org.apache.uima.util.XMLInputSource;
+import org.apache.uima.util.XMLParser;
+
+public class WarmUpDataProvider {
+ InputChannel inputChannel;
+ AnalysisEngineController controller;
+ private String inputFileName;
+ private FileInputStream fis;
+ private ZipInputStream zis;
+ private ZipEntry nextEntry;
+ private int docSeq;
+ private boolean readingXmiFormat;
+ private TypeSystem inputTS;
+
+ public static void main(String[] args) {
+
+ }
+
+ public WarmUpDataProvider(String inputFileName)
+ throws IOException {
+ this.inputFileName = inputFileName;
+ fis = new FileInputStream(new File(inputFileName));
+ zis = new ZipInputStream(new BufferedInputStream(fis, 1024 * 100));
+ docSeq = 0;
+ }
+
+ public boolean hasNext() throws AnalysisEngineProcessException {
+ try {
+ nextEntry = zis.getNextEntry();
+ } catch (IOException e) {
+ throw new AnalysisEngineProcessException(e);
+ }
+ return (nextEntry != null) ? true : false;
+ }
+
+ public CAS next(CAS cas) throws Exception {
+ if (0 == docSeq) {
+ if (nextEntry.getName().equals("typesystem.xml")) {
+ getTypesystem();
+ readingXmiFormat = false;
+ } else {
+ readingXmiFormat = true;
+ }
+ } else {
+ if (nextEntry.getName().equals("typesystem.xml")) {
+ throw new AnalysisEngineProcessException(new RuntimeException(
+ "typesystem.xml entry found in the middle of input zipfile "
+ + inputFileName));
+ }
+ }
+ byte[] buff = new byte[10000];
+ int bytesread;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ while (-1 != (bytesread = zis.read(buff))) {
+ baos.write(buff, 0, bytesread);
+ }
+ ByteArrayInputStream bis = new ByteArrayInputStream(
+ baos.toByteArray());
+ if (readingXmiFormat) {
+ XmiCasDeserializer.deserialize(bis, cas, true);
+ } else {
+ Serialization.deserializeCAS(cas, bis, inputTS, null);
+ }
+ } catch (Exception e) {
+ throw new AnalysisEngineProcessException(e);
+ }
+ docSeq++;
+ return cas;
+ }
+
+ private void getTypesystem() throws AnalysisEngineProcessException {
+ byte[] buff = new byte[10000];
+ int bytesread;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ while (-1 != (bytesread = zis.read(buff))) {
+ baos.write(buff, 0, bytesread);
+ }
+ ByteArrayInputStream bis = new ByteArrayInputStream(
+ baos.toByteArray());
+ // Get XML parser from framework
+ XMLParser xmlParser = UIMAFramework.getXMLParser();
+ // Parse type system descriptor
+ TypeSystemDescription tsDesc = xmlParser
+ .parseTypeSystemDescription(new XMLInputSource(
+ (InputStream) bis, null));
+ // Use type system description to create CAS and get the type system
+ // object
+ inputTS = CasCreationUtils.createCas(tsDesc, null, null)
+ .getTypeSystem();
+ // advance to first input CAS
+ nextEntry = zis.getNextEntry();
+ } catch (Exception e) {
+ throw new AnalysisEngineProcessException(e);
+ }
+ }
+
+}
Propchange: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Fri Jun 5 19:06:57 2015
@@ -30,7 +30,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.uima.UIMAFramework;
import org.apache.uima.UIMARuntimeException;
@@ -39,8 +42,10 @@ import org.apache.uima.aae.InProcessCach
import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.AsynchAECasManager_impl;
import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.UIDGenerator;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.WarmUpDataProvider;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.delegate.ControllerDelegate;
import org.apache.uima.aae.delegate.Delegate;
@@ -174,6 +179,8 @@ public class AggregateAnalysisEngineCont
// for a free CAS from the CasPool.
public Semaphore semaphore = null;
+ private Lock mergeLock = new ReentrantLock();
+
/**
*
* @param anEndpointName
@@ -1019,7 +1026,7 @@ public class AggregateAnalysisEngineCont
// CASes will be dropped in finalStep() as they come back from delegates. When all are
// accounted for and dropped, the parent CAS will be returned back to the client
// with an exception.
- if (parentCasStateEntry.isFailed()) {
+ if (parentCasStateEntry != null && parentCasStateEntry.isFailed()) {
// Fetch Delegate object for the CM that produced the CAS. The producer key
// is associated with a cache entry in the ProcessRequestHandler. Each new CAS
// must have a key of a CM that produced it.
@@ -1090,7 +1097,6 @@ public class AggregateAnalysisEngineCont
return false;
}
-
/**
* This is a process method that is executed for CASes not created by a Multiplier in this
* aggregate.
@@ -1703,7 +1709,6 @@ public class AggregateAnalysisEngineCont
}
return;
}
-
// Found entries in caches for a given CAS id
try {
endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
@@ -1715,8 +1720,6 @@ public class AggregateAnalysisEngineCont
return;
}
- // System.out.println(" ---- Controller::"+getComponentName()+" CAS:"+casStateEntry.getCasReferenceId()+ " Has Children:"+casHasChildrenInPlay(casStateEntry)+" Parent::"+casStateEntry.getInputCasReferenceId());
-
// Check if this CAS has children that are still being processed in this aggregate
if (casHasChildrenInPlay(casStateEntry)) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -1734,6 +1737,22 @@ public class AggregateAnalysisEngineCont
casStateEntry.waitingForChildrenToFinish(true);
return;
}
+
+ // check if this is a warm up CAS. Such CAS is internally created with
+ // a main purpose of warming up analytics.
+ if ( isTopLevelComponent() && cacheEntry.isWarmUp()) {
+ if ( cacheEntry.getThreadCompletionSemaphore() != null ) {
+ cacheEntry.getThreadCompletionSemaphore().release();
+ }
+ // we are in the warm up state which means the pipelines have been initialized
+ // and we are sending CASes to warm up/prime the analytics. Since we
+ // reached the end of the flow here, just return now. There is nothing
+ // else to do with the CAS.
+ return;
+
+ }
+
+
casStateEntry.waitingForChildrenToFinish(false);
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
@@ -1792,7 +1811,8 @@ public class AggregateAnalysisEngineCont
// Send a reply to the Client. If the CAS is an input CAS it will be dropped
cEndpoint = replyToClient(cacheEntry, casStateEntry);
replySentToClient = true;
- if (cEndpoint.isRemote()) {
+
+ if (cEndpoint != null && cEndpoint.isRemote()) {
// if this service is a Cas Multiplier don't remove the CAS. It will be removed
// when a remote client sends explicit Release CAS Request
if (!isCasMultiplier()) {
@@ -2664,9 +2684,10 @@ public class AggregateAnalysisEngineCont
mergeTypeSystem(aTypeSystem, fromDestination, null);
}
- public synchronized void mergeTypeSystem(String aTypeSystem, String fromDestination,
+// public synchronized void mergeTypeSystem(String aTypeSystem, String fromDestination,
+ public void mergeTypeSystem(String aTypeSystem, String fromDestination,
String fromServer) throws AsynchAEException {
-
+ mergeLock.lock();
try {
// Find the endpoint for this service, given its input queue name and broker URI.
// We now allow endpoints managed by different servers to have the same queue name.
@@ -2792,7 +2813,10 @@ public class AggregateAnalysisEngineCont
}
} catch (Exception e) {
throw new AsynchAEException(e);
+ } finally {
+ mergeLock.unlock();
}
+
}
public static TypeSystemImpl getTypeSystemImpl(ProcessingResourceMetaData resource) throws ResourceInitializationException {
@@ -2809,7 +2833,7 @@ public class AggregateAnalysisEngineCont
return ((CASImpl) casMgr).getTypeSystemImpl();
}
- private synchronized void completeInitialization() throws Exception {
+ private void completeInitialization() throws Exception {
if (initialized) {
return;
}
@@ -2856,22 +2880,35 @@ public class AggregateAnalysisEngineCont
"completeInitialization", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_initialized_controller__INFO", new Object[] { getComponentName() });
}
-
- // Open latch to allow messages to be processed. The
- // latch was closed to prevent messages from entering
- // the controller before it is initialized.
- latch.openLatch(getName(), isTopLevelComponent(), true);
- initialized = true;
- // Notify client listener that the initialization of the controller was successfull
- notifyListenersWithInitializationStatus(null);
- // If this is a collocated aggregate change its state to RUNNING from INITIALIZING.
- // The top level aggregate state is changed when listeners on its input queue are
- // succesfully started in SpringContainerDeployer.doStartListeners() method.
- if ( !isTopLevelComponent() ) {
- changeState(ServiceState.RUNNING);
+ String warmUpDataPath=null;
+ // start warm up engine which will prime pipeline analytics
+ if ( isTopLevelComponent() && (warmUpDataPath = System.getProperty("WarmUpDataPath")) != null ) {
+// getInputChannel().startTempListeners();
+ CountDownLatch warmUpLatch = new CountDownLatch(1);
+ super.warmUp(warmUpDataPath,warmUpLatch);
+ //warmUpLatch.await();
+ } else {
+ startProcessing();
}
}
+ protected void startProcessing() throws Exception {
+
+ // Open latch to allow messages to be processed. The
+ // latch was closed to prevent messages from entering
+ // the controller before it is initialized.
+ latch.openLatch(getName(), isTopLevelComponent(), true);
+ initialized = true;
+ // Notify client listener that the initialization of the controller was successfull
+ notifyListenersWithInitializationStatus(null);
+ // If this is a collocated aggregate change its state to RUNNING from INITIALIZING.
+ // The top level aggregate state is changed when listeners on its input queue are
+ // succesfully started in SpringContainerDeployer.doStartListeners() method.
+ if ( !isTopLevelComponent() ) {
+ changeState(ServiceState.RUNNING);
+ }
+
+ }
private String findKeyForValue(String fromDestination) {
Set set = destinationMap.entrySet();
@@ -3288,4 +3325,39 @@ public class AggregateAnalysisEngineCont
public int getServiceCasPoolSize() {
return ((AsynchAECasManager_impl)casManager).getCasPoolSize();
}
+ protected void doWarmUp(CAS cas, String casReferenceId) throws Exception {
+ long processTime = 0;
+ Semaphore ts = new Semaphore(0);
+ try {
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "doWarmUp",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_warmup_start_cas__FINE",
+ new Object[] { casReferenceId});
+ }
+ CacheEntry entry = getInProcessCache().getCacheEntryForCAS(casReferenceId);
+ entry.setThreadCompletionSemaphore(ts);
+ long t1 = System.currentTimeMillis();
+ process(cas, casReferenceId);
+ // wait until the CAS reaches final step
+ ts.acquire();
+ processTime = System.currentTimeMillis() - t1;
+ CasStateEntry cse = getLocalCache().lookupEntry(casReferenceId);
+ } catch( Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "doWarmUp", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_warmup_failed_WARNING", getComponentName());
+ }
+ throw e;
+ } finally {
+ dropCAS(casReferenceId, true);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "doWarmUp",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_warmup_dropping_cas__FINE",
+ new Object[] { casReferenceId, processTime});
+ }
+
+ }
+ }
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Fri Jun 5 19:06:57 2015
@@ -20,6 +20,7 @@
package org.apache.uima.aae.controller;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import org.apache.uima.UimaContext;
import org.apache.uima.aae.AsynchAECasManager;
@@ -230,4 +231,6 @@ public interface AnalysisEngineControlle
* @return
*/
public String getPID();
+
+ public void warmUp(String warmUpDataPath, CountDownLatch warmUpLatch) throws Exception;
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Fri Jun 5 19:06:57 2015
@@ -54,10 +54,12 @@ import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.OutputChannel;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
+import org.apache.uima.aae.UIDGenerator;
import org.apache.uima.aae.UimaAsContext;
import org.apache.uima.aae.UimaAsVersion;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.UimaEEAdminContext;
+import org.apache.uima.aae.WarmUpDataProvider;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.AsynchAEException;
@@ -256,6 +258,8 @@ public abstract class BaseAnalysisEngine
private String serviceName=null;
public abstract void dumpState(StringBuffer buffer, String lbl1);
+
+ protected abstract void doWarmUp(CAS cas, String casReferenceId) throws Exception;
public BaseAnalysisEngineController() {
@@ -1168,7 +1172,7 @@ public abstract class BaseAnalysisEngine
}
if (!isStopped()) {
Endpoint endpoint = (Endpoint) anErrorContext.get(AsynchAEMessage.Endpoint);
- if ( endpoint != null ) {
+ if ( endpoint != null && !"WarmupDelegate".equals(endpoint.getDelegateKey() ) ) {
getOutputChannel().sendReply((Throwable) anErrorContext.get(ErrorContext.THROWABLE_ERROR),
casReferenceId, parentCasReferenceId,
endpoint, AsynchAEMessage.Process);
@@ -2948,4 +2952,80 @@ public abstract class BaseAnalysisEngine
public Map<String,String> getDeadClientMap() {
return deadClientDestinationMap;
}
+ public void warmUp(String warmUpDataPath, CountDownLatch warmUpLatch) throws Exception {
+ if ( isPrimitive() ) {
+ runWarmup(warmUpDataPath, warmUpLatch);
+ } else {
+ asyncWarmup(warmUpDataPath, warmUpLatch);
+ }
+ }
+
+ private void runWarmup(String warmUpDataPath, CountDownLatch warmUpLatch) throws Exception {
+ long warmupStartTime = 0;
+ long warmupCasCount=0;
+ CAS cas = null;
+ boolean isException = false;
+ if ( isTopLevelComponent() ) {
+ try {
+ warmupStartTime = System.currentTimeMillis();
+ WarmUpDataProvider wdp = new WarmUpDataProvider(warmUpDataPath);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "runWarmup", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_warmup_start_INFO", new Object[] { getComponentName(), Thread.currentThread().getId() });
+ }
+
+ while( wdp.hasNext() && !isStopped()) {
+ cas = getCasManagerWrapper().getNewCas();
+ wdp.next(cas);
+ warmupCasCount++;
+ UIDGenerator idGenerator = new UIDGenerator();
+ String casReferenceId = idGenerator.nextId();
+
+ CasStateEntry cse = getLocalCache().createCasStateEntry(casReferenceId);
+ CacheEntry entry = getInProcessCache().register(cas, null, null, null,
+ casReferenceId, null, false);
+ entry.setWarmUp(true);
+ // delegate execution to the controller (primitive or aggregate)
+ doWarmUp(cas, casReferenceId);
+
+ }
+ // }
+
+ } catch( Exception e) {
+ isException = true;
+ throw e;
+ }
+ finally {
+ if ( !isException ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "runWarmup", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_warmup_success_INFO", new Object[] { getComponentName(), Thread.currentThread().getId(), warmupCasCount, (System.currentTimeMillis()-warmupStartTime)/1000 });
+ }
+ if ( this instanceof AggregateAnalysisEngineController ) {
+ ((AggregateAnalysisEngineController_impl)this).startProcessing();
+ }
+ }
+ }
+
+ }
+ warmUpLatch.countDown();
+ }
+ private void asyncWarmup(final String warmUpDataPath, final CountDownLatch warmUpLatch) throws Exception {
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ try {
+ runWarmup(warmUpDataPath, warmUpLatch);
+ } catch( Exception e) {
+ //e.printStackTrace();
+ notifyListenersWithInitializationStatus(new RuntimeException(e));
+ }
+ }
+
+ });
+ t.start();
+
+ }
+
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java Fri Jun 5 19:06:57 2015
@@ -185,8 +185,7 @@ public class LocalCache extends Concurre
}
public static class CasStateEntry {
-
- private String casReferenceId;
+ private String casReferenceId;
private volatile boolean waitingForChildren; // true if in FinalState and still has children in play
@@ -488,5 +487,8 @@ public class LocalCache extends Concurre
public List<AnalysisEnginePerformanceMetrics> getAEPerformanceList() {
return performanceList;
}
+
+
+
}
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Fri Jun 5 19:06:57 2015
@@ -19,6 +19,7 @@
package org.apache.uima.aae.controller;
+import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.NumberFormat;
@@ -28,6 +29,7 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import org.apache.uima.UIMAFramework;
@@ -53,12 +55,14 @@ import org.apache.uima.analysis_engine.A
import org.apache.uima.analysis_engine.AnalysisEngineDescription;
import org.apache.uima.analysis_engine.AnalysisEngineManagement;
import org.apache.uima.analysis_engine.CasIterator;
+import org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.impl.CASImpl;
import org.apache.uima.cas.impl.OutOfTypeSystemData;
import org.apache.uima.collection.CollectionReaderDescription;
import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resource.metadata.ConfigurationParameter;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
@@ -73,6 +77,7 @@ public class PrimitiveAnalysisEngineCont
private static final Class CLASS_NAME = PrimitiveAnalysisEngineController_impl.class;
private static final String DUMP_HEAP_THRESHOLD = "dumpHeapThreshold";
+ private volatile boolean casPoolInited = false;
// Stores AE metadata
private AnalysisEngineMetaData analysisEngineMetadata;
@@ -193,8 +198,28 @@ public class PrimitiveAnalysisEngineCont
sharedInitSemaphore.acquire();
// Parse the descriptor in the calling thread.
rSpecifier = UimaClassFactory.produceResourceSpecifier(super.aeDescriptor);
+/*
+ if ( rSpecifier instanceof AnalysisEngineDescription ) {
+ String name = ((AnalysisEngineDescription)rSpecifier).getAnalysisEngineMetaData().getName();
+ ((AnalysisEngineDescription)rSpecifier).getAnalysisEngineMetaData().setName(name+"-"+Thread.currentThread().getId());
+ // System.out.println(getUimaContextAdmin().);
+ Field f =getUimaContextAdmin().getClass().getDeclaredField("mQualifiedContextName");//getQualifiedContextName()
+ f.setAccessible(true);
+
+ f.get(getUimaContextAdmin().getQualifiedContextName()+Thread.currentThread().getId());
+ }
+ */
+ //paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, String.valueOf(Thread.currentThread().getId()));
+ //String p = (String)paramsMap.get(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX);// +"-"+Thread.currentThread().getId();
+ //p = p.substring(0, p.lastIndexOf(","))+" "+Thread.currentThread().getId()+",";
+ //paramsMap.remove(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX);
+ //paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, p);
AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier, paramsMap);
- // Call to produceAnalysisEngine() may take a long time to complete. While this
+
+ //AnalysisEngineManagementImpl aemi = (AnalysisEngineManagementImpl)ae.getManagementInterface();
+ //System.out.println("..... Created AE instance - Mgmt Instance Hashcode:"+aemi.hashCode()+" Unique MBean Name:"+aemi.getUniqueMBeanName());
+ // ae.getManagementInterface().getClass().
+ // Call to produceAnalysisEngine() may take a long time to complete. While this
// method was executing, the service may have been stopped. Before continuing
// check if the service has been stopped. If so, destroy AE instance and return.
if ( isStopped() ) {
@@ -216,6 +241,54 @@ public class PrimitiveAnalysisEngineCont
"UIMAEE_multiple_deployment_not_allowed__WARNING", new Object[] {this.getComponentName(), ae.getMetaData().getName()});
}
aeInstancePool.checkin(ae);
+
+ if (!isStopped() && !casPoolInited) {
+ casPoolInited = true;
+
+ if (errorHandlerChain == null) {
+ super.plugInDefaultErrorHandlerChain();
+ }
+
+ getMonitor().setThresholds(getErrorHandlerChain().getThresholds());
+ // Initialize Cas Manager
+ if (getCasManagerWrapper() != null) {
+ try {
+ // Below should always be true. In spring context file AsynchAECasManager_impl
+ // is instantiated and setCasPoolSize() method is called which sets the
+ // initialized state = true. isInitialized() returning true just means that
+ // setCasPoolSize() was called.
+ if (getCasManagerWrapper().isInitialized()) {
+ getCasManagerWrapper().addMetadata(getAnalysisEngineMetadata());
+ if (isTopLevelComponent()) {
+ getCasManagerWrapper().initialize("PrimitiveAEService");
+ CAS cas = getCasManagerWrapper().getNewCas("PrimitiveAEService");
+ cas.release();
+ }
+ }
+ } catch( Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "postInitialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_exception_WARNING", getComponentName());
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "postInitialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+ }
+ throw new AsynchAEException(e);
+ }
+ }
+ }
+
+
+ String warmUpDataPath=null;
+ // start warm up engine which will prime pipeline analytics
+ if ( isTopLevelComponent() && (warmUpDataPath = System.getProperty("WarmUpDataPath")) != null ) {
+ CountDownLatch warmUpLatch = new CountDownLatch(1);
+ warmUp(warmUpDataPath, warmUpLatch);
+ warmUpLatch.await();
+ }
+
if (aeInstancePool.size() == analysisEnginePoolSize) {
try {
postInitialize();
@@ -299,6 +372,7 @@ public class PrimitiveAnalysisEngineCont
serviceInfo.setAnalysisEngineInstanceCount(analysisEnginePoolSize);
if (!isStopped()) {
+ /*
getMonitor().setThresholds(getErrorHandlerChain().getThresholds());
// Initialize Cas Manager
if (getCasManagerWrapper() != null) {
@@ -315,6 +389,9 @@ public class PrimitiveAnalysisEngineCont
cas.release();
}
}
+ */
+ if (getCasManagerWrapper() != null) {
+ try {
if (isTopLevelComponent()) {
// add delay to allow controller listener to plug itself in
synchronized(this) {
@@ -551,6 +628,7 @@ public class PrimitiveAnalysisEngineCont
}
private String produceUniqueName(AnalysisEngineManagement aem) {
+// System.out.println(">>>>>>>>>>>>>>>>>>> Thread:"+Thread.currentThread().getId()+" MBean:"+aem.getUniqueMBeanName());
String[] parts = aem.getUniqueMBeanName().split(",");
StringBuffer sb = new StringBuffer();
for( String part : parts) {
@@ -574,6 +652,8 @@ public class PrimitiveAnalysisEngineCont
sb.append("/").append(part.substring(part.trim().indexOf("=")+1));
}
}
+ // System.out.println("<<<<<<<<<<<<<<<<<<< Thread:"+Thread.currentThread().getId()+" MBean:"+sb.toString());
+
return sb.toString();
}
@@ -682,6 +762,7 @@ public class PrimitiveAnalysisEngineCont
AnalysisEngineManagement rootAem = ae.getManagementInterface();
+ //System.out.println("%%%%%%%%%%%%%%%%%%%% Unique MBean Name:"+rootAem.getUniqueMBeanName()+" AE Instance Hashcode"+ae.hashCode());
if ( rootAem.getComponents().size() > 0 ) {
getLeafManagementObjects(rootAem, beforeAnalysisManagementObjects);
} else {
@@ -866,6 +947,27 @@ public class PrimitiveAnalysisEngineCont
// Increment number of CASes processed by this service
sequence++;
}
+ try {
+ CacheEntry cacheEntry =
+ getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+
+ if ( isTopLevelComponent() && cacheEntry.isWarmUp() && "WarmupDelegate".equals(anEndpoint.getDelegateKey())) {
+ localCache.lookupEntry(newEntry.getCasReferenceId()).setDropped(true);
+ localCache.remove(newEntry.getCasReferenceId());
+ // Remove Stats from the global Map associated with the new CAS
+ // These stats for this CAS were added to the response message
+ // and are no longer needed
+ dropCasStatistics(newEntry.getCasReferenceId());
+
+ return;
+ }
+
+ } catch( Exception exx ) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", exx);
+ }
+
if (!anEndpoint.isRemote()) {
UimaTransport transport = getTransport(anEndpoint.getEndpoint());
UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
@@ -932,6 +1034,24 @@ public class PrimitiveAnalysisEngineCont
new Object[] { Thread.currentThread().getName(), getComponentName(),
aCasReferenceId, (double) (super.getCpuTime() - time) / (double) 1000000 });
}
+
+
+ // check if this is a warm up CAS. Such CAS is internally created with
+ // a main purpose of warming up analytics.
+ CacheEntry cacheEntry =
+ getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+
+ if ( isTopLevelComponent() && cacheEntry.isWarmUp()) {
+// if ( cacheEntry.getThreadCompletionSemaphore() != null ) {
+// cacheEntry.getThreadCompletionSemaphore().release();
+// }
+ // we are in the warm state which means the pipelines have been initialized
+ // and we are sending CASes to warm up/prime the analytics. Since we
+ // reached the end of the flow here, just return now. There is nothing
+ // else to do with the CAS.
+ return;
+ }
+
getMonitor().resetCountingStatistic("", Monitor.ProcessErrorCount);
// Set total number of children generated from this CAS
// Store total time spent processing this input CAS
@@ -971,8 +1091,8 @@ public class PrimitiveAnalysisEngineCont
after.getUniqueName(),
after.getAnalysisTime()- before.getAnalysisTime(),
after.getNumProcessed());
- //System.out.println("********************"+metrics.getUniqueName());
- // System.out.println("********************"+metrics.getName());
+ // System.out.println("********************"+metrics.getUniqueName()+" Analysis Time:"+metrics.getAnalysisTime());
+ //System.out.println("********************"+metrics.getName()+" Analysis Time:"+metrics.getAnalysisTime());
performanceList.add(metrics);
break;
}
@@ -1125,19 +1245,20 @@ public class PrimitiveAnalysisEngineCont
public void sendMetadata(Endpoint anEndpoint) throws AsynchAEException {
if ( ((ProcessingResourceMetaData) getAnalysisEngineMetadata())
- .getConfigurationParameterSettings().getParameterValue(
- AnalysisEngineController.AEInstanceCount) == null ) {
- addConfigIntParameter(AnalysisEngineController.AEInstanceCount, analysisEnginePoolSize);
+ .getConfigurationParameterSettings().getParameterValue(
+ AnalysisEngineController.AEInstanceCount) == null ) {
+ addConfigIntParameter(AnalysisEngineController.AEInstanceCount, analysisEnginePoolSize);
}
- if (getAnalysisEngineMetadata().getOperationalProperties().getOutputsNewCASes()) {
- if ( ((ProcessingResourceMetaData) getAnalysisEngineMetadata())
- .getConfigurationParameterSettings().getParameterValue(
- AnalysisEngineController.CasPoolSize) == null ) {
- addConfigIntParameter(AnalysisEngineController.CasPoolSize, super.componentCasPoolSize);
- }
- }
- super.sendMetadata(anEndpoint, getAnalysisEngineMetadata());
+ if (getAnalysisEngineMetadata().getOperationalProperties().getOutputsNewCASes()) {
+ if ( ((ProcessingResourceMetaData) getAnalysisEngineMetadata())
+ .getConfigurationParameterSettings().getParameterValue(
+ AnalysisEngineController.CasPoolSize) == null ) {
+ addConfigIntParameter(AnalysisEngineController.CasPoolSize, super.componentCasPoolSize);
+ }
+
+ }
+ super.sendMetadata(anEndpoint, getAnalysisEngineMetadata());
}
private AnalysisEngineMetaData getAnalysisEngineMetadata() {
@@ -1356,4 +1477,50 @@ public class PrimitiveAnalysisEngineCont
public void dumpState(StringBuffer buffer, String lbl1) {
buffer.append(getComponentName()+" State:"+getState());
}
+
+ protected void doWarmUp(CAS cas, String casReferenceId) throws Exception {
+ long processTime = 0;
+ try {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "doWarmUp",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_warmup_start_cas__FINE",
+ new Object[] { casReferenceId});
+ }
+
+ long t1 = System.currentTimeMillis();
+
+ Endpoint endpoint = new Endpoint_impl();
+ // set fake delegate key. This will be checked in process() method
+ // to make sure we are not attempting sending the CAS to a reply queue.
+ // Warm up simulates incoming CASes but there is no client to reply to.
+ endpoint.setDelegateKey("WarmupDelegate");
+
+ process(cas, casReferenceId, endpoint);
+ processTime = System.currentTimeMillis() - t1;
+
+// CacheEntry entry = getInProcessCache().getCacheEntryForCAS(casReferenceId);
+// if ( entry != null && entry.isFailed()) {
+// if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+// UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+// "doWarmUp", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+// "UIMAEE_service_warmup_failed_WARNING", getComponentName());
+// }
+// throw new ResourceInitializationException();
+// }
+ } catch( Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "doWarmUp", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_warmup_failed_WARNING", getComponentName());
+ }
+ throw e;
+ } finally {
+ dropCAS(casReferenceId, true);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "doWarmUp",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_warmup_dropping_cas__FINE",
+ new Object[] { casReferenceId, processTime});
+ }
+ }
+ }
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java Fri Jun 5 19:06:57 2015
@@ -31,6 +31,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -844,6 +845,12 @@ public class UimacppServiceController ex
// TODO Auto-generated method stub
return null;
}
+
+@Override
+public void warmUp(String warmUpDataPath, CountDownLatch warmUpLatch) throws Exception {
+ // TODO Auto-generated method stub
+
+}
}
/**
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Fri Jun 5 19:06:57 2015
@@ -104,6 +104,11 @@ public class ProcessCasErrorHandler exte
private void sendExceptionToClient(Throwable t, String aCasReferenceId, Endpoint anEndpoint,
AnalysisEngineController aController) throws Exception {
+ // When warming up the pipeline there is no client. CASes are
+ // created by the controller itself. Just return in this case.
+ if ( anEndpoint != null && "WarmupDelegate".equals(anEndpoint.getDelegateKey())) {
+ return;
+ }
// Notify the parent of the exception
if (anEndpoint != null && aCasReferenceId != null && !anEndpoint.isCasMultiplier()) {
try {
@@ -384,6 +389,9 @@ public class ProcessCasErrorHandler exte
anErrorContext.add(AsynchAEMessage.SkipPendingLists, "true");
}
if (ErrorHandler.TERMINATE.equalsIgnoreCase(threshold.getAction())) {
+ if ( aController.isTopLevelComponent() && t instanceof Exception ) {
+ aController.notifyListenersWithInitializationStatus((Exception)t);
+ }
anErrorContext.add(ErrorContext.THROWABLE_ERROR, t);
if (casReferenceId != null) {
try {
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Fri Jun 5 19:06:57 2015
@@ -453,9 +453,10 @@ public class ProcessRequestHandler_impl
// we dont want to send a generated CAS back to the CM, override
// with an endpoint provided by the client of
// this service. Client endpoint is attached to an input Cas cache entry.
- aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
- aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
-
+ if ( replyToEndpoint != null ) {
+ aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
+ aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
+ }
// Before sending a CAS to Cas Multiplier, the aggregate has
// saved the CM key in the CAS cache entry. Fetch the key
// of the CM so that we can ask the right Shadow Cas Pool for
@@ -1021,7 +1022,18 @@ public class ProcessRequestHandler_impl
int payload = messageContext.getMessageIntProperty(AsynchAEMessage.Payload);
int command = messageContext.getMessageIntProperty(AsynchAEMessage.Command);
- getController().getControllerLatch().waitUntilInitialized();
+ CacheEntry ce = null;
+ if (AsynchAEMessage.CASRefID == payload) {
+ String cid = null;
+ // Fetch id of the CAS from the message.
+ if ((cid = getCasReferenceId(messageContext)) == null) {
+ return; // Invalid message. Nothing to do
+ }
+ ce = getController().getInProcessCache().getCacheEntryForCAS(cid);
+ }
+ if ( ce == null || !ce.isWarmUp() ) {
+ getController().getControllerLatch().waitUntilInitialized();
+ }
// If a Process Request, increment number of CASes processed
if (messageContext.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties Fri Jun 5 19:06:57 2015
@@ -18,6 +18,7 @@
UIMAEE_service_id_INFO = Starting Uima EE Service On {0}
UIMAEE_service_exception_WARNING = Service: {0} Runtime Exception
+UIMAEE_service_warmup_failed_WARNING = Service: {0} Failed While Warming the Pipeline
UIMAEE_exception__WARNING =
UIMAEE_invalid_cpc_request__INFO = Invalid {0} Request. Analysis Engine Instance Not Found For Thread:
UIMAEE_primary_cas_pool_init__CONFIG = Primary CAS Pool Size: {0} Context: {1} Initial Cas Heap Size:{2} cells. Supports Incoming Service Requests.
@@ -259,4 +260,8 @@ UIMAEE_terminal_error_WARNING=Controller
UIMAEE_drop_cas_debug_FINEST=Controller:{0} Drop:{1} CAS:{2} ReplyReceived:{3}
UIMAEE_timer_started_FINE=Timer Started For CAS: {0} Timeout Value:{1} Timer Thread ID:{2} Timer Thread Name:{3}
UIMAEE_service_lost_connectivity_WARNING = Service: {0} Unable to Open Connection To Broker: {1} - Silently Retrying ...
-UIMAEE_service_regained_connectivity_INFO = Service: {0} Recovered Connectivity to Broker: {1}
\ No newline at end of file
+UIMAEE_service_regained_connectivity_INFO = Service: {0} Recovered Connectivity to Broker: {1}
+UIMAEE_service_warmup_start_INFO = Service: {0} Thread: {1} Warming Up The Pipeline ...
+UIMAEE_service_warmup_success_INFO = Service: {0} Thread: {1} WarmUp Has Finished Successfully - Processed: {2} CASes - Time Spent Warming Up: {3} secs- Ready For Processing
+UIMAEE_warmup_dropping_cas__FINE = Aggregate Warmup Stage - Dropping CAS:{0} Processing took {1}
+UIMAEE_warmup_start_cas__FINE = Aggregate Warmup Stage - Processing CAS id:{0}
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Fri Jun 5 19:06:57 2015
@@ -1594,6 +1594,7 @@ public abstract class BaseUIMAAsynchrono
cachedRequest.setException(exception);
cachedRequest.setProcessException();
}
+ cpcReadySemaphore.release();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
@@ -2378,7 +2379,8 @@ public abstract class BaseUIMAAsynchrono
serviceDelegate.removeCasFromOutstandingList(casReferenceId);
// Check if all replies have been received
long outstandingCasCount = outstandingCasRequests.decrementAndGet();
- if (outstandingCasCount == 0) {
+
+ if (outstandingCasCount <= 0) {
cpcReadySemaphore.release();
}
//