You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/05/07 21:27:44 UTC

svn commit: r1831129 [4/5] - in /uima/uima-as/branches/uima-as-3: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uima/...

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java Mon May  7 21:27:43 2018
@@ -112,12 +112,12 @@ public class ActiveMQSupport extends Tes
     Logger.getRootLogger().addAppender(console);
     */
     broker = createBroker();  // sets uri
-    /*
-    broker.setUseJmx(false);
+    
+    broker.setUseJmx(true);
     if ( broker.isUseJmx()) {
         broker.getManagementContext().setConnectorPort(1098);
     }
-    */
+    
     SystemUsage su = new SystemUsage();
     MemoryUsage mu = new MemoryUsage();
     mu.setPercentOfJvmHeap(50);
@@ -305,6 +305,8 @@ public class ActiveMQSupport extends Tes
       if ( enableJMX ) {
           broker.setUseJmx(enableJMX);
     	  broker.getManagementContext().setConnectorPort(defaultJMXPort);
+    	  System.out.println("************** ACTIVEMQ JMX Connector Enabled Port:"+defaultJMXPort+"****************");
+
       } else {
     	  broker.setUseJmx(false);
     	  System.out.println("************** ACTIVEMQ JMX Connector Not Enabled ****************");

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Mon May  7 21:27:43 2018
@@ -34,6 +34,7 @@ import javax.jms.Message;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
+import org.apache.uima.aae.client.UimaAS;
 import org.apache.uima.aae.client.UimaASProcessStatus;
 import org.apache.uima.aae.client.UimaASProcessStatusImpl;
 import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
@@ -103,7 +104,7 @@ public abstract class BaseTestSupport ex
 
   private Object errorCounterMonitor = new Object();
 
-  private BaseUIMAAsynchronousEngine_impl engine;
+  private UimaAsynchronousEngine uimaAsClient;
 
   protected UimaAsTestCallbackListener listener = new UimaAsTestCallbackListener();
 
@@ -115,17 +116,17 @@ public abstract class BaseTestSupport ex
 
   protected long failedCasCountDueToBrokerFailure = 0;
   
-  protected String deployService(Transport transport, BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+  protected String deployService(Transport transport, UimaAsynchronousEngine uimaAsClient,
           String aDeploymentDescriptorPath) throws Exception  {
 	  String serviceId = null;
 	  if ( transport.equals(Transport.Java)) {
-		  serviceId = deployJavaService(eeUimaEngine, aDeploymentDescriptorPath);
+		  serviceId = deployJavaService(uimaAsClient, aDeploymentDescriptorPath);
 	  } else if ( transport.equals(Transport.JMS)) {
-		  serviceId = deployJmsService(eeUimaEngine, aDeploymentDescriptorPath);
+		  serviceId = deployJmsService(uimaAsClient, aDeploymentDescriptorPath);
 	  }
 	  return serviceId;
   }
-  protected String deployJavaService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+  protected String deployJavaService(UimaAsynchronousEngine uimaAsClient,
           String aDeploymentDescriptorPath) throws Exception  {
     System.setProperty("Provider", "java");
     System.setProperty("Protocol", "java");
@@ -135,9 +136,9 @@ public abstract class BaseTestSupport ex
     Map<String, Object> appCtx = new HashMap<>();
     appCtx.put( AsynchAEMessage.Transport, Transport.Java);
  
-    return deployService(eeUimaEngine, aDeploymentDescriptorPath, appCtx);
+    return deployService(uimaAsClient, aDeploymentDescriptorPath, appCtx);
   }
-  protected String deployJmsService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+  protected String deployJmsService(UimaAsynchronousEngine uimaAsClient,
           String aDeploymentDescriptorPath)  throws Exception  {
      System.setProperty("Provider", "activemq");
      System.setProperty("Protocol", "jms");
@@ -158,13 +159,13 @@ public abstract class BaseTestSupport ex
      Map<String, Object> appCtx = new HashMap<>();
 	 appCtx.put( AsynchAEMessage.Transport, Transport.JMS);
 	 
-	 return deployService(eeUimaEngine, aDeploymentDescriptorPath, appCtx);
+	 return deployService(uimaAsClient, aDeploymentDescriptorPath, appCtx);
   }
   protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
           String aDeploymentDescriptorPath) throws Exception {
 	  return deployService(eeUimaEngine, aDeploymentDescriptorPath, new HashMap<String, Object>());
   }
-  protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
+  protected String deployService(UimaAsynchronousEngine uimaAsClient,
           String aDeploymentDescriptorPath,  Map<String, Object> appCtx) throws Exception {
 
  // protected String deployService(BaseUIMAAsynchronousEngine_impl eeUimaEngine,
@@ -191,13 +192,13 @@ public abstract class BaseTestSupport ex
     // appCtx.put(UimaAsynchronousEngine.UimaEeDebug, UimaAsynchronousEngine.UimaEeDebug);
     String serviceId = null;
     try {
-    	serviceId = eeUimaEngine.deploy(aDeploymentDescriptorPath, appCtx);
+    	serviceId = uimaAsClient.deploy(aDeploymentDescriptorPath, appCtx);
     } catch (ResourceInitializationException e) {
       if (!ignoreException(ResourceInitializationException.class)) {
         System.out
                 .println(">>>>>>>>>>> runTest: Stopping Client API Due To Initialization Exception");
         isStopping = true;
-        eeUimaEngine.stop();
+        uimaAsClient.stop();
         throw e;
       }
       System.out.println(">>>>>>>>>>> Exception ---:" + e.getClass().getName());
@@ -231,10 +232,13 @@ public abstract class BaseTestSupport ex
     return false;
   }
 
-  public void initialize(BaseUIMAAsynchronousEngine_impl eeUimaEngine, Map<String, Object> appCtx)
+  public void initialize(UimaAsynchronousEngine uimaAsClient, Map<String, Object> appCtx)
           throws Exception {
-    eeUimaEngine.addStatusCallbackListener(listener);
-    eeUimaEngine.initialize(appCtx);
+	  uimaAsClient.addStatusCallbackListener(listener);
+	  System.out.println("... Client calling initialize");
+	  uimaAsClient.initialize(appCtx);
+	  System.out.println("... Client Done calling initialize");
+
   }
 
   protected void setDoubleByteText(String aDoubleByteText) {
@@ -370,11 +374,14 @@ public abstract class BaseTestSupport ex
     // Instantiate Uima EE Client
     isStopped = false;
     isStopping = false;
-    final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
+    uimaAsClient = 
+  		  UimaAS.newInstance(Transport.JMS);
+
+//    final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
     // Deploy Uima EE Primitive Service
-    final String containerId = deployJavaService(eeUimaEngine, serviceDeplyDescriptor);
+    final String containerId = deployJmsService(uimaAsClient, serviceDeplyDescriptor);
 
-    engine = eeUimaEngine;
+    //uimaAsClient = eeUimaEngine;
 
     Thread t1 = null;
     Thread t2 = null;
@@ -383,7 +390,7 @@ public abstract class BaseTestSupport ex
     appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, aGetMetaTimeout);
     appCtx.put(UimaAsynchronousEngine.Timeout, 1000);
 
-    initialize(eeUimaEngine, appCtx);
+    initialize(uimaAsClient, appCtx);
 
     // Wait until the top level service returns its metadata
     waitUntilInitialized();
@@ -405,7 +412,7 @@ public abstract class BaseTestSupport ex
             try {
               mux.wait(5000);
               // Undeploy service container
-              eeUimaEngine.undeploy(containerId);
+              uimaAsClient.undeploy(containerId);
             } catch (Exception e) {
             }
           }
@@ -416,7 +423,7 @@ public abstract class BaseTestSupport ex
 
     // Spin runner threads and start sending CASes
     for (int i = 0; i < howManyRunningThreads; i++) {
-      SynchRunner runner = new SynchRunner(eeUimaEngine, howManyCASesPerRunningThread, listener);
+      SynchRunner runner = new SynchRunner(uimaAsClient, howManyCASesPerRunningThread, listener);
       Thread runnerThread = new Thread(runner, "Runner" + i);
       runnerThread.start();
       System.out.println("runTest: Started Runner Thread::Id=" + runnerThread.getId());
@@ -429,7 +436,7 @@ public abstract class BaseTestSupport ex
     if (!isStopped && !unexpectedException) {
       System.out.println("runTest: Sending CPC");
       // Send CPC
-      eeUimaEngine.collectionProcessingComplete();
+      uimaAsClient.collectionProcessingComplete();
     }
 
     // If have skipped CPC trip the latch
@@ -438,29 +445,29 @@ public abstract class BaseTestSupport ex
     }
     t1.join();
     isStopping = true;
-    eeUimaEngine.stop();
+    uimaAsClient.stop();
   }
 
