You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2015/06/05 21:06:58 UTC

svn commit: r1683847 - in /uima/uima-as/trunk: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-core/src/main/java/org/apache/uima/aae/ uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ uimaj-as-core/src/main/jav...

Author: cwiklik
Date: Fri Jun  5 19:06:57 2015
New Revision: 1683847

URL: http://svn.apache.org/r1683847
Log:
UIMA-4265 Implemented support for warming up the pipeline right after the initialization and before opening jms listeners

Added:
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java   (with props)
Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
    uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
    uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Jun  5 19:06:57 2015
@@ -547,7 +547,9 @@ public class UimaDefaultMessageListenerC
       failed = true;
     }
   }
-
+  public Endpoint getEndpoint() {
+	  return endpoint;
+  }
   private void terminate(Throwable t) {
     // ****************************************
     // terminate the service

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Fri Jun  5 19:06:57 2015
@@ -525,7 +525,12 @@ public class InProcessCache implements I
   public static class CacheEntry {
     public static final int FINAL_STATE = 1;
 
-    private CAS cas;
+    private volatile boolean warmUp = false;
+    
+    private volatile boolean failed = false;;
+    
+	
+	private CAS cas;
 
     // the following is set to true if the CAS has been created by CAS Multiplier
     // This flag is used to determine if the CAS should be output to client.
@@ -615,10 +620,25 @@ public class InProcessCache implements I
     public Semaphore getThreadCompletionSemaphore() {
       return threadCompletionSemaphore;
     }
+    public boolean isFailed() {
+  		return failed;
+  	}
+
+    public void setFailed(boolean failed) {
+  		this.failed = failed;
+  		Thread.dumpStack();
+    }
 
     public void setThreadCompletionSemaphore(Semaphore threadCompletionSemaphore) {
       this.threadCompletionSemaphore = threadCompletionSemaphore;
     }
+    public boolean isWarmUp() {
+ 		return warmUp;
+ 	}
+
+ 	public void setWarmUp(boolean warmUp) {
+ 		this.warmUp = warmUp;
+ 	}
     // never called 5/2013  was for XCAS
 //    protected CacheEntry(CAS aCas, String aCasReferenceId, MessageContext aMessageAccessor,
 //            OutOfTypeSystemData aotsd) {

Added: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java?rev=1683847&view=auto
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java (added)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java Fri Jun  5 19:06:57 2015
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.TypeSystem;
+import org.apache.uima.cas.impl.Serialization;
+import org.apache.uima.cas.impl.XmiCasDeserializer;
+import org.apache.uima.resource.metadata.TypeSystemDescription;
+import org.apache.uima.util.CasCreationUtils;
+import org.apache.uima.util.XMLInputSource;
+import org.apache.uima.util.XMLParser;
+
+public class WarmUpDataProvider {
+	InputChannel inputChannel;
+	AnalysisEngineController controller;
+	private String inputFileName;
+	private FileInputStream fis;
+	private ZipInputStream zis;
+	private ZipEntry nextEntry;
+	private int docSeq;
+	private boolean readingXmiFormat;
+	private TypeSystem inputTS;
+
+	public static void main(String[] args) {
+
+	}
+
+	public WarmUpDataProvider(String inputFileName)
+			throws IOException {
+		this.inputFileName = inputFileName;
+		fis = new FileInputStream(new File(inputFileName));
+		zis = new ZipInputStream(new BufferedInputStream(fis, 1024 * 100));
+		docSeq = 0;
+	}
+
+	public boolean hasNext() throws AnalysisEngineProcessException {
+		try {
+			nextEntry = zis.getNextEntry();
+		} catch (IOException e) {
+			throw new AnalysisEngineProcessException(e);
+		}
+		return (nextEntry != null) ? true : false;
+	}
+
+	public CAS next(CAS cas) throws Exception {
+		if (0 == docSeq) {
+			if (nextEntry.getName().equals("typesystem.xml")) {
+				getTypesystem();
+				readingXmiFormat = false;
+			} else {
+				readingXmiFormat = true;
+			}
+		} else {
+			if (nextEntry.getName().equals("typesystem.xml")) {
+				throw new AnalysisEngineProcessException(new RuntimeException(
+						"typesystem.xml entry found in the middle of input zipfile "
+								+ inputFileName));
+			}
+		}
+		byte[] buff = new byte[10000];
+		int bytesread;
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		try {
+			while (-1 != (bytesread = zis.read(buff))) {
+				baos.write(buff, 0, bytesread);
+			}
+			ByteArrayInputStream bis = new ByteArrayInputStream(
+					baos.toByteArray());
+			if (readingXmiFormat) {
+				XmiCasDeserializer.deserialize(bis, cas, true);
+			} else {
+				Serialization.deserializeCAS(cas, bis, inputTS, null);
+			}
+		} catch (Exception e) {
+			throw new AnalysisEngineProcessException(e);
+		}
+		docSeq++;
+		return cas;
+	}
+
+	private void getTypesystem() throws AnalysisEngineProcessException {
+		byte[] buff = new byte[10000];
+		int bytesread;
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		try {
+			while (-1 != (bytesread = zis.read(buff))) {
+				baos.write(buff, 0, bytesread);
+			}
+			ByteArrayInputStream bis = new ByteArrayInputStream(
+					baos.toByteArray());
+			// Get XML parser from framework
+			XMLParser xmlParser = UIMAFramework.getXMLParser();
+			// Parse type system descriptor
+			TypeSystemDescription tsDesc = xmlParser
+					.parseTypeSystemDescription(new XMLInputSource(
+							(InputStream) bis, null));
+			// Use type system description to create CAS and get the type system
+			// object
+			inputTS = CasCreationUtils.createCas(tsDesc, null, null)
+					.getTypeSystem();
+			// advance to first input CAS
+			nextEntry = zis.getNextEntry();
+		} catch (Exception e) {
+			throw new AnalysisEngineProcessException(e);
+		}
+	}
+
+}

Propchange: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/WarmUpDataProvider.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Fri Jun  5 19:06:57 2015
@@ -30,7 +30,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.UIMARuntimeException;
@@ -39,8 +42,10 @@ import org.apache.uima.aae.InProcessCach
 import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.AsynchAECasManager_impl;
 import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.UIDGenerator;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.WarmUpDataProvider;
 import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
 import org.apache.uima.aae.delegate.ControllerDelegate;
 import org.apache.uima.aae.delegate.Delegate;
@@ -174,6 +179,8 @@ public class AggregateAnalysisEngineCont
   // for a free CAS from the CasPool.
   public Semaphore semaphore = null;
   
+  private Lock mergeLock = new ReentrantLock();
+  
   /**
    * 
    * @param anEndpointName
@@ -1019,7 +1026,7 @@ public class AggregateAnalysisEngineCont
         // CASes will be dropped in finalStep() as they come back from delegates. When all are
         // accounted for and dropped, the parent CAS will be returned back to the client
         // with an exception.
-        if (parentCasStateEntry.isFailed()) {
+        if (parentCasStateEntry != null && parentCasStateEntry.isFailed()) {
           // Fetch Delegate object for the CM that produced the CAS. The producer key
           // is associated with a cache entry in the ProcessRequestHandler. Each new CAS
           // must have a key of a CM that produced it.
@@ -1090,7 +1097,6 @@ public class AggregateAnalysisEngineCont
 
     return false;
   }
-
   /**
    * This is a process method that is executed for CASes not created by a Multiplier in this
    * aggregate.
@@ -1703,7 +1709,6 @@ public class AggregateAnalysisEngineCont
       }
       return;
     }
-
     // Found entries in caches for a given CAS id
     try {
       endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
@@ -1715,8 +1720,6 @@ public class AggregateAnalysisEngineCont
           return;
         }
         
-  	 // System.out.println(" ---- Controller::"+getComponentName()+" CAS:"+casStateEntry.getCasReferenceId()+ " Has Children:"+casHasChildrenInPlay(casStateEntry)+" Parent::"+casStateEntry.getInputCasReferenceId());
-
         // Check if this CAS has children that are still being processed in this aggregate
         if (casHasChildrenInPlay(casStateEntry)) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -1734,6 +1737,22 @@ public class AggregateAnalysisEngineCont
           casStateEntry.waitingForChildrenToFinish(true);
           return;
         }
+      
+        // check if this is a warm up CAS. Such CAS is internally created with
+        // a main purpose of warming up analytics.
+        if ( isTopLevelComponent() && cacheEntry.isWarmUp()) {
+        	if ( cacheEntry.getThreadCompletionSemaphore() != null ) {
+            	cacheEntry.getThreadCompletionSemaphore().release();
+        	}
+        	// we are in the warm up state which means the pipelines have been initialized
+        	// and we are sending CASes to warm up/prime the analytics. Since we
+        	// reached the end of the flow here, just return now. There is nothing
+        	// else to do with the CAS.
+        	return;
+        	
+        }
+
+        
         casStateEntry.waitingForChildrenToFinish(false);
         if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
@@ -1792,7 +1811,8 @@ public class AggregateAnalysisEngineCont
           // Send a reply to the Client. If the CAS is an input CAS it will be dropped
           cEndpoint = replyToClient(cacheEntry, casStateEntry);
           replySentToClient = true;
-          if (cEndpoint.isRemote()) {
+          
+          if (cEndpoint != null && cEndpoint.isRemote()) {
             // if this service is a Cas Multiplier don't remove the CAS. It will be removed
             // when a remote client sends explicit Release CAS Request
             if (!isCasMultiplier()) {
@@ -2664,9 +2684,10 @@ public class AggregateAnalysisEngineCont
     mergeTypeSystem(aTypeSystem, fromDestination, null);
   }
 
-  public synchronized void mergeTypeSystem(String aTypeSystem, String fromDestination,
+//  public synchronized void mergeTypeSystem(String aTypeSystem, String fromDestination,
+  public void mergeTypeSystem(String aTypeSystem, String fromDestination,
           String fromServer) throws AsynchAEException {
-
+    mergeLock.lock();
     try {
       // Find the endpoint for this service, given its input queue name and broker URI.
       // We now allow endpoints managed by different servers to have the same queue name.
@@ -2792,7 +2813,10 @@ public class AggregateAnalysisEngineCont
       }
     } catch (Exception e) {
       throw new AsynchAEException(e);
+    } finally {
+    	mergeLock.unlock();
     }
+    
   }
   
   public static TypeSystemImpl getTypeSystemImpl(ProcessingResourceMetaData resource) throws ResourceInitializationException {
@@ -2809,7 +2833,7 @@ public class AggregateAnalysisEngineCont
     return ((CASImpl) casMgr).getTypeSystemImpl();
   }
 
-  private synchronized void completeInitialization() throws Exception {
+  private void completeInitialization() throws Exception {
     if (initialized) {
       return;
     }
@@ -2856,22 +2880,35 @@ public class AggregateAnalysisEngineCont
               "completeInitialization", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
               "UIMAEE_initialized_controller__INFO", new Object[] { getComponentName() });
     }
-
-    // Open latch to allow messages to be processed. The
-    // latch was closed to prevent messages from entering
-    // the controller before it is initialized.
-    latch.openLatch(getName(), isTopLevelComponent(), true);
-    initialized = true;
-    // Notify client listener that the initialization of the controller was successfull
-    notifyListenersWithInitializationStatus(null);
-    //  If this is a collocated aggregate change its state to RUNNING from INITIALIZING.
-    //  The top level aggregate state is changed when listeners on its input queue are
-    //  succesfully started in SpringContainerDeployer.doStartListeners() method.
-    if ( !isTopLevelComponent() ) {
-      changeState(ServiceState.RUNNING);
+    String warmUpDataPath=null;
+    // start warm up engine which will prime pipeline analytics
+    if ( isTopLevelComponent() && (warmUpDataPath = System.getProperty("WarmUpDataPath")) != null ) {
+//    	getInputChannel().startTempListeners();
+    	CountDownLatch warmUpLatch = new CountDownLatch(1);
+        super.warmUp(warmUpDataPath,warmUpLatch);
+        //warmUpLatch.await();
+    } else {
+    	startProcessing();
     }
   }
 
+  protected void startProcessing() throws Exception {
+	  
+	    // Open latch to allow messages to be processed. The
+	    // latch was closed to prevent messages from entering
+	    // the controller before it is initialized.
+	    latch.openLatch(getName(), isTopLevelComponent(), true);
+	    initialized = true;
+	    // Notify client listener that the initialization of the controller was successfull
+	    notifyListenersWithInitializationStatus(null);
+	    //  If this is a collocated aggregate change its state to RUNNING from INITIALIZING.
+	    //  The top level aggregate state is changed when listeners on its input queue are
+	    //  succesfully started in SpringContainerDeployer.doStartListeners() method.
+	    if ( !isTopLevelComponent() ) {
+	      changeState(ServiceState.RUNNING);
+	    }
+	  
+  }
   private String findKeyForValue(String fromDestination) {
 
     Set set = destinationMap.entrySet();
@@ -3288,4 +3325,39 @@ public class AggregateAnalysisEngineCont
   public int getServiceCasPoolSize() {
 	 return ((AsynchAECasManager_impl)casManager).getCasPoolSize();
   }
+  protected void doWarmUp(CAS cas, String casReferenceId) throws Exception {
+	  long processTime = 0;
+      Semaphore ts = new Semaphore(0);
+	  try {
+		  
+	      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "doWarmUp",
+	                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_warmup_start_cas__FINE",
+	                  new Object[] { casReferenceId});
+	      }
+	      CacheEntry entry = getInProcessCache().getCacheEntryForCAS(casReferenceId);
+	      entry.setThreadCompletionSemaphore(ts);
+	      long t1 = System.currentTimeMillis();
+	      process(cas, casReferenceId);
+	      // wait until the CAS reaches final step
+	      ts.acquire();
+		  processTime = System.currentTimeMillis() - t1;
+		  CasStateEntry cse = getLocalCache().lookupEntry(casReferenceId);
+	  } catch( Exception e) {
+	      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+	                  "doWarmUp", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                  "UIMAEE_service_warmup_failed_WARNING", getComponentName());
+	      }
+	      throw e;
+	  } finally {
+		  dropCAS(casReferenceId, true);
+	      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "doWarmUp",
+	                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_warmup_dropping_cas__FINE",
+	                  new Object[] { casReferenceId, processTime});
+	      }
+
+	  }
+  }
 }

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Fri Jun  5 19:06:57 2015
@@ -20,6 +20,7 @@
 package org.apache.uima.aae.controller;
 
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.uima.UimaContext;
 import org.apache.uima.aae.AsynchAECasManager;
@@ -230,4 +231,6 @@ public interface AnalysisEngineControlle
    * @return
    */
   public String getPID();
+  
+  public void warmUp(String warmUpDataPath, CountDownLatch warmUpLatch) throws Exception;
 }

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Fri Jun  5 19:06:57 2015
@@ -54,10 +54,12 @@ import org.apache.uima.aae.InputChannel;
 import org.apache.uima.aae.OutputChannel;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
+import org.apache.uima.aae.UIDGenerator;
 import org.apache.uima.aae.UimaAsContext;
 import org.apache.uima.aae.UimaAsVersion;
 import org.apache.uima.aae.UimaClassFactory;
 import org.apache.uima.aae.UimaEEAdminContext;
+import org.apache.uima.aae.WarmUpDataProvider;
 import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
 import org.apache.uima.aae.delegate.Delegate;
 import org.apache.uima.aae.error.AsynchAEException;
@@ -256,6 +258,8 @@ public abstract class BaseAnalysisEngine
   private String serviceName=null;
   
   public abstract void dumpState(StringBuffer buffer, String lbl1);
+  
+  protected abstract void doWarmUp(CAS cas, String casReferenceId) throws Exception;
 
   public BaseAnalysisEngineController() {
 
@@ -1168,7 +1172,7 @@ public abstract class BaseAnalysisEngine
           }
           if (!isStopped()) {
             Endpoint endpoint = (Endpoint) anErrorContext.get(AsynchAEMessage.Endpoint);
-            if ( endpoint != null ) {
+            if ( endpoint != null && !"WarmupDelegate".equals(endpoint.getDelegateKey() ) ) {
               getOutputChannel().sendReply((Throwable) anErrorContext.get(ErrorContext.THROWABLE_ERROR), 
                       casReferenceId, parentCasReferenceId,
                       endpoint, AsynchAEMessage.Process);
@@ -2948,4 +2952,80 @@ public abstract class BaseAnalysisEngine
   public Map<String,String> getDeadClientMap() {
 	  return deadClientDestinationMap;
   }
+  public void warmUp(String warmUpDataPath, CountDownLatch warmUpLatch) throws Exception {
+	  if ( isPrimitive() ) {
+		  runWarmup(warmUpDataPath, warmUpLatch);
+	  } else {
+		  asyncWarmup(warmUpDataPath, warmUpLatch);
+	  }
+  }
+  
+  private void runWarmup(String warmUpDataPath, CountDownLatch warmUpLatch) throws Exception {
+	  long warmupStartTime = 0;
+	  long warmupCasCount=0;
+	  CAS cas = null;
+	  boolean isException = false;
+      if ( isTopLevelComponent() ) {
+          try {
+        	  warmupStartTime = System.currentTimeMillis();
+        	  WarmUpDataProvider wdp = new WarmUpDataProvider(warmUpDataPath);
+	    	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+	    	     UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+	    	                "runWarmup", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	    	                "UIMAEE_service_warmup_start_INFO", new Object[] { getComponentName(), Thread.currentThread().getId() });
+	    	  }
+	    	  
+	    	  while( wdp.hasNext() && !isStopped()) {
+	    		  cas = getCasManagerWrapper().getNewCas();
+	    		  wdp.next(cas);
+	    		  warmupCasCount++;
+	    		  UIDGenerator idGenerator = new UIDGenerator();
+	    		  String casReferenceId = idGenerator.nextId();
+
+	    		  CasStateEntry cse = getLocalCache().createCasStateEntry(casReferenceId);
+	    		  CacheEntry entry = getInProcessCache().register(cas, null, null, null,
+	    				  casReferenceId, null, false);
+	    		  entry.setWarmUp(true);
+	    		  // delegate execution to the controller (primitive or aggregate)
+	    		  doWarmUp(cas, casReferenceId);
+
+	    	  }
+	    	  //        	      }
+        		  
+          } catch( Exception e) {
+        	  isException = true;
+        	  throw e;
+          }
+          finally {
+      		  if ( !isException ) {
+       			  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+       			     UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+        			                "runWarmup", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+        			                "UIMAEE_service_warmup_success_INFO", new Object[] { getComponentName(), Thread.currentThread().getId(), warmupCasCount, (System.currentTimeMillis()-warmupStartTime)/1000 });
+       			  }
+               	  if ( this instanceof AggregateAnalysisEngineController ) {
+            		  ((AggregateAnalysisEngineController_impl)this).startProcessing();
+        		  }
+        	  }
+          }
+    	  
+      }
+	  warmUpLatch.countDown();
+  }
+  private void asyncWarmup(final String warmUpDataPath, final CountDownLatch warmUpLatch) throws Exception {
+	  Thread t = new Thread(new Runnable() {
+			public void run() {
+				try {
+					runWarmup(warmUpDataPath, warmUpLatch);
+				} catch( Exception e) {
+					//e.printStackTrace();
+					notifyListenersWithInitializationStatus(new RuntimeException(e));
+				}
+			}
+
+	  });
+	  t.start();
+  
+  }
+  
 }

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java Fri Jun  5 19:06:57 2015
@@ -185,8 +185,7 @@ public class LocalCache extends Concurre
   }
 
   public static class CasStateEntry {
-
-    private String casReferenceId;
+	private String casReferenceId;
 
     private volatile boolean waitingForChildren; // true if in FinalState and still has children in play
     
@@ -488,5 +487,8 @@ public class LocalCache extends Concurre
     public List<AnalysisEnginePerformanceMetrics> getAEPerformanceList() {
       return performanceList;
     }
+    
+  
+
   }
 }

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Fri Jun  5 19:06:57 2015
@@ -19,6 +19,7 @@
 
 package org.apache.uima.aae.controller;
 
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.text.NumberFormat;
@@ -28,6 +29,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 
 import org.apache.uima.UIMAFramework;
