You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC

svn commit: r1825401 [1/11] - in /uima/uima-as/branches/uima-as-3: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/main/java/org/apache/uima...

Author: cwiklik
Date: Mon Feb 26 18:54:11 2018
New Revision: 1825401

URL: http://svn.apache.org/viewvc?rev=1825401&view=rev
Log:
UIMA-5280: Removed the dependency on dd2spring, refactored the code to support calling a service without a broker, and made many internal changes to better support local transport.

Added:
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/ActiveMQFactory.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/ServiceDeployers.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaASUtils.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerStatusListener.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/DelegateEndpoint.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/Thresholds.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaASClientDirectMessage.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaASClientMessage.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/AbstractUimaASService.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/AsynchronousUimaASService.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/ScaleoutSpecification.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/ServiceNotFoundException.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/ServiceRegistry.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaAsServiceRegistry.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/ServiceBuilder.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsCommand.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AggregateAnalysisEngineDelegate.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AnalysisEngineDelegate.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/CasMultiplierNature.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/RemoteAnalysisEngineDelegate.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/JavaQueueListener.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/LifecycleListener.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/Listener.java
Modified:
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
    uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandler.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerBase.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerChain.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/CpcErrorHandler.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/JmxManagement.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/JmxManager.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/CasPool.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/DelegateConfiguration.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/ServiceContext.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasMultiplierImpl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasPoolImpl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ColocatedDelegateEngineImpl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/RemoteDelegateEngineImpl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ServiceContextImpl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
    uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
    uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
    uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/PendingMessage.java

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java Mon Feb 26 18:54:11 2018
@@ -67,9 +67,9 @@ import org.springframework.jms.listener.
  * 
  */
 public class ConcurrentMessageListener implements SessionAwareMessageListener {
-  private static final Class CLASS_NAME = ConcurrentMessageListener.class;
+  private static final Class<?> CLASS_NAME = ConcurrentMessageListener.class;
 
-  private SessionAwareMessageListener delegateListener;
+  private SessionAwareMessageListener<Message> delegateListener;
 
   private int concurrentThreadCount = 0;
 
@@ -98,14 +98,14 @@ public class ConcurrentMessageListener i
    *          - JmsInputChannel instance to delegate CAS to
    * @throws InvalidClassException
    */
-  public ConcurrentMessageListener(int concurrentThreads, Object delegateListener, String destination, ThreadGroup threadGroup, String threadPrefix)
+  public ConcurrentMessageListener(int concurrentThreads,  SessionAwareMessageListener<Message> inputChannel, String destination, ThreadGroup threadGroup, String threadPrefix)
           throws InvalidClassException {
-    if (!(delegateListener instanceof SessionAwareMessageListener)) {
-      throw new InvalidClassException("Invalid Delegate Listener. Expected Object of Type:"
-              + SessionAwareMessageListener.class + " Received:" + delegateListener.getClass());
-    }
+//    if (!(delegateListener instanceof SessionAwareMessageListener)) {
+//      throw new InvalidClassException("Invalid Delegate Listener. Expected Object of Type:"
+//              + SessionAwareMessageListener.class + " Received:" + delegateListener.getClass());
+//    }
     concurrentThreadCount = concurrentThreads;
-    this.delegateListener = (SessionAwareMessageListener) delegateListener;
+    this.delegateListener = inputChannel;
     if (concurrentThreads > 1) {
       //  created an unbounded queue. The throttling is controlled by the
       //  semaphore in the UimaBlockingExecutor initialized below
@@ -140,7 +140,34 @@ public class ConcurrentMessageListener i
   private boolean isMessageFromCasMultiplier(final Message message) throws JMSException {
     return message.propertyExists(AsynchAEMessage.CasSequence);
   }
+  private void incrementChildCASes(String parentCASid) {
+      try {
+//          String parentCasReferenceId = message
+//                  .getStringProperty(AsynchAEMessage.InputCasReference);
+          // Fetch parent CAS entry from the local cache
+          CasStateEntry parentEntry = controller.getLocalCache().lookupEntry(parentCASid);
+          // increment number of child CASes this parent has in play
+          parentEntry.incrementSubordinateCasInPlayCount();
+          //  increment a counter that counts number of child CASes that have no
+          //  flow object yet. The flow object is created for each child CAS from
+          //  the parent flow object. The method below will actually acquire a 
+          //  permit from a binary semaphore to force the parent to block until
+          //  the last of its children acquires its Flow object.
+          parentEntry.incrementOutstandingFlowCounter();
 
+        } catch (Exception e) {
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+            
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                    "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAEE_service_exception_WARNING", controller.getComponentName());
+            
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                    "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAJMS_exception__WARNING", e);
+          }
+        }
+  }
   /**
    * Intercept a message to increment a child count of the input CAS. This method is always called
    * in a single thread, guaranteeing order of processing. The child CAS will always come here
@@ -150,12 +177,16 @@ public class ConcurrentMessageListener i
    * 
    */
   public void onMessage(final Message message, final Session session) throws JMSException {
-    try {
-      // Wait until the controller is plugged in
-      controllerLatch.await();
-    } catch (InterruptedException e) {
-    }
+	  System.out.println("..........ConcurrentMessageListener.onMessage() got message");
+
+//	  try {
+//      // Wait until the controller is plugged in
+//      controllerLatch.await();
+//    } catch (InterruptedException e) {
+//    }
     if (isMessageFromCasMultiplier(message)) {
+    	  System.out.println("..........ConcurrentMessageListener.onMessage() message from remote CM");
+
       // Check if the message came from a Cas Multiplier and it contains a new Process Request
       int command = message.getIntProperty(AsynchAEMessage.Command);
       int messageType = message.getIntProperty(AsynchAEMessage.MessageType);
@@ -171,6 +202,8 @@ public class ConcurrentMessageListener i
             delegate.setConcurrentConsumersOnReplyQueue();
           }
         }
+        incrementChildCASes(message.getStringProperty(AsynchAEMessage.InputCasReference));
+/*
         try {
           String parentCasReferenceId = message
                   .getStringProperty(AsynchAEMessage.InputCasReference);
@@ -197,6 +230,7 @@ public class ConcurrentMessageListener i
                     "UIMAJMS_exception__WARNING", e);
           }
         }
+        */
       }
 
     }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Mon Feb 26 18:54:11 2018
@@ -55,6 +55,7 @@ import org.apache.uima.aae.UIMAEE_Consta
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import org.apache.uima.aae.controller.BaseAnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
 import org.apache.uima.aae.delegate.Delegate;