-  protected void runCrTest(BaseUIMAAsynchronousEngine_impl aUimaEeEngine, int howMany)
+  protected void runCrTest(UimaAsynchronousEngine uimaAsClient, int howMany)
           throws Exception {
-    engine = aUimaEeEngine;
+    this.uimaAsClient = uimaAsClient;
     final Semaphore ctrlSemaphore = new Semaphore(1);
     spinMonitorThread(ctrlSemaphore, howMany, PROCESS_LATCH);
-    aUimaEeEngine.process();
+    uimaAsClient.process();
     waitOnMonitor(ctrlSemaphore);
   }
 
-  protected void runTest(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine,
+  protected void runTest(Map appCtx, UimaAsynchronousEngine uimaAsClient,
           String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind)
           throws Exception {
-    runTest(appCtx, aUimaEeEngine, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind,
+    runTest(appCtx, uimaAsClient, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind,
             SEND_CAS_ASYNCHRONOUSLY);
   }
 
-  protected void runTest2(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine,
+  protected void runTest2(Map appCtx, UimaAsynchronousEngine uimaAsClient,
           String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind)
           throws Exception {
-    runTest2(appCtx, aUimaEeEngine, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind,
+    runTest2(appCtx, uimaAsClient, aBrokerURI, aTopLevelServiceQueueName, howMany, aLatchKind,
             SEND_CAS_ASYNCHRONOUSLY);
   }
 
@@ -478,14 +485,14 @@ public abstract class BaseTestSupport ex
    * @param sendCasAsynchronously
    * @throws Exception
    */
-  protected void runTest(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine,
+  protected void runTest(Map appCtx, UimaAsynchronousEngine uimaAsClient,
           String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind,
           boolean sendCasAsynchronously) throws Exception {
     Thread t1 = null;
     Thread t2 = null;
     serviceShutdownException = false;
     unexpectedException = false;
-    engine = aUimaEeEngine;
+    this.uimaAsClient = uimaAsClient;
     isStopped = false;
     isStopping = false;
 
@@ -493,7 +500,7 @@ public abstract class BaseTestSupport ex
       appCtx = buildContext(aBrokerURI, aTopLevelServiceQueueName, 0);
     }
     try {
-      initialize(aUimaEeEngine, appCtx);
+      initialize(uimaAsClient, appCtx);
     } catch (ResourceInitializationException e) {
       if (ignoreException(ResourceInitializationException.class)) {
         return;
@@ -527,7 +534,7 @@ public abstract class BaseTestSupport ex
         if (!isStopped) {
           // Send an in CAS to the top level service
           try {
-            sendCAS(aUimaEeEngine, howMany, sendCasAsynchronously);
+            sendCAS(uimaAsClient, howMany, sendCasAsynchronously);
           } catch( Exception e) {}
         }
         // Wait until ALL CASes return from the service
@@ -544,7 +551,7 @@ public abstract class BaseTestSupport ex
           
           if (!serviceShutdownException && !isStopped && !unexpectedException) {
             System.out.println("runTest: Sending CPC");
-            aUimaEeEngine.collectionProcessingComplete();
+            uimaAsClient.collectionProcessingComplete();
           } else {
             System.out
                     .println(">>>>>>>>>>>>>>>> runTest: Not Sending CPC Due To Exception [serviceShutdownException="
@@ -577,7 +584,7 @@ public abstract class BaseTestSupport ex
 */
     isStopping = true;
     
-    aUimaEeEngine.stop();
+    uimaAsClient.stop();
 
     // Finally fail test if unhappy ... must be last call as acts like "throw"
     if (unexpectedException) {
@@ -600,19 +607,19 @@ public abstract class BaseTestSupport ex
    * @param sendCasAsynchronously
    * @throws Exception
    */
-  protected void runTest2(Map appCtx, BaseUIMAAsynchronousEngine_impl aUimaEeEngine,
+  protected void runTest2(Map appCtx, UimaAsynchronousEngine uimaAsClient,
           String aBrokerURI, String aTopLevelServiceQueueName, int howMany, int aLatchKind,
           boolean sendCasAsynchronously) throws Exception {
     Thread t1 = null;
     Thread t2 = null;
-    engine = aUimaEeEngine;
+    this.uimaAsClient = uimaAsClient;
     isStopped = false;
     isStopping = false;
 
     if (appCtx == null) {
       appCtx = buildContext(aBrokerURI, aTopLevelServiceQueueName, 0);
     }
-    initialize(aUimaEeEngine, appCtx);
+    initialize(uimaAsClient, appCtx);
 
     // Wait until the top level service returns its metadata
     waitUntilInitialized();
@@ -637,7 +644,7 @@ public abstract class BaseTestSupport ex
         			append(i+1).append(" CAS of ").append(howMany);
         	System.out.println(sb.toString());
           // Send an in CAS to the top level service
-          sendCAS(aUimaEeEngine, 1, sendCasAsynchronously);
+          sendCAS(uimaAsClient, 1, sendCasAsynchronously);
         }
         // Wait until ALL CASes return from the service
         if (t2 != null) {
@@ -652,7 +659,7 @@ public abstract class BaseTestSupport ex
             }
 
             // Send CPC
-            aUimaEeEngine.collectionProcessingComplete();
+            uimaAsClient.collectionProcessingComplete();
           }
         }
 
@@ -668,7 +675,7 @@ public abstract class BaseTestSupport ex
 
     isStopping = true;
     //aUimaEeEngine.stop();
-    aUimaEeEngine.undeploy();
+    uimaAsClient.undeploy();
 
     // Finally fail test if unhappy ... must be last call as acts like "throw"
     if (unexpectedException) {
@@ -688,15 +695,15 @@ public abstract class BaseTestSupport ex
    *          - use either synchronous or asynchronous API
    * @throws Exception
    */
-  protected void sendCAS(BaseUIMAAsynchronousEngine_impl eeUimaEngine, int howMany,
+  protected void sendCAS(UimaAsynchronousEngine uimaAsClient, int howMany,
           boolean sendCasAsynchronously) throws Exception {
-    engine = eeUimaEngine;
+    this.uimaAsClient = uimaAsClient;
     for (int i = 0; i < howMany; i++) {
  
     	if (isStopping) {
     		break;
     	}
-      CAS cas = eeUimaEngine.getCAS();
+      CAS cas = uimaAsClient.getCAS();
       if (cas == null) {
         if (isStopping) {
           System.out.println(">> runTest: stopping after sending " + i + " of " + howMany
@@ -712,9 +719,9 @@ public abstract class BaseTestSupport ex
         cas.setDocumentText(text);
       }
       if (sendCasAsynchronously) {
-        eeUimaEngine.sendCAS(cas);
+    	  uimaAsClient.sendCAS(cas);
       } else {
-        eeUimaEngine.sendAndReceiveCAS(cas);
+    	  uimaAsClient.sendAndReceiveCAS(cas);
       }
     }
   }
@@ -829,7 +836,9 @@ public abstract class BaseTestSupport ex
                   || (e.getCause() != null && e.getCause() instanceof ServiceShutdownException)) {
             serviceShutdownException = true;
             isStopping = true;
-            engine.stop();
+            try {
+                uimaAsClient.stop();
+            } catch( Exception ex) {ex.printStackTrace();}
           } else if (ignoreException(e.getClass())) {
             expectedException = true;
           } else if (e instanceof ResourceProcessException && isProcessTimeout(e)) {
@@ -837,14 +846,17 @@ public abstract class BaseTestSupport ex
               System.out.println("runTest: Incrementing ProcessTimeout Counter");
               timeoutCounter++;
             }
-          } else if (engine != null && (e instanceof UimaASPingTimeout || (e.getCause() != null && e.getCause() instanceof UimaASPingTimeout) )) {
+          } else if (uimaAsClient != null && (e instanceof UimaASPingTimeout || (e.getCause() != null && e.getCause() instanceof UimaASPingTimeout) )) {
             System.out.println("runTest: Ping Timeout - service Not Responding To Ping");
             if (cpcLatch != null) {
               cpcLatch.countDown();
             }
             isStopping = true;
-            engine.stop();
-          } else if ( engine != null && e instanceof UimaASProcessCasTimeout) {
+            try {
+                uimaAsClient.stop();
+            } catch( Exception ex) {ex.printStackTrace();}
+            
+          } else if ( uimaAsClient != null && e instanceof UimaASProcessCasTimeout) {
             if ( e.getCause() != null && e.getCause() instanceof UimaASPingTimeout) {
               if ( countPingRetries ) {
                 if ( pingTimeoutCount > maxPingRetryCount ) {
@@ -852,7 +864,9 @@ public abstract class BaseTestSupport ex
                     cpcLatch.countDown();
                   }
                   isStopping = true;
-                  engine.stop();
+                  try {
+                      uimaAsClient.stop();
+                  } catch( Exception ex) {ex.printStackTrace();}
                   
                 } else {
                   pingTimeoutCount++;
@@ -1063,15 +1077,15 @@ public abstract class BaseTestSupport ex
    * 
    */
   public class SynchRunner implements Runnable {
-    private BaseUIMAAsynchronousEngine_impl uimaClient = null;
+    private UimaAsynchronousEngine uimaClient = null;
 
     private long howManyCASes = 1;
     private UimaAsTestCallbackListener callbackListener;
-    public SynchRunner(BaseUIMAAsynchronousEngine_impl aUimaClient, int howMany) {
-      this(aUimaClient, howMany, null);
+    public SynchRunner(UimaAsynchronousEngine uimaClient, int howMany) {
+      this(uimaClient, howMany, null);
     }
-    public SynchRunner(BaseUIMAAsynchronousEngine_impl aUimaClient, int howMany, UimaAsTestCallbackListener aListener) {
-      uimaClient = aUimaClient;
+    public SynchRunner(UimaAsynchronousEngine uimaClient, int howMany, UimaAsTestCallbackListener aListener) {
+      this.uimaClient = uimaClient;
       howManyCASes = howMany;
       callbackListener = aListener;
     }
@@ -1087,8 +1101,12 @@ public abstract class BaseTestSupport ex
 
           try {
             // Send CAS and wait for a response
-            String casReferenceId = uimaClient.sendAndReceiveCAS(cas, pt);
-            status = new UimaASProcessStatusImpl(pt, cas, casReferenceId);
+        	  if ( uimaClient instanceof BaseUIMAAsynchronousEngine_impl ) {
+        		  String casReferenceId = 
+        				  ((BaseUIMAAsynchronousEngine_impl)uimaClient).sendAndReceiveCAS(cas, pt);
+                  status = new UimaASProcessStatusImpl(pt, cas, casReferenceId);
+        	  }
+           
           } catch (ResourceProcessException rpe) {
             //rpe.printStackTrace();
             status = new UimaASProcessStatusImpl(pt);
@@ -1108,12 +1126,12 @@ public abstract class BaseTestSupport ex
     }
   }
 
-  protected void spinShutdownThread(final BaseUIMAAsynchronousEngine_impl uimaEEEngine, long when)
+  protected void spinShutdownThread(final UimaAsynchronousEngine uimaAsClient, long when)
           throws Exception {
-    spinShutdownThread(uimaEEEngine, when, null, 0);
+    spinShutdownThread(uimaAsClient, when, null, 0);
   }
 
-  protected void spinShutdownThread(final BaseUIMAAsynchronousEngine_impl uimaEEEngine, long when,
+  protected void spinShutdownThread(final UimaAsynchronousEngine uimaAsClient, long when,
           final String[] aSpringContainerIds, final int stop_level) throws Exception {
     Date timeToRun = new Date(System.currentTimeMillis() + when);
     final Timer timer = new Timer();
@@ -1124,7 +1142,12 @@ public abstract class BaseTestSupport ex
         if (aSpringContainerIds == null) {
           isStopping = true;
           System.out.println(">>>> runTest: Stopping UIMA EE Engine");
-          uimaEEEngine.stop();
+          try {
+        	  uimaAsClient.stop();
+          } catch( Exception ex) {
+        	 ex.printStackTrace();;
+          }
+          
           isStopping = false;
           isStopped = true;
           System.out.println(">>>> runTest: UIMA EE Engine Stopped");
@@ -1139,7 +1162,7 @@ public abstract class BaseTestSupport ex
           try {
             System.out.println(">>>> runTest: Quiescing Service And Stopping it");
             for( int i = aSpringContainerIds.length; i > 0; i--) {
-              uimaEEEngine.undeploy(aSpringContainerIds[i-1], stop_level);
+            	uimaAsClient.undeploy(aSpringContainerIds[i-1], stop_level);
             }
           } catch (Exception e) {
             e.printStackTrace();

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml Mon May  7 21:27:43 2018
@@ -26,7 +26,7 @@
   <name>Top Level TAE</name>
   <description></description>
   
-  <deployment protocol="jms" provider="activemq">
+  <deployment protocol="${Protocol}" provider="${Provider}">
     <casPool numberOfCASes="5" initialFsHeapSize="500"/>
     <service>
       <inputQueue endpoint="TopLevelTaeQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
@@ -40,6 +40,7 @@
 	        <casMultiplier poolSize="5"/> 
           </analysisEngine>
 
+
           <remoteAnalysisEngine key="NoOp" remoteReplyQueueScaleout="3">
 	                  <inputQueue endpoint="NoOpAnnotatorQueue" brokerURL="${DefaultBrokerURL}"/>
 	                  <serializer method="xmi"/>

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml Mon May  7 21:27:43 2018
@@ -24,7 +24,7 @@
   <description/>
   <version/>
   <vendor/>
-  <deployment protocol="jms" provider="activemq">
+  <deployment protocol="${Protocol}" provider="${Provider}">
     <casPool numberOfCASes="3" initialFsHeapSize="2000000"/> 
     <service>
       <inputQueue endpoint="InnerAggregateQueue" brokerURL="${DefaultBrokerURL}" prefetch="0"/>

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_ComplexAggregate.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_ComplexAggregate.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_ComplexAggregate.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_ComplexAggregate.xml Mon May  7 21:27:43 2018
@@ -26,7 +26,7 @@
   <name>Top Level TAE</name>
   <description></description>
   
-  <deployment protocol="jms" provider="activemq">
+  <deployment protocol="${Protocol}" provider="${Provider}">
     <casPool numberOfCASes="5" initialFsHeapSize="500"/>
     <service>
       <inputQueue endpoint="TopLevelTaeQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
@@ -36,7 +36,7 @@
       <analysisEngine>
       <scaleout numberOfInstances="5"/> 
       
-        <!--delegates>
+        <delegates>
         
           <analysisEngine key="TestMultiplier">
 	        <casMultiplier poolSize="5"/> 
@@ -55,7 +55,7 @@
 	              </remoteAnalysisEngine>
 	          </delegates>
           </analysisEngine>
-        </delegates-->
+        </delegates>
       </analysisEngine>
     </service>
   </deployment>

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotator.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotator.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotator.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotator.xml Mon May  7 21:27:43 2018
@@ -28,13 +28,14 @@
   <description>Deploys the NoOp Annotator Primitive AE</description>
   
   <deployment protocol="jms" provider="activemq">
+  <!--deployment protocol="${Protocol}" provider="${Provider}"-->
     <casPool numberOfCASes="5" initialFsHeapSize="500"/> 
     <service>
       <inputQueue endpoint="NoOpAnnotatorQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
       <topDescriptor>
        <import location="../descriptors/analysis_engine/NoOpAnnotator.xml"/> 
       </topDescriptor>
-      <analysisEngine>
+      <analysisEngine async="false">
       <asyncPrimitiveErrorConfiguration>
            <!-- <processCasErrors thresholdCount="4" thresholdWindow="10" thresholdAction="terminate" /> -->
            <collectionProcessCompleteErrors additionalErrorAction="terminate" />

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml Mon May  7 21:27:43 2018
@@ -26,8 +26,7 @@
   
   <name>Room Number Annotator</name>
   <description>Deploys Person Title Annotator Primitive AE</description>
-  
-  <deployment protocol="jms" provider="activemq">
+    <deployment protocol="${Protocol}" provider="${Provider}">
     <casPool numberOfCASes="5" initialFsHeapSize="500"/> 
     <service>
       <inputQueue endpoint="PersonTitleAnnotatorQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
@@ -35,6 +34,7 @@
        <import location="../descriptors/analysis_engine/PersonTitleAnnotator.xml"/> 
       </topDescriptor>
       <analysisEngine>
+      <scaleout numberOfInstances="2" />
       <asyncPrimitiveErrorConfiguration>
            <!-- <processCasErrors thresholdCount="4" thresholdWindow="10" thresholdAction="terminate" /> -->
            <collectionProcessCompleteErrors additionalErrorAction="terminate" />

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelComplexAggregateCM.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelComplexAggregateCM.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelComplexAggregateCM.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelComplexAggregateCM.xml Mon May  7 21:27:43 2018
@@ -24,12 +24,13 @@
   <description/>
   <version/>
   <vendor/>
-  <deployment protocol="jms" provider="activemq">
+  <deployment protocol="${Protocol}" provider="${Provider}">
     <casPool numberOfCASes="5" initialFsHeapSize="2000000"/>
     <service>
       <inputQueue endpoint="TopLevelTaeQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/>
       <topDescriptor>
         <import location="../descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml"/>
+        <!--import location="../descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml"/-->
       </topDescriptor>
       <analysisEngine async="true">
         <casMultiplier poolSize="5" initialFsHeapSize="2000000"/>
@@ -56,16 +57,34 @@
             </asyncAggregateErrorConfiguration>
           </analysisEngine>
 			
-          <remoteAnalysisEngine key="InnerRemoteCMAggregate">
-            <casMultiplier poolSize="5" initialFsHeapSize="2000000"/> 
-            <inputQueue brokerURL="${DefaultBrokerURL}" endpoint="InnerAggregateQueue"/>
-            <serializer method="xmi"/>
+          <!--RemoteAnalysisEngine key="InnerRemoteCMAggregate"-->
+          <analysisEngine key="InnerRemoteCMAggregate" async="true">
+            <casMultiplier processParentLast="true"/> 
+            <!--inputQueue brokerURL="${DefaultBrokerURL}" endpoint="InnerAggregateQueue"/>
+            <serializer method="xmi"/-->
+              <delegates>
+                      <analysisEngine key="InnerTestMultiplier" async="false" inputQueueScaleout="2" internalReplyQueueScaleout="1">
+                       
+                        <scaleout numberOfInstances="1"/>
+                         <casMultiplier poolSize="4" initialFsHeapSize="2000000"/>
+            
+                      </analysisEngine>
+                      <analysisEngine key="NoOpCC" async="false" >
+                       
+                        <scaleout numberOfInstances="1"/>
+                        
+            
+                      </analysisEngine>
+                      
+                      
+                </delegates>       
             <asyncAggregateErrorConfiguration>
               <getMetadataErrors maxRetries="0" timeout="0" errorAction="disable"/>
               <processCasErrors maxRetries="0" timeout="0" continueOnRetryFailure="false" thresholdCount="1" thresholdWindow="0" thresholdAction="terminate"/>
               <collectionProcessCompleteErrors timeout="0" additionalErrorAction="terminate"/>
             </asyncAggregateErrorConfiguration>
-          </remoteAnalysisEngine>
+          </analysisEngine>
+         <!--/remoteAnalysisEngine-->
 			
           <analysisEngine key="NoOpCandidateAnswerCM" async="false">
             <casMultiplier poolSize="3" initialFsHeapSize="2000000"/>

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml Mon May  7 21:27:43 2018
@@ -68,7 +68,7 @@
       <nameValuePair>
         <name>NumberToGenerate</name>
         <value>
-          <integer>3</integer>
+          <integer>1</integer>
         </value>
       </nameValuePair>
     </configurationParameterSettings>

Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTopLevelTestAggregate.xml
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTopLevelTestAggregate.xml?rev=1831129&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTopLevelTestAggregate.xml (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTopLevelTestAggregate.xml Mon May  7 21:27:43 2018
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+	<!--
+	 ***************************************************************
+	 * Licensed to the Apache Software Foundation (ASF) under one
+	 * or more contributor license agreements.  See the NOTICE file
+	 * distributed with this work for additional information
+	 * regarding copyright ownership.  The ASF licenses this file
+	 * to you under the Apache License, Version 2.0 (the
+	 * "License"); you may not use this file except in compliance
+	 * with the License.  You may obtain a copy of the License at
+         *
+	 *   http://www.apache.org/licenses/LICENSE-2.0
+	 * 
+	 * Unless required by applicable law or agreed to in writing,
+	 * software distributed under the License is distributed on an
+	 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+	 * KIND, either express or implied.  See the License for the
+	 * specific language governing permissions and limitations
+	 * under the License.
+	 ***************************************************************
+   -->
+   
+<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier">
+  <frameworkImplementation>org.apache.uima.java</frameworkImplementation>
+  <primitive>false</primitive>
+  <delegateAnalysisEngineSpecifiers>
+    
+    <delegateAnalysisEngine key="4TestAggregateCM">
+      <import location="4TestAggregateCM.xml"/>
+    </delegateAnalysisEngine>
+
+  
+  </delegateAnalysisEngineSpecifiers>
+  <analysisEngineMetaData>
+    <name>Test Aggregate TAE</name>
+    <description>Detects Nothing</description>
+    <configurationParameters searchStrategy="language_fallback">
+      <configurationParameter>
+        <name>ActionAfterCasMultiplier</name>
+        <description>The action to be taken after a CAS has been input to a CAS Multiplier and the CAS Multiplier has finished processing it.
+		 Valid values are:
+			continue - the CAS continues on to the next element in the flow
+			stop - the CAS will no longer continue in the flow, and will be returned from the aggregate if possible.
+			drop - the CAS will no longer continue in the flow, and will be dropped (not returned from the aggregate) if possible.	 
+			dropIfNewCasProduced (the default) - if the CAS multiplier produced a new CAS as a result of processing this CAS, then this
+				CAS will be dropped.  If not, then this CAS will continue.</description>
+        <type>String</type>
+        <multiValued>false</multiValued>
+        <mandatory>false</mandatory>
+        <overrides>
+          <parameter>FixedFlowController/ActionAfterCasMultiplier</parameter>
+        </overrides>
+      </configurationParameter>
+    </configurationParameters>
+    
+    <configurationParameterSettings>
+      <nameValuePair>
+        <name>ActionAfterCasMultiplier</name>
+        <value>
+          <string>continue</string>
+        </value>
+      </nameValuePair>
+    </configurationParameterSettings>
+    <flowConstraints>
+      <fixedFlow>
+        <node>4TestAggregateCM</node>
+      </fixedFlow>
+    </flowConstraints>
+    <capabilities>
+      <capability>
+        <inputs/>
+        <outputs>
+        </outputs>
+        <languagesSupported>
+          <language>en</language>
+        </languagesSupported>
+      </capability>
+    </capabilities>
+	<operationalProperties>
+		<modifiesCas>true</modifiesCas>
+		<multipleDeploymentAllowed>true</multipleDeploymentAllowed>
+		<outputsNewCASes>false</outputsNewCASes>
+	</operationalProperties>
+  </analysisEngineMetaData>
+</analysisEngineDescription>

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsPriorityBasedThreadFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsPriorityBasedThreadFactory.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsPriorityBasedThreadFactory.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsPriorityBasedThreadFactory.java Mon May  7 21:27:43 2018
@@ -138,6 +138,7 @@ public class UimaAsPriorityBasedThreadFa
              
 			  if ( !initFailed && !controller.getState().equals(ServiceState.FAILED) ) {
             	  try {
+            		  System.out.println(".....UimaAsPriorityBasedThreadFactory.run() - callint AE.initialize() - Thread:"+Thread.currentThread().getId());
             		  ((PrimitiveAnalysisEngineController)controller).initializeAnalysisEngine();
             	  } catch( Exception e) {
             		  initFailed = true;
@@ -148,6 +149,7 @@ public class UimaAsPriorityBasedThreadFa
             	  return; // there was failure previously so just return
               }
             }
+            System.out.println("............ Worker Thread Waiting for messages");
             // runs forever until controll is stopped
             while (!controller.isStopped()) {
             	// block until a message arrives or timeout. On timeout, the pool returns null
@@ -157,6 +159,7 @@ public class UimaAsPriorityBasedThreadFa
             		// nothing received, try again
             		continue;
             	}
+            	System.out.println(">>>>>>>>>>>>>>>>>> GOT MESSAGE .....");
             	// 'poison pill' sent when controller stops
             	if (m.getMessage() == null
             			&& m.getSemaphore() == null

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java Mon May  7 21:27:43 2018
@@ -27,6 +27,7 @@ import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
+import org.apache.uima.as.client.DirectListener.DirectListenerCallback;
 import org.apache.uima.util.Level;
 
 /**
@@ -49,7 +50,7 @@ public class UimaAsThreadFactory impleme
   
   private boolean isDaemon=false;
   
-  public static AtomicInteger poolIdGenerator = new AtomicInteger();
+  public static final AtomicInteger poolIdGenerator = new AtomicInteger();
   
   private final int poolId = poolIdGenerator.incrementAndGet();
   
@@ -59,6 +60,8 @@ public class UimaAsThreadFactory impleme
   
   private CountDownLatch latchToCountNumberOfInitedThreads;
 
+  private DirectListenerCallback callback = null;
+  
   public UimaAsThreadFactory() {
 	  
   }
@@ -75,10 +78,16 @@ public class UimaAsThreadFactory impleme
     this.latchToCountNumberOfTerminatedThreads = latchToCountNumberOfTerminatedThreads;
     this.latchToCountNumberOfInitedThreads = latchToCountNumberOfInitedThreads;
   }
+  
+  public UimaAsThreadFactory withCallback(DirectListenerCallback c) {
+	  callback = c;
+	  return this;
+  }
   public UimaAsThreadFactory withThreadGroup(ThreadGroup tGroup) {
 	  theThreadGroup = tGroup;
 	  return this;
   }
+  
   public UimaAsThreadFactory withPrimitiveController(PrimitiveAnalysisEngineController aController) {
 	  controller = aController;
 	  return this;
@@ -137,6 +146,9 @@ public class UimaAsThreadFactory impleme
             	  } catch( Exception e) {
             		  initFailed = true;
             		  e.printStackTrace();
+            		  if ( callback != null ) {
+            			  callback.onInitializationError(e);
+            		  }
             		  throw e;
             	  }  finally {
             		  if ( latchToCountNumberOfInitedThreads != null ) {

Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAS.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAS.java?rev=1831129&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAS.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAS.java Mon May  7 21:27:43 2018
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.client;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+
+public class UimaAS {
+/*	
+	private UimaAS() {
+		
+	}
+	
+	private static class UimaAsSingelton {
+		private UimaAsSingelton(){}
+		private static final UimaAS instance = 
+				new UimaAS();
+	}
+
+	public static UimaAS getInstance() {
+		return UimaAsSingelton.instance;
+	}
+	*/
+	public static UimaAsynchronousEngine newInstance(Transport transport) 
+			throws ClassNotFoundException, NoSuchMethodException, 
+			InstantiationException, IllegalAccessException, InvocationTargetException{
+		Class<?>[] type = {Transport.class};
+		Class<?> uimaClientClz = 
+				Class.forName("org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl");
+		Constructor<?> constructor = uimaClientClz.getConstructor(type);
+		Object[] argInstance = {transport};
+		//return (UimaAsynchronousEngine)uimaClientClz.newInstance();
+		return (UimaAsynchronousEngine)constructor.newInstance(argInstance);
+	}
+	/*
+	public static UimaAsynchronousEngine newJmsClient() 
+			throws ClassNotFoundException, NoSuchMethodException, 
+			InstantiationException, IllegalAccessException, 
+			InvocationTargetException, InvocationTargetException {
+		UimaAsynchronousEngine client = newClient(Transport.JMS);
+		return client;
+	}
+	public static UimaAsynchronousEngine newJavaClient() 
+			throws ClassNotFoundException, NoSuchMethodException, 
+			InstantiationException, IllegalAccessException, InvocationTargetException {
+		UimaAsynchronousEngine client = newClient(Transport.Java);
+		return client;
+	}
+	*/
+}

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java Mon May  7 21:27:43 2018
@@ -148,6 +148,9 @@ public interface UimaAsynchronousEngine
    */
   public static final String GetMetaTimeout = "GetMetaTimeout";
 
+  public static final String Protocol = "Protocol";
+  
+  public static final String Provider = "Provider";
   /**
    * Path to the XSLT processor to use when processing a deployment descriptor. The application provides it to the Uima AS 
    * client via System property, either on a command line using -D, or explicitly by using java's 
@@ -439,7 +442,15 @@ public interface UimaAsynchronousEngine
    */
   public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext)
           throws Exception;
-
+  
+  /**
+   * Undeploys all UIMA AS services deployed by this client.This method is synchronous
+   * and will block until all deployed services are destroyed.
+   * 
+   * 
+   * @throws Exception error
+   */
+  public void undeploy() throws Exception;
   /**
    * Undeploys specified UIMA AS container and all services running within it. Each UIMA AS
    * container has a unique id assigned to it during the deploy phase. This method is synchronous

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Mon May  7 21:27:43 2018
@@ -452,6 +452,7 @@ implements
     } catch (Exception e) {
       throw new AsynchAEException(e);
     }
+    System.out.println("------------------ "+getKey()+".processCollectionCompleteReplyFromDelegate() finished processing CPC reply from Delegate:"+aDelegateKey); 
   }
 
   private void sendCpcReply(Endpoint aClientEndpoint) throws Exception {
@@ -918,6 +919,7 @@ implements
           getInProcessCache().getCacheEntryForCAS(aNewCasReferenceId).setNewCas(true,
                   getComponentName());
           getLocalCache().lookupEntry(anInputCasReferenceId).decrementOutstandingFlowCounter();
+          
         } else {
           throw new AsynchAEException(
                   "Flow Object Not In Flow Cache. Expected Flow Object in FlowCache for Cas Reference Id:"
@@ -1117,8 +1119,12 @@ implements
             //  Controller. Release the semaphore immediately after acquiring it. This semaphore is 
             //  no longer needed. This synchronization is only necessary for blocking the parent
             //  CAS until all child CASes acquire their Flow objects.
+        	String cn = (getKey() != null ) ? getKey() : getComponentName();
+        	System.out.println("++++++++++++ Waiting to acquire semaphore - CAS:"+casStateEntry.getCasReferenceId()+" Controller:"+cn+" ThreadId:"+Thread.currentThread().getId());  
             casStateEntry.acquireFlowSemaphore();
             casStateEntry.releaseFlowSemaphore();
+        	System.out.println("++++++++++++ Releasedsemaphore - CAS:"+casStateEntry.getCasReferenceId()+" Controller:"+cn+" ThreadId:"+Thread.currentThread().getId());  
+
             if ( lastDelegateEndpoint.processParentLast()) {
               synchronized (super.finalStepMux) {
                 // Determine if the CAS should be held until all its children leave this aggregate.
@@ -1818,13 +1824,11 @@ implements
                     "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                     "UIMAEE_drop_cas_debug_FINEST",
                     new Object[] { getComponentName(), aStep.getForceCasToBeDropped(), aCasReferenceId, casStateEntry.isReplyReceived() });
-        
-        if (forceToDropTheCas(parentCasStateEntry, cacheEntry, aStep)) {
+       if (forceToDropTheCas(parentCasStateEntry,  cacheEntry, aStep)) {
         	
         	
             
-   
-        	
+       	
         	if (casStateEntry.isReplyReceived()) {
             if (isSubordinate) {
               // drop the flow since we no longer need it
@@ -2024,21 +2028,21 @@ implements
     return retValue;
   }
 
-  private boolean forceToDropTheCas(CasStateEntry entry, CacheEntry cacheEntry, FinalStep aStep) {
+  private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry cacheEntry, FinalStep aStep) {
    
 	  // Get the key of the Cas Producer
     String casProducer = cacheEntry.getCasProducerAggregateName();
     // CAS is considered new from the point of view of this service IF it was produced by it
     boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(
             casProducer));
-    if (entry != null && entry.isFailed() && isNewCas) {
+    if (parent != null && parent.isFailed() && isNewCas) {
       return true; // no point to continue if the CAS was produced in this aggregate and its parent
                    // failed here
     }
     // If the CAS was generated by this component but the Flow Controller wants to drop the CAS OR
     // this component
     // is not a Cas Multiplier
-    if (isNewCas && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
+    if (isNewCas && parent.getSubordinateCasInPlayCount() == 0 && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
       return true;
     }
     return false;
@@ -2273,6 +2277,9 @@ implements
         // Find the top ancestor of this CAS. It is the input CAS sent by the client
 //        String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
         String inputCasId = casStateEntry.getInputCasReferenceId(); //getLocalCache().lookupInputCasReferenceId(casStateEntry);
+       
+        System.out.println("&&&&&&&&&&&&&&&&&&& sendReplyToCollocatedClient() - Sending CAS:"+casStateEntry.getCasReferenceId()+" Its Input CAS:"+inputCasId); 
+        
         // Modify the parent of this CAS.
         if (inputCasId != null ) {
 //          if ( !inputCasId.equals(casStateEntry.getParentCasReferenceId())) {

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Mon May  7 21:27:43 2018
@@ -22,6 +22,7 @@ package org.apache.uima.aae.controller;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.uima.UimaContext;
 import org.apache.uima.aae.AsynchAECasManager;
@@ -47,6 +48,7 @@ import org.apache.uima.as.client.DirectI
 import org.apache.uima.as.client.Listener;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.resource.ResourceSpecifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 public interface AnalysisEngineController extends ControllerLifecycle {
   public static final String CasPoolSize = "CasPoolSize";
@@ -64,7 +66,8 @@ public interface AnalysisEngineControlle
   public void setJmsInputChannel(InputChannel anInputChannel) throws Exception;
 
   public InputChannel getInputChannel(ENDPOINT_TYPE et);
-
+  public InputChannel getInputChannel();
+  
   public void addInputChannel(InputChannel anInputChannel) throws Exception;
 
   public String getServiceEndpointName();
@@ -77,7 +80,7 @@ public interface AnalysisEngineControlle
 
   public void takeAction(String anAction, String anEndpointName, ErrorContext anErrorContext);
 
-  public InputChannel getInputChannel();
+  public void setThreadFactory(ThreadPoolTaskExecutor factory);
 
   public List<Listener> getAllListeners();
 

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Mon May  7 21:27:43 2018
@@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
 
 import javax.management.ObjectName;
 
@@ -69,6 +70,7 @@ import org.apache.uima.aae.error.ErrorHa
 import org.apache.uima.aae.error.ErrorHandlerChain;
 import org.apache.uima.aae.error.ForcedMessageTimeoutException;
 import org.apache.uima.aae.error.UimaAsUncaughtExceptionHandler;
+import org.apache.uima.aae.error.handler.CpcErrorHandler;
 import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
 import org.apache.uima.aae.jmx.JmxManagement;
 import org.apache.uima.aae.jmx.JmxManager;
@@ -101,6 +103,7 @@ import org.apache.uima.resource.Resource
 import org.apache.uima.resource.ResourceSpecifier;
 import org.apache.uima.resource.Resource_ImplBase;
 import org.apache.uima.util.Level;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 public abstract class BaseAnalysisEngineController extends Resource_ImplBase implements
         AnalysisEngineController, EventSubscriber {
@@ -274,6 +277,8 @@ public abstract class BaseAnalysisEngine
   
   protected UimaContext uimaContext=null;
   
+  private ThreadPoolTaskExecutor threadFactory=null;
+  
   public abstract void dumpState(StringBuffer buffer, String lbl1);
   
   protected abstract void doWarmUp(CAS cas, String casReferenceId) throws Exception;
@@ -524,6 +529,10 @@ public abstract class BaseAnalysisEngine
 	  return uimaContext;
   }
 
+
+  public void setThreadFactory(ThreadPoolTaskExecutor factory) {
+	  threadFactory = factory;
+  }
   public String getPID() {
     return processPid;
   }
@@ -1539,7 +1548,10 @@ public abstract class BaseAnalysisEngine
     }
     List errorHandlerList = new ArrayList();
     errorHandlerList.add(new ProcessCasErrorHandler());
+//    errorHandlerList.add(new CpcErrorHandler(aDelegateMap))
     errorHandlerChain = new ErrorHandlerChain(errorHandlerList);
+    
+    
   }
 
   public void setErrorHandlerChain(ErrorHandlerChain errorHandlerChain) {
@@ -2071,6 +2083,12 @@ public abstract class BaseAnalysisEngine
       } catch (Exception e) {
       }
 
+    if ( threadFactory != null ) {
+    	
+    	// stop ThreadPoolFactory which handles priority based
+    	// messages between PriorityMessageHandler and InputChannel.
+    	threadFactory.shutdown();
+    }
     /*
      * Send an exception to the client if this is a top level service
      */
@@ -2338,6 +2356,7 @@ public abstract class BaseAnalysisEngine
   }
 
   public void terminate(Throwable cause, String aCasReferenceId) {
+//	  Thread.currentThread().dumpStack();
       if (stopLatch.getCount() > 0) {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "terminate",
@@ -2758,6 +2777,7 @@ public abstract class BaseAnalysisEngine
     if (controllerListeners.isEmpty()) {
       return;
     }
+    boolean createTargetListener = true;
     for (int i = 0; i < controllerListeners.size(); i++) {
       // If there is an exception, notify listener with failure
       if (e != null) {
@@ -2767,7 +2787,10 @@ public abstract class BaseAnalysisEngine
     	 
               InputChannel ic = getInputChannel();
               try {
-                  ic.createListenerForTargetedMessages();
+       //     	  if ( createTargetListener ) {
+     //       		  createTargetListener = false; // only one needed
+   //                   ic.createListenerForTargetedMessages();
+      //      	  }
                   ((ControllerCallbackListener) controllerListeners.get(i))
                   .notifyOnInitializationSuccess(this);
               } catch( Exception ex) {

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java Mon May  7 21:27:43 2018
@@ -516,7 +516,10 @@ public class LocalCache extends Concurre
       synchronized( monitor ) {
         if ( childCasOutstandingFlowCounter.incrementAndGet() == 1  ) {
           try {
+          	System.out.println("::::::::::  Waiting to acquire semaphore - CAS:"+getCasReferenceId()+" ThreadId:"+Thread.currentThread().getId());  
+
             acquireFlowSemaphore();
+          	System.out.println(":::::::::: acquired semaphore - CAS:"+getCasReferenceId()+" ThreadId:"+Thread.currentThread().getId());  
           } catch( InterruptedException e) {
           }
         }
@@ -535,6 +538,8 @@ public class LocalCache extends Concurre
             childCasOutstandingFlowCounter.decrementAndGet();
           }
           releaseFlowSemaphore();
+        	System.out.println(":::::::::: released semaphore - CAS:"+getCasReferenceId()+" ThreadId:"+Thread.currentThread().getId());  
+
         }
       }
     }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Mon May  7 21:27:43 2018
@@ -540,9 +540,14 @@ public class PrimitiveAnalysisEngineCont
     long start = super.getCpuTime();
     localCache.dumpContents();
     try {
+    	String delegateKey = getKey();
+    	System.out.println("...... "+delegateKey+".collectionProcessComplete() - calling checkout instance");;
       ae = aeInstancePool.checkout();
+  	System.out.println("...... "+delegateKey+".collectionProcessComplete() - got instance");;
       if (ae != null) {
         ae.collectionProcessComplete();
+      	System.out.println("...... "+delegateKey+".collectionProcessComplete() - ae.CPC() returned");;
+
       }
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(),
@@ -561,7 +566,11 @@ public class PrimitiveAnalysisEngineCont
         getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, anEndpoint, null, false);
       }
 */
+      
+    	System.out.println("...... "+delegateKey+".collectionProcessComplete() - trying to send CPC reply");;
+
       getOutputChannel(anEndpoint).sendReply(AsynchAEMessage.CollectionProcessComplete, anEndpoint, null, false);
+    	System.out.println("...... "+delegateKey+".collectionProcessComplete() - sent CPC reply");;
 
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(),
@@ -570,6 +579,7 @@ public class PrimitiveAnalysisEngineCont
       }
 
     } catch (Exception e) {
+    	e.printStackTrace();
       ErrorContext errorContext = new ErrorContext();
       errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
       errorContext.add(AsynchAEMessage.Endpoint, anEndpoint);

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java Mon May  7 21:27:43 2018
@@ -822,7 +822,7 @@ public abstract class Delegate {
    *          - command for which the timer is started
    */
   private void startDelegateGetMetaTimer(final String aCasReferenceId, final int aCommand) {
-	  Thread.dumpStack();
+	//  Thread.dumpStack();
     synchronized( getMetaTimerLock ) {
       final long timeToWait = getTimeoutValueForCommand(aCommand);
       Date timeToRun = new Date(System.currentTimeMillis() + timeToWait);

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Mon May  7 21:27:43 2018
@@ -398,6 +398,7 @@ public class ProcessCasErrorHandler exte
                     .getAction());
             if (disabledDueToExceededThreshold) {
               delegateKey = key;
+          	System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!! DISABLE LISTENER FOR DELEGATE:"+key);
               anErrorContext.add(AsynchAEMessage.SkipPendingLists, "true");
             }
             if (ErrorHandler.TERMINATE.equalsIgnoreCase(threshold.getAction())) {

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java Mon May  7 21:27:43 2018
@@ -29,6 +29,10 @@ public interface UimaASService {
 		Asynchronous,
 		Synchronous
 	};
+	public static final int QUIESCE_AND_STOP = 1000;
+
+	public static final int STOP_NOW = 1001;
+	  
 	public String getEndpoint();
 	public String getId();
 	public void start() throws Exception;