@@ -53,12 +55,14 @@ import org.apache.uima.analysis_engine.A
 import org.apache.uima.analysis_engine.AnalysisEngineDescription;
 import org.apache.uima.analysis_engine.AnalysisEngineManagement;
 import org.apache.uima.analysis_engine.CasIterator;
+import org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.impl.CASImpl;
 import org.apache.uima.cas.impl.OutOfTypeSystemData;
 import org.apache.uima.collection.CollectionReaderDescription;
 import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.resource.ResourceProcessException;
 import org.apache.uima.resource.ResourceSpecifier;
 import org.apache.uima.resource.metadata.ConfigurationParameter;
 import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
@@ -73,6 +77,7 @@ public class PrimitiveAnalysisEngineCont
   private static final Class CLASS_NAME = PrimitiveAnalysisEngineController_impl.class;
   private static final String DUMP_HEAP_THRESHOLD = "dumpHeapThreshold";
   
+  private volatile boolean casPoolInited = false;
   // Stores AE metadata
   private AnalysisEngineMetaData analysisEngineMetadata;
 
@@ -193,8 +198,28 @@ public class PrimitiveAnalysisEngineCont
       sharedInitSemaphore.acquire();
       // Parse the descriptor in the calling thread.
       rSpecifier = UimaClassFactory.produceResourceSpecifier(super.aeDescriptor);