@@ -124,8 +125,9 @@ public class JmsEndpointConnection_impl
     } else {
       //  If this is a reply to a client, use the same broker URL that manages this service input queue.
       //  Otherwise this is a request so use a broker specified in the endpoint object.
-      serverUri = (anEndpoint.isReplyEndpoint()) ? 
-              ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() : anEndpoint.getServerURI();
+//      serverUri = (anEndpoint.isReplyEndpoint()) ? 
+//              ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() : anEndpoint.getServerURI();
+      serverUri = anEndpoint.getServerURI();
     }
     isReplyEndpoint = anEndpoint.isReplyEndpoint();
     controller = aController;
@@ -183,7 +185,7 @@ public class JmsEndpointConnection_impl
 		        // If replying to http request, reply to a queue managed by this service broker using tcp
 		        // protocol
 		        if (isReplyEndpoint && brokerUri.startsWith("http")) {
-		          brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI();
+		          brokerUri = ((JmsOutputChannel) aController.getOutputChannel(ENDPOINT_TYPE.JMS)).getServerURI();
 
 		          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
 		            UIMAFramework.getLogger(CLASS_NAME).logrb(
@@ -193,7 +195,7 @@ public class JmsEndpointConnection_impl
 		                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
 		                    "UIMAJMS_override_connection_to_endpoint__FINE",
 		                    new Object[] {  aComponentName, getEndpoint(),
-		                      ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() });
+		                      ((JmsOutputChannel) aController.getOutputChannel(ENDPOINT_TYPE.JMS)).getServerURI() });
 		          }
 		        } else if ( !brokerUri.startsWith("http") && !brokerUri.startsWith("failover") && !brokerUri.startsWith("vm://localhost?broker.persistent=false")){
 				  String prefix = "";
@@ -772,7 +774,7 @@ public class JmsEndpointConnection_impl
     		        }
     	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
     	                  "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-    	                  "UIMAEE_service_delivery_exception__WARNING",new Object[] { controller.getComponentName(), key, endpointName});
+    	                  "UIMAEE_service_delivery_exception__WARNING",new Object[] { controller.getComponentName(), endpointName,serverUri});
     		  }
    		      CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
    		      // Mark the CAS as failed so that the CAS is released and cache cleaned up
@@ -889,14 +891,14 @@ public class JmsEndpointConnection_impl
    * @param delegateKey
    * @throws Exception
    */