+/*      
+      if ( rSpecifier instanceof AnalysisEngineDescription ) {
+          String name = ((AnalysisEngineDescription)rSpecifier).getAnalysisEngineMetaData().getName();
+          ((AnalysisEngineDescription)rSpecifier).getAnalysisEngineMetaData().setName(name+"-"+Thread.currentThread().getId());
+      //    System.out.println(getUimaContextAdmin().);
+          Field f =getUimaContextAdmin().getClass().getDeclaredField("mQualifiedContextName");//getQualifiedContextName()
+          f.setAccessible(true);
+         
+          f.get(getUimaContextAdmin().getQualifiedContextName()+Thread.currentThread().getId());
+      }
+      */
+      //paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, String.valueOf(Thread.currentThread().getId()));
+      //String p = (String)paramsMap.get(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX);//      +"-"+Thread.currentThread().getId();
+      //p = p.substring(0, p.lastIndexOf(","))+" "+Thread.currentThread().getId()+",";
+      //paramsMap.remove(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX);
+      //paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, p);
       AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier, paramsMap);
-        //  Call to produceAnalysisEngine() may take a long time to complete. While this
+   
+      //AnalysisEngineManagementImpl aemi = (AnalysisEngineManagementImpl)ae.getManagementInterface();
+      //System.out.println("..... Created AE instance - Mgmt Instance Hashcode:"+aemi.hashCode()+" Unique MBean Name:"+aemi.getUniqueMBeanName());
+         // ae.getManagementInterface().getClass().
+      //  Call to produceAnalysisEngine() may take a long time to complete. While this
         //  method was executing, the service may have been stopped. Before continuing 
         //  check if the service has been stopped. If so, destroy AE instance and return.
         if ( isStopped() ) {
@@ -216,6 +241,54 @@ public class PrimitiveAnalysisEngineCont
                   "UIMAEE_multiple_deployment_not_allowed__WARNING", new Object[] {this.getComponentName(), ae.getMetaData().getName()});
         }
         aeInstancePool.checkin(ae);