-  private void createListener(String delegateKey) throws Exception {
-    if (controller instanceof AggregateAnalysisEngineController) {
-      // Fetch an InputChannel that handles messages for a given delegate
-      InputChannel iC = controller.getReplyInputChannel(delegateKey);
-      // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
-      iC.createListener(delegateKey, null);
-    }
-  }
+//  private void createListener(String delegateKey) throws Exception {
+//    if (controller instanceof AggregateAnalysisEngineController) {
+//      // Fetch an InputChannel that handles messages for a given delegate
+//      InputChannel iC = controller.getReplyInputChannel(delegateKey);
+//      // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
+//      iC.createListener(delegateKey, null);
+//    }
+//  }
 
   private synchronized boolean handleJmsException(JMSException ex) {
     if (!failed) {
@@ -990,7 +992,7 @@ public class JmsEndpointConnection_impl
 			if ( casState.isSubordinate()) {
 				try {
 					
-					String inputCasId = casState.getInputCasReferenceId();
+					String inputCasId = casState.getParentCasReferenceId();
 					if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
 						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
 								"UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Mon Feb 26 18:54:11 2018
@@ -41,8 +41,10 @@ import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.InputChannel;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
 import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.Endpoint_impl;
@@ -59,8 +61,11 @@ import org.apache.uima.aae.message.Messa
 import org.apache.uima.aae.message.UIMAMessage;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.message.JmsMessageContext;
+import org.apache.uima.as.client.Listener;
+import org.apache.uima.as.client.Listener.Type;
 import org.apache.uima.util.Level;
 import org.springframework.jms.listener.SessionAwareMessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 /**
  * Thin adapter for receiving JMS messages from Spring. It delegates processing of all messages to
@@ -69,17 +74,17 @@ import org.springframework.jms.listener.
  * 
  */
 public class JmsInputChannel implements InputChannel, JmsInputChannelMBean,
-        SessionAwareMessageListener {
+        SessionAwareMessageListener<Message> {
   /**
 	 * 
 	 */
   private static final long serialVersionUID = -3318400773113552290L;
 
-  private static final Class CLASS_NAME = JmsInputChannel.class;
+  private static final Class<?> CLASS_NAME = JmsInputChannel.class;
 
-  private transient final CountDownLatch msgHandlerLatch = new CountDownLatch(1);
+  private final transient CountDownLatch msgHandlerLatch = new CountDownLatch(1);
 
-  private transient final CountDownLatch controllerLatch = new CountDownLatch(1);
+  private final transient  CountDownLatch controllerLatch = new CountDownLatch(1);
 
   // Reference to the first Message Handler in the Chain.
   private transient Handler handler;
@@ -92,9 +97,9 @@ public class JmsInputChannel implements
 
   private int sessionAckMode;
 
-  private transient UimaDefaultMessageListenerContainer messageListener;
+  //private transient UimaDefaultMessageListenerContainer messageListener;
 
-  private transient Session jmsSession;
+  //private transient Session jmsSession;
 
   private String brokerURL = "";
 
@@ -110,21 +115,37 @@ public class JmsInputChannel implements
 
   private transient RemoteJMXServer remoteJMXServer = null;
   //  synchronizes initialization of RemotBroker
-  private Object brokerMux = new Object();
+ // private Object brokerMux = new Object();
   
-  private ConcurrentHashMap<String, UimaDefaultMessageListenerContainer> failedListenerMap = new ConcurrentHashMap<String, UimaDefaultMessageListenerContainer>();
+  private ConcurrentHashMap<String, Listener> failedListenerMap = 
+		  new ConcurrentHashMap<>();
 
   //  A global flag that determines if we should create a connection to broker's MBeanServer to be
   //  able to determine if client's reply queue exists before processing a CAS. 
   public static transient boolean attachToBrokerMBeanServer=true;
   
-  private static Map<String, List<UimaDefaultMessageListenerContainer>> listenerMap =
-          new ConcurrentHashMap<String, List<UimaDefaultMessageListenerContainer>>();
+  private ENDPOINT_TYPE type = ENDPOINT_TYPE.JMS;
 
+  private static Map<String, List<Listener>> listenerMap =
+          new ConcurrentHashMap<>();
+
+  private ChannelType channelType;
+  
+  public JmsInputChannel(ChannelType type) {
+	  this.channelType = type;
+  }
+  public ChannelType getChannelType() {
+	  return channelType;
+  }
+  
   public AnalysisEngineController getController() {
     return controller;
   }
-
+  public ENDPOINT_TYPE getType() {
+	  return type;
+  }
+  
+  
   public String getName() {
     return endpointName;
   }
@@ -652,18 +673,18 @@ public class JmsInputChannel implements
                 .getIntProperty(AsynchAEMessage.MessageType));
       
         String msgSentFromIP = null;
-
+     //   System.out.println("<<<<<<<<<<<<< Service:"+ getController().getComponentName()+ " Received JMS Message - command:"+command+" type:"+messageType);
         if (aMessage.getIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Response
                 && aMessage.propertyExists(AsynchAEMessage.ServerIP)) {
           msgSentFromIP = aMessage.getStringProperty(AsynchAEMessage.ServerIP);
         }
-        // System.out.println("***********************************************************************************"
-        // +
-        // "           \n**CONTROLLER::"+controller.getName()+"**** Received New Message From [ "+aMessage.getStringProperty(AsynchAEMessage.MessageFrom)+" ]**************"
-        // +
-        // "           \n**MSGTYPE::"+messageType+" COMMAND:"+command +
-        // " Cas Reference Id::"+casRefId+
-        // "           \n******************************************************************************");
+         System.out.println("***********************************************************************************"
+         +
+         "           \n**CONTROLLER::"+controller.getName()+"**** Received New Message From [ "+aMessage.getStringProperty(AsynchAEMessage.MessageFrom)+" ]**************"
+         +
+         "           \n**MSGTYPE::"+messageType+" COMMAND:"+command +
+        " Cas Reference Id::"+casRefId+
+        "           \n******************************************************************************");
 
         String msgFrom = (String) aMessage.getStringProperty(AsynchAEMessage.MessageFrom);
         if (controller != null && msgFrom != null) {
@@ -674,7 +695,7 @@ public class JmsInputChannel implements
         	if ( ackClient(messageContext)  ) {
         		try {
         			//	Any exception while sending an ACK results in a dropped request
-        			getController().getOutputChannel().sendReply(AsynchAEMessage.ServiceInfo,
+        			getController().getOutputChannel(ENDPOINT_TYPE.JMS).sendReply(AsynchAEMessage.ServiceInfo,
                             messageContext.getEndpoint(), aMessage.getStringProperty(AsynchAEMessage.CasReference), true);
         		} catch( Exception ex) {
         			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
@@ -800,30 +821,31 @@ public class JmsInputChannel implements
   public String getServerUri() {
     return brokerURL;
   }
- public List<UimaDefaultMessageListenerContainer> registerListener(UimaDefaultMessageListenerContainer messageListener) {
-	    List<UimaDefaultMessageListenerContainer> ll = null;
+ public List<Listener> registerListener(Listener messageListener) {
+	    List<Listener> ll = null;
 	    if ( listenerMap.containsKey(getController().getComponentName()+getController().hashCode()) ) {
 	      ll = listenerMap.get(getController().getComponentName()+getController().hashCode());
 	    } else {
-	      ll = new CopyOnWriteArrayList<UimaDefaultMessageListenerContainer>();
+	      ll = new CopyOnWriteArrayList<>();
 	      listenerMap.put(getController().getComponentName()+getController().hashCode(), ll);
 	    }
 	    if (!ll.contains(messageListener)) {
 	      ll.add(messageListener);
 	    } else {
 	    }
-	    for(Object container: ll ) {
-	      UimaDefaultMessageListenerContainer c = (UimaDefaultMessageListenerContainer) container;
-	    }
+//	    for(Object container: ll ) {
+//	      UimaDefaultMessageListenerContainer c = (UimaDefaultMessageListenerContainer) container;
+//	    }
 	    return ll;
   }
-  public List<UimaDefaultMessageListenerContainer> getListeners() {
-	    List<UimaDefaultMessageListenerContainer> ll = null;
+  public List<Listener> getListeners() {
+	    List<Listener> ll = null;
 	    if ( listenerMap.containsKey(getController().getComponentName()+getController().hashCode()) ) {
 	      ll = listenerMap.get(getController().getComponentName()+getController().hashCode());
 	    }
 	    return ll;
   }
+  /*
   public synchronized void setListenerContainer(UimaDefaultMessageListenerContainer jmsL) {
     this.messageListener = jmsL;
     System.setProperty("BrokerURI", messageListener.getBrokerUrl());
@@ -844,7 +866,13 @@ public class JmsInputChannel implements
       }
     } 
   }
-
+*/
+  public synchronized void addListenerContainer(UimaDefaultMessageListenerContainer messageListener) {
+	// Add new message listener to the list
+	  registerListener(messageListener);
+	 
+  }
+  /*
   public ActiveMQConnectionFactory getConnectionFactory() {
     if (messageListener == null) {
       return null;
@@ -852,7 +880,7 @@ public class JmsInputChannel implements
       return (ActiveMQConnectionFactory) messageListener.getConnectionFactory();
     }
   }
-
+*/
   public void ackMessage(MessageContext aMessageContext) {
     if (aMessageContext != null && sessionAckMode == Session.CLIENT_ACKNOWLEDGE) {
       try {
@@ -877,15 +905,7 @@ public class JmsInputChannel implements
   }
 
   public String getInputQueueName() {
-    if (messageListener != null)
-      if (messageListener.getDestination() != null) {
-        return messageListener.getDestination().toString();
-      } else {
-        return messageListener.getDestinationName();// getEndpointName();
-      }
-    else {
-      return "";
-    }
+     return endpointName;
   }
 
   public ServiceInfo getServiceInfo() {
@@ -914,7 +934,47 @@ public class JmsInputChannel implements
               .setBrokerURL(brokerURL);
     }
   }
+  private void stopListener(final UimaDefaultMessageListenerContainer mL) throws Exception {
+	  System.out.println(".... "+getController().getComponentName()+" Stopping Listener Type:"+mL.getType());
+	  mL.setTerminating();
+	  mL.setAcceptMessagesWhileStopping(false);
+
+	  mL.stop();
+	  System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Calling destroy()");
+	  System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Done Calling destroy()");
+
+	  if ( mL.getTaskExecutor() instanceof ThreadPoolTaskExecutor ) {
+		  System.out.println(".... "+getController().getComponentName()+" Stopping ThreadPoolTaskExecutor");
+		  ThreadPoolTaskExecutor tpe = ((ThreadPoolTaskExecutor)mL.getTaskExecutor());
+		  tpe.destroy();
 
+	  } else {
+		  System.out.println(".... "+getController().getComponentName()+" ActiveConsumerCount:"+mL.getActiveConsumerCount());
+		  if ( mL.getTaskExecutor() != null ) {
+			  System.out.println(".... "+getController().getComponentName()+" TaskExecutor Class:"+ mL.getTaskExecutor().getClass().getName());
+		  } else {
+			  //((ActiveMQDestination)mL.getDestination()).;
+			  //mL.getTransport().;
+			 // mL.notifyAll();
+			  if ( mL.getActiveConsumerCount() > 0 ) {
+				  Thread.dumpStack();
+			  }
+			  System.out.println(".... "+getController().getComponentName()+" Destroying Listener");
+			  mL.doDestroy();
+			  System.out.println(".... "+getController().getComponentName()+" ActiveConsumerCount:"+mL.getActiveConsumerCount());
+
+		  }
+
+		  
+		 
+		  
+	  }
+	  System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Stopped Task Executor");
+	  mL.shutdown();
+
+		System.out.println("Stopped Listener ");
+
+  }
   private void stopChannel(UimaDefaultMessageListenerContainer mL, boolean shutdownNow) throws Exception {
     String eName = mL.getEndpointName();
     if (eName != null) {
@@ -924,9 +984,9 @@ public class JmsInputChannel implements
                 new Object[] { eName, shutdownNow });
       }
     }
-    mL.delegateStop();
+   // mL.delegateStop();
     if (shutdownNow) {
-        mL.destroy(shutdownNow);
+    	stopListener(mL);
     }
    
     String selector = "";
@@ -958,7 +1018,7 @@ public class JmsInputChannel implements
   public void stop(boolean shutdownNow) throws Exception {
     stop(InputChannel.CloseAllChannels, shutdownNow);
     listenerContainerList.clear();
-    List<UimaDefaultMessageListenerContainer> ll = getListeners();
+    List<Listener> ll = getListeners();
     if ( ll != null ) {
        ll.clear();
     }
@@ -969,26 +1029,31 @@ public class JmsInputChannel implements
     }
   }
   public void disconnectListenersFromQueue() throws Exception {
-	List<UimaDefaultMessageListenerContainer> ll = getListeners();
-	for (UimaDefaultMessageListenerContainer listenerObject : ll) {
-		  	 stopChannel(listenerObject, false);
+	List<Listener> ll = getListeners();
+	for (Listener listenerObject : ll) {
+		disconnectListenerFromQueue(listenerObject);
 	}	
 
   }
+  public void disconnectListenerFromQueue(Listener listener) throws Exception {
+	stopChannel((UimaDefaultMessageListenerContainer)listener, false);
+  }
   public void setTerminating() {
-    List<UimaDefaultMessageListenerContainer> ll = getListeners();
-    for (UimaDefaultMessageListenerContainer listenerObject : ll) {
-        listenerObject.setTerminating();
+    List<Listener> ll = getListeners();
+    for (Listener listenerObject : ll) {
+    	((UimaDefaultMessageListenerContainer)listenerObject).setTerminating();
       } 
 
   }
   public void terminate() {
 	 try {
-	    List<UimaDefaultMessageListenerContainer> ll = getListeners();
-	    for (UimaDefaultMessageListenerContainer listenerObject : ll) {
-		      listenerObject.closeConnection();
+	    List<Listener> ll = getListeners();
+	    for (Listener listenerObject : ll) {
+	    	UimaDefaultMessageListenerContainer lc =
+	    			(UimaDefaultMessageListenerContainer)listenerObject;
+		      lc.closeConnection();
 		      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-		        String msg = "................. JmsInputChannel.terminate() - Listener -"+listenerObject.getDestinationName();
+		        String msg = "................. JmsInputChannel.terminate() - Listener -"+lc.getDestinationName();
                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "terminate",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
                   new Object[] { msg });
@@ -1004,8 +1069,8 @@ public class JmsInputChannel implements
 	  
   }
   public synchronized void stop(int channelsToClose, boolean shutdownNow) throws Exception {
-	  List<UimaDefaultMessageListenerContainer> listenersToRemove = new ArrayList<UimaDefaultMessageListenerContainer>();
-      List<UimaDefaultMessageListenerContainer> ll = getListeners();
+	//  List<UimaDefaultMessageListenerContainer> listenersToRemove = new ArrayList<UimaDefaultMessageListenerContainer>();
+      List<Listener> ll = getListeners();
       String msg = ">>>>>>>>>>>> JmsInputChannel. stop() - Controller:"+controller.getComponentName()+ " Listener Count:"+ll.size();
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stop",
@@ -1013,13 +1078,14 @@ public class JmsInputChannel implements
                   new Object[] { msg });
       }
 
-      for (UimaDefaultMessageListenerContainer listenerObject : ll) {
-
-       if (listenerObject != null && doCloseChannel(listenerObject, channelsToClose)) {
-           listenerObject.setRecoveryInterval(0);  // https://issues.apache.org/jira/browse/UIMA-3437
-          listenerObject.setTerminating();
-          listenerObject.setAutoStartup(false);
-          stopChannel(listenerObject, shutdownNow);
+      for (Listener listenerObject : ll) {
+    	  UimaDefaultMessageListenerContainer lc =
+    			  (UimaDefaultMessageListenerContainer)listenerObject;
+       if (listenerObject != null && doCloseChannel(lc, channelsToClose)) {
+           lc.setRecoveryInterval(0);  // https://issues.apache.org/jira/browse/UIMA-3437
+          lc.setTerminating();
+          lc.setAutoStartup(false);
+          stopChannel(lc, shutdownNow);
 
       } else {
         if (getController() != null) {
@@ -1041,9 +1107,9 @@ public class JmsInputChannel implements
     return stopped;
   }
 
-  public int getConcurrentConsumerCount() {
-    return messageListener.getConcurrentConsumers();
-  }
+//  public int getConcurrentConsumerCount() {
+//    return messageListener.getConcurrentConsumers();
+//  }
 
   private void testIfBrokerRunning(String aBrokerUrl) throws Exception {
     ActiveMQConnectionFactory f = new ActiveMQConnectionFactory(aBrokerUrl);
@@ -1061,7 +1127,8 @@ public class JmsInputChannel implements
     }
   }
   public void createListenerOnTempQueue(ConnectionFactory cf, boolean isFreeCasDestination ) throws Exception {
-	  TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName());
+	  TempDestinationResolver resolver = 
+			  new TempDestinationResolver(controller.getComponentName(),"");
 	  UimaDefaultMessageListenerContainer connector = new UimaDefaultMessageListenerContainer(true);
 	  connector.setConnectionFactory(cf);
 	  resolver.setListener(connector);
@@ -1069,9 +1136,9 @@ public class JmsInputChannel implements
 	  connector.setDestinationResolver(resolver);
 	  connector.setController(getController());
 	  connector.setMessageListener(this);
-	  connector.initializeContainer();
+	  //connector.initializeContainer();
 	  connector.getDestination();
-	  connector.afterPropertiesSet(false);
+	  connector.afterPropertiesSet();
 	  UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,getController().getComponentName()+"-JmsInputChannel.createListenerOnTempQueue()-starting new Listener" );
 	  connector.start();
 	  boolean log = true;
@@ -1103,10 +1170,10 @@ public class JmsInputChannel implements
 	  }
 
 	  if ( isFreeCasDestination ) {
-		  ((JmsOutputChannel) getController().getOutputChannel())
+		  ((JmsOutputChannel) getController().getOutputChannel(ENDPOINT_TYPE.JMS))
           .setFreeCasQueue(connector.getListenerEndpoint());
 	  }
-      setListenerContainer(connector);
+      addListenerContainer(connector);
 
 	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
 		  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
@@ -1116,7 +1183,7 @@ public class JmsInputChannel implements
 	  }
   }
   public void createListenerForTargetedMessages() throws Exception {
-	  List<UimaDefaultMessageListenerContainer> listeners =
+	  List<Listener> listeners =
 			  getListeners();
 	  // the TargetServiceId property value will become part of a jms selector. 
 	  String targetStringSelector = "";
@@ -1130,49 +1197,63 @@ public class JmsInputChannel implements
 	  // find a listener instance which handles Process requests. The targeted 
 	  // listener created here will share a Connection Factory and ThreadFactory.
 	  // 
-	  for(UimaDefaultMessageListenerContainer listener : listeners ) {
-	      // Is this a Process listener instance? Check the selector
-		  if ( listener.getMessageSelector().endsWith(UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX) ) {
-	          // this will be a dedicated listener which handles targeted messages
-			  UimaDefaultMessageListenerContainer targetedListener = new UimaDefaultMessageListenerContainer();
-			  // setup jms selector
-			  if ( getController().isCasMultiplier()) {
-				  targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
- 	          } else {
- 				  targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
- 	          }
-			  
-			  // use shared ConnectionFactory
-	          targetedListener.setConnectionFactory(listener.getConnectionFactory());
-	          // mark the listener as a 'Targeted' listener
-	          targetedListener.setTargetedListener();
-	          targetedListener.setController(getController());
-	          // there will only be one AMQ delivery thread. Its job will be to
-	          // add a targeted message to a BlockingQueue. Such thread will block
-	          // in an enqueue if a dequeue is not available. This will be prevent
-	          // the overwhelming the service with messages.
-	          targetedListener.setConcurrentConsumers(1);
-			  if ( listener.getMessageListener() instanceof PriorityMessageHandler ) {
-				  // the targeted listener will use the same message handler as the
-				  // Process listener. This handler will add a message wrapper 
-				  // to enable prioritizing messages. 
-				  targetedListener.setMessageListener(listener.getMessageListener());
+	  for(Listener l : listeners ) {
+		  UimaDefaultMessageListenerContainer listener = 
+		     (UimaDefaultMessageListenerContainer) l;
+		  if ( listener.getTransport().equals(Transport.JMS)) {
+			  // Ignore listeners for replies (message selector = null)
+			  if ( listener.getMessageSelector() == null ) {
+				  continue;
 			  }
-			  // Same queue as the Process queue
-			  targetedListener.setDestination(listener.getDestination());
-	          registerListener(targetedListener);
-	          targetedListener.afterPropertiesSet();
-	          targetedListener.initialize();
-	          targetedListener.start();
-	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-	                    "createListenerForTargetedMessages", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-	                    "UIMAJMS_TARGET_LISTENER__INFO",
-	                    new Object[] {targetedListener.getMessageSelector(), controller.getComponentName() });
-	          }
-	          break;
+		      // Is this a Process listener instance? Check the selector
+			  if ( listener.getMessageSelector().endsWith(UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX) ) {
+		          // this will be a dedicated listener which handles targeted messages
+				  UimaDefaultMessageListenerContainer targetedListener = new UimaDefaultMessageListenerContainer();
+				  // setup jms selector
+				  if ( getController().isCasMultiplier()) {
+					  targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
+	 	          } else {
+	 				  targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
+	 	          }
+				  
+				  // use shared ConnectionFactory
+		          targetedListener.setConnectionFactory(listener.getConnectionFactory());
+		          // mark the listener as a 'Targeted' listener
+		          targetedListener.setTargetedListener();
+		          targetedListener.setController(getController());
+		          // there will only be one delivery thread. Its job will be to
+		          // add a targeted message to a BlockingQueue. Such thread will block
+		          // in an enqueue if a dequeue is not available. This will prevent
+		          // the service from being overwhelmed with messages.
+		  		  ThreadPoolTaskExecutor threadExecutor = new ThreadPoolTaskExecutor();
+				  threadExecutor.setCorePoolSize(1);
+				  threadExecutor.setMaxPoolSize(1);
+				  targetedListener.setTaskExecutor(threadExecutor);
+		          targetedListener.setConcurrentConsumers(1);
+				  if ( listener.getMessageListener() instanceof PriorityMessageHandler ) {
+					  // the targeted listener will use the same message handler as the
+					  // Process listener. This handler will add a message wrapper 
+					  // to enable prioritizing messages. 
+					  targetedListener.setMessageListener(listener.getMessageListener());
+				  }
+				  // Same queue as the Process queue
+				  targetedListener.setDestination(listener.getDestination());
+		          registerListener(targetedListener);
+		          targetedListener.afterPropertiesSet();
+		          targetedListener.setType(Type.Target);
+		          targetedListener.initialize();
+		          targetedListener.start();
+		          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+		            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+		                    "createListenerForTargetedMessages", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+		                    "UIMAJMS_TARGET_LISTENER__INFO",
+		                    new Object[] {targetedListener.getMessageSelector(), controller.getComponentName() });
+		          }
+		          break;
+
+		      }
+		  }
 
-	      }
 	  }
   }
   public void createListener(String aDelegateKey, Endpoint endpointToUpdate) throws Exception {
@@ -1190,7 +1271,7 @@ public class JmsInputChannel implements
         newListener.setMessageListener(this);
         newListener.setController(getController());
 
-        TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName());
+        TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName(), aDelegateKey);
         resolver.setConnectionFactory(f);
         resolver.setListener(newListener);
         newListener.setDestinationResolver(resolver);
@@ -1212,7 +1293,7 @@ public class JmsInputChannel implements
         }
         newListener.afterPropertiesSet();
         if ( controller != null && controller.isStopped() ) {
-          newListener.destroy(true);  // shutdownNow
+          stopListener(newListener);  // shutdownNow
           //  we are aborting, the controller has been stopped
           return;
         }
@@ -1231,11 +1312,13 @@ public class JmsInputChannel implements
     }
   }
   public boolean isListenerActiveOnDestination(Destination destination ) {
-   List<UimaDefaultMessageListenerContainer> ll = getListeners();
-   for (UimaDefaultMessageListenerContainer mListener : ll ) {
-     if ( mListener.getDestination() != null && 
-          mListener.getDestination() == destination &&
-          mListener.isRunning()) {
+   List<Listener> ll = getListeners();
+   for (Listener mListener : ll ) {
+	   UimaDefaultMessageListenerContainer lc =
+			   (UimaDefaultMessageListenerContainer)mListener;
+     if ( lc.getDestination() != null && 
+          lc.getDestination() == destination &&
+          lc.isRunning()) {
        return true;
      }
    }
@@ -1252,23 +1335,25 @@ public class JmsInputChannel implements
    * 
    * @return - list of listeners
    */
-  private UimaDefaultMessageListenerContainer[] getListenersForEndpoint(String anEndpointName) {
-	    List<UimaDefaultMessageListenerContainer> ll = getListeners();
+  private Listener[] getListenersForEndpoint(String anEndpointName) {
+	    List<Listener> ll = getListeners();
 
 	    
-	    List<UimaDefaultMessageListenerContainer> listeners = new ArrayList<UimaDefaultMessageListenerContainer>();
-	    for (UimaDefaultMessageListenerContainer mListener : ll) {
-	      if (mListener.getDestinationName() != null
-	              && mListener.getDestinationName().equals(anEndpointName)) {
+	    List<Listener> listeners = new ArrayList<>();
+	    for (Listener mListener : ll) {
+	    	UimaDefaultMessageListenerContainer lc = 
+	    			(UimaDefaultMessageListenerContainer)mListener;
+	      if (lc.getDestinationName() != null
+	              && lc.getDestinationName().equals(anEndpointName)) {
 	        listeners.add(mListener);
-	      } else if (mListener.getDestination() != null
-	              && mListener.getDestination().toString().equals(anEndpointName)) {
+	      } else if (lc.getDestination() != null
+	              && lc.getDestination().toString().equals(anEndpointName)) {
 	        listeners.add(mListener);
 	      }
 	    }
-	    if (listeners.size() > 0) {
-	      UimaDefaultMessageListenerContainer[] listenerArray = new UimaDefaultMessageListenerContainer[listeners
-	              .size()];
+	    if (!listeners.isEmpty()) {
+	      Listener[] listenerArray = new Listener[listeners.size()];
+	              
 	      listeners.toArray(listenerArray);
 	      return listenerArray;
 	    }
@@ -1285,34 +1370,36 @@ public class JmsInputChannel implements
       return;
     }
     // Fetch all associated listeners.
-    final UimaDefaultMessageListenerContainer[] mListeners = getListenersForEndpoint(anEndpointName);
+    final Listener[] mListeners = getListenersForEndpoint(anEndpointName);
     if (mListeners == null) {
       return;
     }
     // Stop each listener
-    for (final UimaDefaultMessageListenerContainer mListener : mListeners) {
-      if (!mListener.isRunning()) {
+    for (final Listener mListener : mListeners) {
+    	UimaDefaultMessageListenerContainer lc = 
+    			(UimaDefaultMessageListenerContainer)mListener;
+      if (!lc.isRunning()) {
         continue; // Already Stopped
       }
 
       try {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)
-                && mListener.getDestination() != null) {
+                && lc.getDestination() != null) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                   "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAJMS_stop_listener__INFO",
-                  new Object[] { mListener.getDestination().toString() });
+                  new Object[] { lc.getDestination().toString() });
         }
-        mListener.stop();
+        lc.stop();
         if (getController() != null) {
           Endpoint endpoint = ((AggregateAnalysisEngineController) getController()).lookUpEndpoint(
                   aDelegateKey, false);
           endpoint.setStatus(Endpoint.FAILED);
-          List<UimaDefaultMessageListenerContainer> ll = null;
+          List<Listener> ll = null;
           if ( listenerMap.containsKey(getController().getComponentName()+getController().hashCode()) ) {
             ll = listenerMap.get(getController().getComponentName()+getController().hashCode());
           }
-          if (mListener.getConnectionFactory() != null) {
+          if (lc.getConnectionFactory() != null) {
             if (getController() instanceof AggregateAnalysisEngineController) {
               if (!failedListenerMap.containsKey(aDelegateKey)) {
                 failedListenerMap.put(aDelegateKey, mListener);
@@ -1345,7 +1432,7 @@ public class JmsInputChannel implements
     }
   }
   public boolean isListenerForDestination(String anEndpointName) {
-    UimaDefaultMessageListenerContainer[] mListeners = getListenersForEndpoint(anEndpointName);
+    Listener[] mListeners = getListenersForEndpoint(anEndpointName);
     if (mListeners == null) {
       return false;
     }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Mon Feb 26 18:54:11 2018
@@ -60,6 +60,7 @@ import org.apache.uima.aae.UIMAEE_Consta
 import org.apache.uima.aae.UimaSerializer;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
@@ -87,7 +88,7 @@ import com.thoughtworks.xstream.XStream;
 
 public class JmsOutputChannel implements OutputChannel {
 
-  private static final Class CLASS_NAME = JmsOutputChannel.class;
+  private static final Class<?> CLASS_NAME = JmsOutputChannel.class;
 
   private static final long INACTIVITY_TIMEOUT = 5; // MINUTES
 
@@ -159,7 +160,7 @@ public class JmsOutputChannel implements
     serverURI = aServerURI;
   }
 
-  protected void setFreeCasQueue(Destination destination) {
+  public void setFreeCasQueue(Destination destination) {
     freeCASTempQueue = destination;
   }
 
@@ -175,11 +176,12 @@ public class JmsOutputChannel implements
    * 
    * @param connectionFactory
    */
+  /*
   public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
     this.connectionFactory = connectionFactory;
     this.connectionFactory.setTrustAllPackages(true);
   }
-
+*/
   public void setServiceInputEndpoint(String anEnpoint) {
     serviceInputEndpoint = anEnpoint;
   }
@@ -359,11 +361,12 @@ public class JmsOutputChannel implements
     Connection conn = brokerConnectionEntry.getConnection();
     try {
     	if ( conn != null)
-       if ( conn != null && !((ActiveMQConnection)conn).isClosed()) {
+       if ( conn != null && ((ActiveMQConnection)conn).isClosed()) {
            for (Entry<Object, JmsEndpointConnection_impl> endpoints : brokerConnectionEntry.endpointMap
                    .entrySet()) {
               endpoints.getValue().close(); // close session and producer
            }
+           brokerConnectionEntry.getConnection().stop();
            brokerConnectionEntry.getConnection().close();
            brokerConnectionEntry.setConnection(null);
        }
@@ -446,6 +449,7 @@ public class JmsOutputChannel implements
     
     try {
         connectionSemaphore.acquire();
+        /*
         //  If sending a Free Cas Request to a remote Cas Multiplier always use the CM's
         //  broker
         if ( anEndpoint.isFreeCasEndpoint() && anEndpoint.isCasMultiplier() && anEndpoint.isReplyEndpoint()) {
@@ -456,6 +460,9 @@ public class JmsOutputChannel implements
           brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
           
         }
+        */
+        brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
+
         String key = getLookupKey(anEndpoint);
         String destination = getDestinationName(anEndpoint);
 
@@ -540,7 +547,7 @@ public class JmsOutputChannel implements
                     //  Returns InputChannel if the Reply Listener for the delegate has previously failed.
                     //  If the listener hasnt failed the getReplyInputChannel returns null
 //                    InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDelegateKey());
-                    InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDestination().toString());
+                    InputChannel iC = getAnalysisEngineController().getInputChannel();
                     if ( iC != null ) { 
                       try {
                         // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
@@ -713,9 +720,12 @@ public class JmsOutputChannel implements
         
         case AsynchAEMessage.Process:
           logRequest("UIMAEE_service_sending_process_request__FINE", anEndpoint);
-          serializeCasAndSend(getAnalysisEngineController().
-                  getInProcessCache().
-                    getCacheEntryForCAS(aCasReferenceId), anEndpoint);
+          CasStateEntry stateEntry =
+        		  getAnalysisEngineController().getLocalCache().lookupEntry(aCasReferenceId);
+          serializeCasAndSend(stateEntry, anEndpoint);
+//          serializeCasAndSend(getAnalysisEngineController().
+//                  getInProcessCache().
+//                    getCacheEntryForCAS(aCasReferenceId), anEndpoint);
           return;  /// <<<<< RETURN - Done here >>>>
           
         
@@ -747,7 +757,7 @@ public class JmsOutputChannel implements
     }
   }
   
-  private void serializeCasAndSend(CacheEntry entry, Endpoint anEndpoint) throws Exception {
+  private void serializeCasAndSend(CasStateEntry entry, Endpoint anEndpoint) throws Exception {
     if (anEndpoint.getSerialFormat() == SerialFormat.XMI) {
       String serializedCAS = getSerializedCasAndReleaseIt(false, entry.getCasReferenceId(), anEndpoint,
               anEndpoint.isRetryEnabled());
@@ -785,7 +795,8 @@ public class JmsOutputChannel implements
   }
 
 
-  public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
+//  public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
+	  public void sendReply(CasStateEntry entry, Endpoint anEndpoint) throws AsynchAEException {
 	  try {
       anEndpoint.setReplyEndpoint(true);
       if (anEndpoint.isRemote()) {
@@ -844,14 +855,15 @@ public class JmsOutputChannel implements
 	        // If this service is a Cas Multiplier add to the message a FreeCasQueue.
 	        // The client may need send Stop request to that queue.
 	        if (aCommand == AsynchAEMessage.ServiceInfo
-	                && getAnalysisEngineController().isCasMultiplier() ) {
-	          if ( freeCASTempQueue != null ) {
+	                && getAnalysisEngineController().isCasMultiplier()  ) {
+	        	if ( freeCASTempQueue != null ) {
 		        	// Attach a temp queue to the outgoing message. This a queue where
 		          // Free CAS notifications need to be sent from the client
 		          tm.setJMSReplyTo(freeCASTempQueue);
-	          }
+	          
+	        	}
 	          // new services will receive FreeCas request via a targeted queue
-	          StringBuffer selector = new StringBuffer().
+	          StringBuilder selector = new StringBuilder().
 	        		  append("TargetServiceId = ").
 	        		  append("'").append(hostIP).append(":").
 	        		  append(getAnalysisEngineController().getPID()).
@@ -1044,9 +1056,15 @@ public class JmsOutputChannel implements
     }
     long msgSize = 0;
 
+    System.out.println("------------ Service:"+getAnalysisEngineController().getComponentName()+" Sending GetMeta Reply");
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     try {
-    	
+    	Object destination = anEndpoint.getReplyDestination();
+    	if ( destination == null ) {
+    		destination = anEndpoint.getDestination();
+    	}
+    	System.out.println(".......... Service:"+getAnalysisEngineController().getComponentName()+" replying with GetMeta to reply queue:"+destination);
+
     	
       anEndpoint.setReplyEndpoint(true);
       // Initialize JMS connection to given endpoint
@@ -1237,7 +1255,7 @@ public class JmsOutputChannel implements
     CAS cas = null;
     try {
       String serializedCAS = null;
-      // Using Cas reference Id retrieve CAS from the shared Cash
+      // Using Cas reference Id retrieve CAS from the shared Cache
       cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
       ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
       if (cas == null) {
@@ -1509,7 +1527,7 @@ public class JmsOutputChannel implements
     this.controllerInputEndpoint = controllerInputEndpoint;
   }
 
-  private void dispatch(Message aMessage, Endpoint anEndpoint, CacheEntry entry, boolean isRequest,
+  private void dispatch(Message aMessage, Endpoint anEndpoint, CasStateEntry entry, boolean isRequest,
           JmsEndpointConnection_impl endpointConnection, long msgSize) throws Exception {
 	  
 	  if ( anEndpoint == null ) { 
@@ -1627,10 +1645,9 @@ public class JmsOutputChannel implements
     }
   }
 
-  private void sendCasToRemoteEndpoint(boolean isRequest, Object aSerializedCAS, CacheEntry entry,
+  private void sendCasToRemoteEndpoint(boolean isRequest, Object aSerializedCAS, CasStateEntry casStateEntry,
           Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
           ServiceShutdownException {
-    CasStateEntry casStateEntry = null;
     long msgSize = 0;
     try {
       if (aborting) {
@@ -1639,9 +1656,7 @@ public class JmsOutputChannel implements
       //  If this is a reply to a client, use the same broker URL that manages this service input queue.
       //  Otherwise this is a request so use a broker specified in the endpoint object.
       String brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
-
-      casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
-              entry.getCasReferenceId());
+/*
       if (casStateEntry == null) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(
                 Level.WARNING,
@@ -1651,12 +1666,12 @@ public class JmsOutputChannel implements
                 "UIMAJMS_unable_to_send_reply__WARNING",
                 new Object[] { getAnalysisEngineController().getComponentName(),
                   anEndpoint.getDestination(), brokerConnectionURL, 
-                  entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(), 
-                          entry.getCasReferenceId(), 0, 
+                  casStateEntry.getParentCasReferenceId() == null ? "" : casStateEntry.getParentCasReferenceId(), 
+                		  casStateEntry.getCasReferenceId(), 0, 
                           new Exception("Unable to lookup entry in Local Cache for a given Cas Id")  });
         return;
       }
-
+*/
       // Get the connection object for a given endpoint
       JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
 
@@ -1689,19 +1704,22 @@ public class JmsOutputChannel implements
                   JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAJMS_unable_to_send_reply__WARNING",
                   new Object[] { getAnalysisEngineController().getComponentName(),
-                  	anEndpoint.getDestination(), brokerConnectionURL, entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(), entry.getCasReferenceId(), 0, ex  });
+                  	anEndpoint.getDestination(), brokerConnectionURL, casStateEntry.getParentCasReferenceId() == null ? "" : casStateEntry.getParentCasReferenceId(), casStateEntry.getCasReferenceId(), 0, ex  });
         return;
       }
       // Add Cas Reference Id to the outgoing JMS Header
-      tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
+      tm.setStringProperty(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId());
       // Add common properties to the JMS Header
       if (isRequest == true) {
         populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
       } else {
-        populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);   
-        tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
+        populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);  
+        CacheEntry cacheEntry = 
+        		getAnalysisEngineController().
+        		     getInProcessCache().getCacheEntryForCAS(casStateEntry.getCasReferenceId());
+        tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, cacheEntry.sentDeltaCas());
       }
-      // The following is true when the analytic is a CAS Multiplier
+      // The following is true when the service is a CAS Multiplier replying with a subordinate (child) CAS
       if (casStateEntry.isSubordinate() && !isRequest) {
         // Override MessageType set in the populateHeaderWithContext above.
         // Make the reply message look like a request. This message will contain a new CAS
@@ -1710,10 +1728,10 @@ public class JmsOutputChannel implements
         tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
         isRequest = true;
         // Save the id of the parent CAS
-        tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry
+        tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(casStateEntry
                 .getCasReferenceId()));
         // Add a sequence number assigned to this CAS by the controller
-        tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
+        tm.setLongProperty(AsynchAEMessage.CasSequence, casStateEntry.getSequenceNumber());
         // If this is a Cas Multiplier, add a reference to a special queue where
         // the client sends Free Cas Notifications
         if (freeCASTempQueue != null) {
@@ -1729,11 +1747,11 @@ public class JmsOutputChannel implements
                   JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
                   new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
-                      anEndpoint.getEndpoint(), entry.getCasReferenceId(),
-                      entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
+                      anEndpoint.getEndpoint(), casStateEntry.getCasReferenceId(),
+                      casStateEntry.getParentCasReferenceId(), casStateEntry.getParentCasReferenceId() });
         }
       }
-      dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
+      dispatch(tm, anEndpoint, casStateEntry, isRequest, endpointConnection, msgSize);
 
     } catch (JMSException e) {
       // Unable to establish connection to the endpoint. Logit and continue
@@ -1757,21 +1775,25 @@ public class JmsOutputChannel implements
 
   private Delegate lookupDelegate(String aDelegateKey) {
     if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
-      Delegate delegate = ((AggregateAnalysisEngineController) getAnalysisEngineController())
+      return ((AggregateAnalysisEngineController) getAnalysisEngineController())
               .lookupDelegate(aDelegateKey);
-      return delegate;
     }
     return null;
   }
 
-  private void addCasToOutstandingList(CacheEntry entry, boolean isRequest, String aDelegateKey) {
+  private void addCasToOutstandingList(CasStateEntry casStateEntry, boolean isRequest, String aDelegateKey) 
+  throws AsynchAEException {
     Delegate delegate = null;
     if (isRequest && (delegate = lookupDelegate(aDelegateKey)) != null) {
-      delegate.addCasToOutstandingList(entry.getCasReferenceId(), entry.getCas().hashCode(), false);  // false=dont start timer thread per CAS
+        CacheEntry cacheEntry = 
+        		getAnalysisEngineController().
+        		     getInProcessCache().getCacheEntryForCAS(casStateEntry.getCasReferenceId());
+
+      delegate.addCasToOutstandingList(casStateEntry.getCasReferenceId(), cacheEntry.getCas().hashCode(), false);  // false=don't start timer thread per CAS
     }
   }
 
-  private void removeCasFromOutstandingList(CacheEntry entry, boolean isRequest, String aDelegateKey) {
+  private void removeCasFromOutstandingList(CasStateEntry entry, boolean isRequest, String aDelegateKey) {
     Delegate delegate = null;
     if (isRequest && (delegate = lookupDelegate(aDelegateKey)) != null) {
       delegate.removeCasFromOutstandingList(entry.getCasReferenceId());
@@ -1787,7 +1809,7 @@ public class JmsOutputChannel implements
 
     if (casStateEntry.isSubordinate()) {
       // Recurse until the top CAS reference Id is found
-      return getTopParentCasReferenceId(casStateEntry.getInputCasReferenceId());
+      return getTopParentCasReferenceId(casStateEntry.getParentCasReferenceId());
     }
     // Return the top ancestor CAS id
     return casStateEntry.getCasReferenceId();
@@ -2097,7 +2119,7 @@ public class JmsOutputChannel implements
   private static class RecoveryThread implements Runnable {
     Endpoint endpoint;
 
-    CacheEntry entry;
+    CasStateEntry entry;
 
     boolean isRequest;
 
@@ -2105,7 +2127,7 @@ public class JmsOutputChannel implements
 
     JmsOutputChannel outputChannel;
 
-    public RecoveryThread(JmsOutputChannel channel, Endpoint anEndpoint, CacheEntry anEntry,
+    public RecoveryThread(JmsOutputChannel channel, Endpoint anEndpoint, CasStateEntry anEntry,
             boolean isRequest, AnalysisEngineController aController) {
       endpoint = anEndpoint;
       entry = anEntry;
@@ -2124,9 +2146,8 @@ public class JmsOutputChannel implements
         // Mark this delegate as Failed
         delegate.getEndpoint().setStatus(Endpoint.FAILED);
         // Destroy listener associated with a reply queue for this delegate
-        InputChannel ic = controller.getInputChannel(delegate.getEndpoint().getDestination()
-                .toString());
-        if (ic != null && delegate != null && delegate.getEndpoint() != null) {
+        InputChannel ic = controller.getInputChannel();
+        if (delegate.getEndpoint() != null) {
           ic.destroyListener(delegate.getEndpoint().getDestination().toString(), endpoint
                   .getDelegateKey());
         }
@@ -2142,4 +2163,7 @@ public class JmsOutputChannel implements
 
     }
   }
+  public ENDPOINT_TYPE getType() {
+		return ENDPOINT_TYPE.JMS;
+  }
 }

Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java Mon Feb 26 18:54:11 2018
@@ -35,10 +35,13 @@ public class TempDestinationResolver imp
 
   private String serviceName = "";
   
+  private String endpoint = "";
+  
   public TempDestinationResolver() {
   }
-  public TempDestinationResolver(String name) {
+  public TempDestinationResolver(String name, String endpoint) {
 	  serviceName = name;
+	  this.endpoint = endpoint;
   }
   /**
    * This method is called by the Spring listener code. It creates a single temp queue for all
@@ -48,7 +51,9 @@ public class TempDestinationResolver imp
    */
   public Destination resolveDestinationName(Session session, String destinationName,
           boolean pubSubDomain) throws JMSException {
-    synchronized (mutex) {
+	  System.out.println("************ resolveDestinationName() Controller:"+serviceName+" Endpoint:"+endpoint+"************************");
+
+	  synchronized (mutex) {
       if (destination == null) {
         destination = session.createTemporaryQueue();
         if (listener != null) {