+        
+        if (!isStopped() && !casPoolInited) {
+        	casPoolInited = true;
+        	
+            if (errorHandlerChain == null) {
+                super.plugInDefaultErrorHandlerChain();
+            }
+
+            getMonitor().setThresholds(getErrorHandlerChain().getThresholds());
+            // Initialize Cas Manager
+            if (getCasManagerWrapper() != null) {
+              try {
+            	  // Below should always be true. In spring context file AsynchAECasManager_impl
+            	  // is instantiated and setCasPoolSize() method is called which sets the 
+            	  // initialized state = true. isInitialized() returning true just means that
+            	  // setCasPoolSize() was called.
+                if (getCasManagerWrapper().isInitialized()) {
+                  getCasManagerWrapper().addMetadata(getAnalysisEngineMetadata());
+                  if (isTopLevelComponent()) {
+                    getCasManagerWrapper().initialize("PrimitiveAEService");
+                    CAS cas = getCasManagerWrapper().getNewCas("PrimitiveAEService");
+                    cas.release();
+                  }
+                }
+              } catch( Exception e) {
+            	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                              "postInitialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                              "UIMAEE_service_exception_WARNING", getComponentName());
+
+                      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                              "postInitialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                              "UIMAEE_exception__WARNING", e);
+            	  }
+            	  throw new AsynchAEException(e);
+              }
+            }
+        }
+        
+        
+        String warmUpDataPath=null;
+		// start warm up engine which will prime pipeline analytics
+		if ( isTopLevelComponent() && (warmUpDataPath = System.getProperty("WarmUpDataPath")) != null ) {
+		     CountDownLatch warmUpLatch = new CountDownLatch(1);
+			 warmUp(warmUpDataPath, warmUpLatch);
+			 warmUpLatch.await();
+		}
+		
         if (aeInstancePool.size() == analysisEnginePoolSize) {
           try {
             postInitialize();
@@ -299,6 +372,7 @@ public class PrimitiveAnalysisEngineCont
       serviceInfo.setAnalysisEngineInstanceCount(analysisEnginePoolSize);
 
       if (!isStopped()) {
+    	  /*
         getMonitor().setThresholds(getErrorHandlerChain().getThresholds());
         // Initialize Cas Manager
         if (getCasManagerWrapper() != null) {
@@ -315,6 +389,9 @@ public class PrimitiveAnalysisEngineCont
                 cas.release();
               }
             }
+            */
+     if (getCasManagerWrapper() != null) {
+          try {
             if (isTopLevelComponent()) {
               // add delay to allow controller listener to plug itself in
               synchronized(this) {
@@ -551,6 +628,7 @@ public class PrimitiveAnalysisEngineCont
   }
    
   private String produceUniqueName(AnalysisEngineManagement aem) {
+//	  System.out.println(">>>>>>>>>>>>>>>>>>> Thread:"+Thread.currentThread().getId()+" MBean:"+aem.getUniqueMBeanName());
     String[] parts = aem.getUniqueMBeanName().split(",");
     StringBuffer sb = new StringBuffer();
     for( String part : parts) {
@@ -574,6 +652,8 @@ public class PrimitiveAnalysisEngineCont
         sb.append("/").append(part.substring(part.trim().indexOf("=")+1));
       }
     }
+	//  System.out.println("<<<<<<<<<<<<<<<<<<< Thread:"+Thread.currentThread().getId()+" MBean:"+sb.toString());
+
     return sb.toString();
   }
 
@@ -682,6 +762,7 @@ public class PrimitiveAnalysisEngineCont
       
       
       AnalysisEngineManagement rootAem = ae.getManagementInterface();
+      //System.out.println("%%%%%%%%%%%%%%%%%%%% Unique MBean Name:"+rootAem.getUniqueMBeanName()+" AE Instance Hashcode"+ae.hashCode());
       if ( rootAem.getComponents().size() > 0 ) {
           getLeafManagementObjects(rootAem, beforeAnalysisManagementObjects);
       } else {
@@ -866,6 +947,27 @@ public class PrimitiveAnalysisEngineCont
           // Increment number of CASes processed by this service
           sequence++;
         }
+        try {
+           CacheEntry cacheEntry =
+                getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+
+           if ( isTopLevelComponent() && cacheEntry.isWarmUp() && "WarmupDelegate".equals(anEndpoint.getDelegateKey())) {
+              localCache.lookupEntry(newEntry.getCasReferenceId()).setDropped(true);
+              localCache.remove(newEntry.getCasReferenceId());
+              // Remove Stats from the global Map associated with the new CAS
+              // These stats for this CAS were added to the response message
+              // and are no longer needed
+              dropCasStatistics(newEntry.getCasReferenceId());
+
+        	  return;
+           }
+        
+        } catch( Exception exx ) {
+           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                    "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAEE_exception__WARNING", exx);
+        }
+
         if (!anEndpoint.isRemote()) {
           UimaTransport transport = getTransport(anEndpoint.getEndpoint());
           UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
@@ -932,6 +1034,24 @@ public class PrimitiveAnalysisEngineCont
                 new Object[] { Thread.currentThread().getName(), getComponentName(),
                     aCasReferenceId, (double) (super.getCpuTime() - time) / (double) 1000000 });
       }
+
+
+      // check if this is a warm up CAS. Such CAS is internally created with
+      // a main purpose of warming up analytics.
+      CacheEntry cacheEntry =
+              getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+
+      if ( isTopLevelComponent() && cacheEntry.isWarmUp()) {
+//      	if ( cacheEntry.getThreadCompletionSemaphore() != null ) {
+//          	cacheEntry.getThreadCompletionSemaphore().release();
+//      	}
+      	// we are in the warm state which means the pipelines have been initialized
+      	// and we are sending CASes to warm up/prime the analytics. Since we
+      	// reached the end of the flow here, just return now. There is nothing
+      	// else to do with the CAS.
+      	return;
+      }
+
       getMonitor().resetCountingStatistic("", Monitor.ProcessErrorCount);
       // Set total number of children generated from this CAS
       // Store total time spent processing this input CAS
@@ -971,8 +1091,8 @@ public class PrimitiveAnalysisEngineCont
                       after.getUniqueName(),
                       after.getAnalysisTime()- before.getAnalysisTime(),
                       after.getNumProcessed());
-            //System.out.println("********************"+metrics.getUniqueName());
-           // System.out.println("********************"+metrics.getName());
+           // System.out.println("********************"+metrics.getUniqueName()+" Analysis Time:"+metrics.getAnalysisTime());
+            //System.out.println("********************"+metrics.getName()+" Analysis Time:"+metrics.getAnalysisTime());
             performanceList.add(metrics);
             break;
           }
@@ -1125,19 +1245,20 @@ public class PrimitiveAnalysisEngineCont
   public void sendMetadata(Endpoint anEndpoint) throws AsynchAEException {
    
 	  if ( ((ProcessingResourceMetaData) getAnalysisEngineMetadata())
-	            .getConfigurationParameterSettings().getParameterValue(
-	              AnalysisEngineController.AEInstanceCount) == null ) {
-	     addConfigIntParameter(AnalysisEngineController.AEInstanceCount, analysisEnginePoolSize);
+            .getConfigurationParameterSettings().getParameterValue(
+              AnalysisEngineController.AEInstanceCount) == null ) {
+		    addConfigIntParameter(AnalysisEngineController.AEInstanceCount, analysisEnginePoolSize);
 	  }
 
-	  if (getAnalysisEngineMetadata().getOperationalProperties().getOutputsNewCASes()) {
-         if ( ((ProcessingResourceMetaData) getAnalysisEngineMetadata())
-	              .getConfigurationParameterSettings().getParameterValue(
-	                AnalysisEngineController.CasPoolSize) == null ) {
-	                addConfigIntParameter(AnalysisEngineController.CasPoolSize, super.componentCasPoolSize);
-	     }
-	  }
-	  super.sendMetadata(anEndpoint, getAnalysisEngineMetadata());
+    if (getAnalysisEngineMetadata().getOperationalProperties().getOutputsNewCASes()) {
+  	  if ( ((ProcessingResourceMetaData) getAnalysisEngineMetadata())
+              .getConfigurationParameterSettings().getParameterValue(
+                AnalysisEngineController.CasPoolSize) == null ) {
+  	    	addConfigIntParameter(AnalysisEngineController.CasPoolSize, super.componentCasPoolSize);
+  	  }
+
+    }
+    super.sendMetadata(anEndpoint, getAnalysisEngineMetadata());
   }
 
   private AnalysisEngineMetaData getAnalysisEngineMetadata() {
@@ -1356,4 +1477,50 @@ public class PrimitiveAnalysisEngineCont
   public void dumpState(StringBuffer buffer, String lbl1) {
     buffer.append(getComponentName()+" State:"+getState());
   }
+
+  protected void doWarmUp(CAS cas, String casReferenceId) throws Exception {
+	long processTime = 0;
+	try {
+	    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+	        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "doWarmUp",
+	                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_warmup_start_cas__FINE",
+	                new Object[] { casReferenceId});
+	    }
+
+		long t1 = System.currentTimeMillis();
+		
+		Endpoint endpoint = new Endpoint_impl();
+		// set fake delegate key. This will be checked in process() method
+		// to make sure we are not attempting sending the CAS to a reply queue.
+		// Warm up simulates incoming CASes but there is no client to reply to.
+		endpoint.setDelegateKey("WarmupDelegate");
+		
+		process(cas, casReferenceId, endpoint);
+		processTime = System.currentTimeMillis() - t1;
+
+//		CacheEntry entry = getInProcessCache().getCacheEntryForCAS(casReferenceId);
+//	    if ( entry != null && entry.isFailed()) {
+//		   if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+//		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+//		                  "doWarmUp", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+//		                  "UIMAEE_service_warmup_failed_WARNING", getComponentName());
+//		   }
+//		   throw new ResourceInitializationException();
+//	    }
+	} catch( Exception e) {
+		 if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+	                  "doWarmUp", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                  "UIMAEE_service_warmup_failed_WARNING", getComponentName());
+	      }
+	      throw e;
+	} finally {
+		  dropCAS(casReferenceId, true);
+		 if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "doWarmUp",
+	                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_warmup_dropping_cas__FINE",
+	                  new Object[] { casReferenceId, processTime});
+	      }
+	}
+  }
 }

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java Fri Jun  5 19:06:57 2015
@@ -31,6 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -844,6 +845,12 @@ public class UimacppServiceController ex
     // TODO Auto-generated method stub
     return null;
   }
+
+@Override
+public void warmUp(String warmUpDataPath, CountDownLatch warmUpLatch) throws Exception {
+	// TODO Auto-generated method stub
+	
+}
 }
 
 /**

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Fri Jun  5 19:06:57 2015
@@ -104,6 +104,11 @@ public class ProcessCasErrorHandler exte
 
   private void sendExceptionToClient(Throwable t, String aCasReferenceId, Endpoint anEndpoint,
           AnalysisEngineController aController) throws Exception {
+	// When warming up the pipeline there is no client. CASes are
+	// created by the controller itself. Just return in this case.                
+	if ( anEndpoint != null && "WarmupDelegate".equals(anEndpoint.getDelegateKey())) {
+		  return;
+	}
     // Notify the parent of the exception
     if (anEndpoint != null && aCasReferenceId != null && !anEndpoint.isCasMultiplier()) {
       try {
@@ -384,6 +389,9 @@ public class ProcessCasErrorHandler exte
               anErrorContext.add(AsynchAEMessage.SkipPendingLists, "true");
             }
             if (ErrorHandler.TERMINATE.equalsIgnoreCase(threshold.getAction())) {
+            	if ( aController.isTopLevelComponent() && t instanceof Exception  ) {
+            		aController.notifyListenersWithInitializationStatus((Exception)t);
+            	}
               anErrorContext.add(ErrorContext.THROWABLE_ERROR, t);
               if (casReferenceId != null) {
                 try {

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Fri Jun  5 19:06:57 2015
@@ -453,9 +453,10 @@ public class ProcessRequestHandler_impl
         // we dont want to send a generated CAS back to the CM, override
         // with an endpoint provided by the client of
         // this service. Client endpoint is attached to an input Cas cache entry.
-        aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
-        aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
-
+        if ( replyToEndpoint != null ) {
+            aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
+            aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
+        } 
         // Before sending a CAS to Cas Multiplier, the aggregate has
         // saved the CM key in the CAS cache entry. Fetch the key
         // of the CM so that we can ask the right Shadow Cas Pool for
@@ -1021,7 +1022,18 @@ public class ProcessRequestHandler_impl
         int payload = messageContext.getMessageIntProperty(AsynchAEMessage.Payload);
         int command = messageContext.getMessageIntProperty(AsynchAEMessage.Command);
 
-        getController().getControllerLatch().waitUntilInitialized();
+        CacheEntry ce = null;
+        if (AsynchAEMessage.CASRefID == payload) {
+        	String cid = null;
+        	// Fetch id of the CAS from the message.
+            if ((cid = getCasReferenceId(messageContext)) == null) {
+              return; // Invalid message. Nothing to do
+            }
+            ce = getController().getInProcessCache().getCacheEntryForCAS(cid);
+        }
+        if ( ce == null || !ce.isWarmUp() ) {
+            getController().getControllerLatch().waitUntilInitialized();
+        }
 
         // If a Process Request, increment number of CASes processed
         if (messageContext.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties Fri Jun  5 19:06:57 2015
@@ -18,6 +18,7 @@
 
 UIMAEE_service_id_INFO = Starting Uima EE Service On {0}
 UIMAEE_service_exception_WARNING = Service: {0} Runtime Exception  
+UIMAEE_service_warmup_failed_WARNING = Service: {0} Failed While Warming the Pipeline
 UIMAEE_exception__WARNING = 
 UIMAEE_invalid_cpc_request__INFO = Invalid {0} Request. Analysis Engine Instance Not Found For Thread: 
 UIMAEE_primary_cas_pool_init__CONFIG = Primary CAS Pool Size: {0} Context: {1} Initial Cas Heap Size:{2} cells. Supports Incoming Service Requests.
@@ -259,4 +260,8 @@ UIMAEE_terminal_error_WARNING=Controller
 UIMAEE_drop_cas_debug_FINEST=Controller:{0} Drop:{1} CAS:{2} ReplyReceived:{3}
 UIMAEE_timer_started_FINE=Timer Started For CAS: {0} Timeout Value:{1} Timer Thread ID:{2} Timer Thread Name:{3}
 UIMAEE_service_lost_connectivity_WARNING = Service: {0} Unable to Open Connection To Broker: {1} - Silently Retrying ...
-UIMAEE_service_regained_connectivity_INFO = Service: {0} Recovered Connectivity to Broker: {1}
\ No newline at end of file
+UIMAEE_service_regained_connectivity_INFO = Service: {0} Recovered Connectivity to Broker: {1}
+UIMAEE_service_warmup_start_INFO = Service: {0} Thread: {1} Warming Up The Pipeline ...
+UIMAEE_service_warmup_success_INFO = Service: {0} Thread: {1} WarmUp Has Finished Successfully - Processed: {2} CASes - Time Spent Warming Up: {3} secs- Ready For Processing
+UIMAEE_warmup_dropping_cas__FINE = Aggregate Warmup Stage - Dropping CAS:{0} Processing took {1}
+UIMAEE_warmup_start_cas__FINE = Aggregate Warmup Stage - Processing CAS id:{0}

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1683847&r1=1683846&r2=1683847&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Fri Jun  5 19:06:57 2015
@@ -1594,6 +1594,7 @@ public abstract class BaseUIMAAsynchrono
       cachedRequest.setException(exception);
       cachedRequest.setProcessException();
     }
+    cpcReadySemaphore.release();
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(
               Level.INFO,
@@ -2378,7 +2379,8 @@ public abstract class BaseUIMAAsynchrono
     	        serviceDelegate.removeCasFromOutstandingList(casReferenceId);
     	        // Check if all replies have been received
     	        long outstandingCasCount = outstandingCasRequests.decrementAndGet();
-    	        if (outstandingCasCount == 0) {
+
+    	        if (outstandingCasCount <= 0) {
     	          cpcReadySemaphore.release();
     	        }
     	        //