You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC
svn commit: r1825401 [1/11] - in /uima/uima-as/branches/uima-as-3:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uima...
Author: cwiklik
Date: Mon Feb 26 18:54:11 2018
New Revision: 1825401
URL: http://svn.apache.org/viewvc?rev=1825401&view=rev
Log:
UIMA-5280 modified to not depend on dd2spring, refactored code to support calling a service without a broker, many internal changes to support local transport better
Added:
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/ActiveMQFactory.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/ServiceDeployers.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaASUtils.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerStatusListener.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/DelegateEndpoint.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/Thresholds.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaASClientDirectMessage.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaASClientMessage.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/AbstractUimaASService.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/AsynchronousUimaASService.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/ScaleoutSpecification.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/ServiceNotFoundException.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/ServiceRegistry.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaAsServiceRegistry.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/ServiceBuilder.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AggregateAnalysisEngineDelegate.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AnalysisEngineDelegate.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/CasMultiplierNature.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/RemoteAnalysisEngineDelegate.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/JavaQueueListener.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/LifecycleListener.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/Listener.java
Modified:
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandler.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerBase.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerChain.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/CpcErrorHandler.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/JmxManagement.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/JmxManager.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/CasPool.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/DelegateConfiguration.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/ServiceContext.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasMultiplierImpl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasPoolImpl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ColocatedDelegateEngineImpl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/RemoteDelegateEngineImpl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ServiceContextImpl.java
uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/PendingMessage.java
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java Mon Feb 26 18:54:11 2018
@@ -67,9 +67,9 @@ import org.springframework.jms.listener.
*
*/
public class ConcurrentMessageListener implements SessionAwareMessageListener {
- private static final Class CLASS_NAME = ConcurrentMessageListener.class;
+ private static final Class<?> CLASS_NAME = ConcurrentMessageListener.class;
- private SessionAwareMessageListener delegateListener;
+ private SessionAwareMessageListener<Message> delegateListener;
private int concurrentThreadCount = 0;
@@ -98,14 +98,14 @@ public class ConcurrentMessageListener i
* - JmsInputChannel instance to delegate CAS to
* @throws InvalidClassException
*/
- public ConcurrentMessageListener(int concurrentThreads, Object delegateListener, String destination, ThreadGroup threadGroup, String threadPrefix)
+ public ConcurrentMessageListener(int concurrentThreads, SessionAwareMessageListener<Message> inputChannel, String destination, ThreadGroup threadGroup, String threadPrefix)
throws InvalidClassException {
- if (!(delegateListener instanceof SessionAwareMessageListener)) {
- throw new InvalidClassException("Invalid Delegate Listener. Expected Object of Type:"
- + SessionAwareMessageListener.class + " Received:" + delegateListener.getClass());
- }
+// if (!(delegateListener instanceof SessionAwareMessageListener)) {
+// throw new InvalidClassException("Invalid Delegate Listener. Expected Object of Type:"
+// + SessionAwareMessageListener.class + " Received:" + delegateListener.getClass());
+// }
concurrentThreadCount = concurrentThreads;
- this.delegateListener = (SessionAwareMessageListener) delegateListener;
+ this.delegateListener = inputChannel;
if (concurrentThreads > 1) {
// created an unbounded queue. The throttling is controlled by the
// semaphore in the UimaBlockingExecutor initialized below
@@ -140,7 +140,34 @@ public class ConcurrentMessageListener i
private boolean isMessageFromCasMultiplier(final Message message) throws JMSException {
return message.propertyExists(AsynchAEMessage.CasSequence);
}
+ private void incrementChildCASes(String parentCASid) {
+ try {
+// String parentCasReferenceId = message
+// .getStringProperty(AsynchAEMessage.InputCasReference);
+ // Fetch parent CAS entry from the local cache
+ CasStateEntry parentEntry = controller.getLocalCache().lookupEntry(parentCASid);
+ // increment number of child CASes this parent has in play
+ parentEntry.incrementSubordinateCasInPlayCount();
+ // increment a counter that counts number of child CASes that have no
+ // flow object yet. The flow object is created for each child CAS from
+ // the parent flow object. The method below will actually acquire a
+ // permit from a binary semaphore to force the parent to block until
+ // the last of its children acquires its Flow object.
+ parentEntry.incrementOutstandingFlowCounter();
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_exception_WARNING", controller.getComponentName());
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", e);
+ }
+ }
+ }
/**
* Intercept a message to increment a child count of the input CAS. This method is always called
* in a single thread, guaranteeing order of processing. The child CAS will always come here
@@ -150,12 +177,16 @@ public class ConcurrentMessageListener i
*
*/
public void onMessage(final Message message, final Session session) throws JMSException {
- try {
- // Wait until the controller is plugged in
- controllerLatch.await();
- } catch (InterruptedException e) {
- }
+ System.out.println("..........ConcurrentMessageListener.onMessage() got message");
+
+// try {
+// // Wait until the controller is plugged in
+// controllerLatch.await();
+// } catch (InterruptedException e) {
+// }
if (isMessageFromCasMultiplier(message)) {
+ System.out.println("..........ConcurrentMessageListener.onMessage() message from remote CM");
+
// Check if the message came from a Cas Multiplier and it contains a new Process Request
int command = message.getIntProperty(AsynchAEMessage.Command);
int messageType = message.getIntProperty(AsynchAEMessage.MessageType);
@@ -171,6 +202,8 @@ public class ConcurrentMessageListener i
delegate.setConcurrentConsumersOnReplyQueue();
}
}
+ incrementChildCASes(message.getStringProperty(AsynchAEMessage.InputCasReference));
+/*
try {
String parentCasReferenceId = message
.getStringProperty(AsynchAEMessage.InputCasReference);
@@ -197,6 +230,7 @@ public class ConcurrentMessageListener i
"UIMAJMS_exception__WARNING", e);
}
}
+ */
}
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Mon Feb 26 18:54:11 2018
@@ -55,6 +55,7 @@ import org.apache.uima.aae.UIMAEE_Consta
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.BaseAnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.delegate.Delegate;
@@ -124,8 +125,9 @@ public class JmsEndpointConnection_impl
} else {
// If this is a reply to a client, use the same broker URL that manages this service input queue.
// Otherwise this is a request so use a broker specified in the endpoint object.
- serverUri = (anEndpoint.isReplyEndpoint()) ?
- ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() : anEndpoint.getServerURI();
+// serverUri = (anEndpoint.isReplyEndpoint()) ?
+// ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() : anEndpoint.getServerURI();
+ serverUri = anEndpoint.getServerURI();
}
isReplyEndpoint = anEndpoint.isReplyEndpoint();
controller = aController;
@@ -183,7 +185,7 @@ public class JmsEndpointConnection_impl
// If replying to http request, reply to a queue managed by this service broker using tcp
// protocol
if (isReplyEndpoint && brokerUri.startsWith("http")) {
- brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI();
+ brokerUri = ((JmsOutputChannel) aController.getOutputChannel(ENDPOINT_TYPE.JMS)).getServerURI();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
@@ -193,7 +195,7 @@ public class JmsEndpointConnection_impl
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_override_connection_to_endpoint__FINE",
new Object[] { aComponentName, getEndpoint(),
- ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() });
+ ((JmsOutputChannel) aController.getOutputChannel(ENDPOINT_TYPE.JMS)).getServerURI() });
}
} else if ( !brokerUri.startsWith("http") && !brokerUri.startsWith("failover") && !brokerUri.startsWith("vm://localhost?broker.persistent=false")){
String prefix = "";
@@ -772,7 +774,7 @@ public class JmsEndpointConnection_impl
}
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_service_delivery_exception__WARNING",new Object[] { controller.getComponentName(), key, endpointName});
+ "UIMAEE_service_delivery_exception__WARNING",new Object[] { controller.getComponentName(), endpointName,serverUri});
}
CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
// Mark the CAS as failed so that the CAS is released and cache cleaned up
@@ -889,14 +891,14 @@ public class JmsEndpointConnection_impl
* @param delegateKey
* @throws Exception
*/
- private void createListener(String delegateKey) throws Exception {
- if (controller instanceof AggregateAnalysisEngineController) {
- // Fetch an InputChannel that handles messages for a given delegate
- InputChannel iC = controller.getReplyInputChannel(delegateKey);
- // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
- iC.createListener(delegateKey, null);
- }
- }
+// private void createListener(String delegateKey) throws Exception {
+// if (controller instanceof AggregateAnalysisEngineController) {
+// // Fetch an InputChannel that handles messages for a given delegate
+// InputChannel iC = controller.getReplyInputChannel(delegateKey);
+// // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
+// iC.createListener(delegateKey, null);
+// }
+// }
private synchronized boolean handleJmsException(JMSException ex) {
if (!failed) {
@@ -990,7 +992,7 @@ public class JmsEndpointConnection_impl
if ( casState.isSubordinate()) {
try {
- String inputCasId = casState.getInputCasReferenceId();
+ String inputCasId = casState.getParentCasReferenceId();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Mon Feb 26 18:54:11 2018
@@ -41,8 +41,10 @@ import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.Endpoint_impl;
@@ -59,8 +61,11 @@ import org.apache.uima.aae.message.Messa
import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.message.JmsMessageContext;
+import org.apache.uima.as.client.Listener;
+import org.apache.uima.as.client.Listener.Type;
import org.apache.uima.util.Level;
import org.springframework.jms.listener.SessionAwareMessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* Thin adapter for receiving JMS messages from Spring. It delegates processing of all messages to
@@ -69,17 +74,17 @@ import org.springframework.jms.listener.
*
*/
public class JmsInputChannel implements InputChannel, JmsInputChannelMBean,
- SessionAwareMessageListener {
+ SessionAwareMessageListener<Message> {
/**
*
*/
private static final long serialVersionUID = -3318400773113552290L;
- private static final Class CLASS_NAME = JmsInputChannel.class;
+ private static final Class<?> CLASS_NAME = JmsInputChannel.class;
- private transient final CountDownLatch msgHandlerLatch = new CountDownLatch(1);
+ private final transient CountDownLatch msgHandlerLatch = new CountDownLatch(1);
- private transient final CountDownLatch controllerLatch = new CountDownLatch(1);
+ private final transient CountDownLatch controllerLatch = new CountDownLatch(1);
// Reference to the first Message Handler in the Chain.
private transient Handler handler;
@@ -92,9 +97,9 @@ public class JmsInputChannel implements
private int sessionAckMode;
- private transient UimaDefaultMessageListenerContainer messageListener;
+ //private transient UimaDefaultMessageListenerContainer messageListener;
- private transient Session jmsSession;
+ //private transient Session jmsSession;
private String brokerURL = "";
@@ -110,21 +115,37 @@ public class JmsInputChannel implements
private transient RemoteJMXServer remoteJMXServer = null;
// synchronizes initialization of RemotBroker
- private Object brokerMux = new Object();
+ // private Object brokerMux = new Object();
- private ConcurrentHashMap<String, UimaDefaultMessageListenerContainer> failedListenerMap = new ConcurrentHashMap<String, UimaDefaultMessageListenerContainer>();
+ private ConcurrentHashMap<String, Listener> failedListenerMap =
+ new ConcurrentHashMap<>();
// A global flag that determines if we should create a connection to broker's MBeanServer to be
// able to determine if client's reply queue exists before processing a CAS.
public static transient boolean attachToBrokerMBeanServer=true;
- private static Map<String, List<UimaDefaultMessageListenerContainer>> listenerMap =
- new ConcurrentHashMap<String, List<UimaDefaultMessageListenerContainer>>();
+ private ENDPOINT_TYPE type = ENDPOINT_TYPE.JMS;
+ private static Map<String, List<Listener>> listenerMap =
+ new ConcurrentHashMap<>();
+
+ private ChannelType channelType;
+
+ public JmsInputChannel(ChannelType type) {
+ this.channelType = type;
+ }
+ public ChannelType getChannelType() {
+ return channelType;
+ }
+
public AnalysisEngineController getController() {
return controller;
}
-
+ public ENDPOINT_TYPE getType() {
+ return type;
+ }
+
+
public String getName() {
return endpointName;
}
@@ -652,18 +673,18 @@ public class JmsInputChannel implements
.getIntProperty(AsynchAEMessage.MessageType));
String msgSentFromIP = null;
-
+ // System.out.println("<<<<<<<<<<<<< Service:"+ getController().getComponentName()+ " Received JMS Message - command:"+command+" type:"+messageType);
if (aMessage.getIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Response
&& aMessage.propertyExists(AsynchAEMessage.ServerIP)) {
msgSentFromIP = aMessage.getStringProperty(AsynchAEMessage.ServerIP);
}
- // System.out.println("***********************************************************************************"
- // +
- // " \n**CONTROLLER::"+controller.getName()+"**** Received New Message From [ "+aMessage.getStringProperty(AsynchAEMessage.MessageFrom)+" ]**************"
- // +
- // " \n**MSGTYPE::"+messageType+" COMMAND:"+command +
- // " Cas Reference Id::"+casRefId+
- // " \n******************************************************************************");
+ System.out.println("***********************************************************************************"
+ +
+ " \n**CONTROLLER::"+controller.getName()+"**** Received New Message From [ "+aMessage.getStringProperty(AsynchAEMessage.MessageFrom)+" ]**************"
+ +
+ " \n**MSGTYPE::"+messageType+" COMMAND:"+command +
+ " Cas Reference Id::"+casRefId+
+ " \n******************************************************************************");
String msgFrom = (String) aMessage.getStringProperty(AsynchAEMessage.MessageFrom);
if (controller != null && msgFrom != null) {
@@ -674,7 +695,7 @@ public class JmsInputChannel implements
if ( ackClient(messageContext) ) {
try {
// Any exception while sending an ACK results in a dropped request
- getController().getOutputChannel().sendReply(AsynchAEMessage.ServiceInfo,
+ getController().getOutputChannel(ENDPOINT_TYPE.JMS).sendReply(AsynchAEMessage.ServiceInfo,
messageContext.getEndpoint(), aMessage.getStringProperty(AsynchAEMessage.CasReference), true);
} catch( Exception ex) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
@@ -800,30 +821,31 @@ public class JmsInputChannel implements
public String getServerUri() {
return brokerURL;
}
- public List<UimaDefaultMessageListenerContainer> registerListener(UimaDefaultMessageListenerContainer messageListener) {
- List<UimaDefaultMessageListenerContainer> ll = null;
+ public List<Listener> registerListener(Listener messageListener) {
+ List<Listener> ll = null;
if ( listenerMap.containsKey(getController().getComponentName()+getController().hashCode()) ) {
ll = listenerMap.get(getController().getComponentName()+getController().hashCode());
} else {
- ll = new CopyOnWriteArrayList<UimaDefaultMessageListenerContainer>();
+ ll = new CopyOnWriteArrayList<>();
listenerMap.put(getController().getComponentName()+getController().hashCode(), ll);
}
if (!ll.contains(messageListener)) {
ll.add(messageListener);
} else {
}
- for(Object container: ll ) {
- UimaDefaultMessageListenerContainer c = (UimaDefaultMessageListenerContainer) container;
- }
+// for(Object container: ll ) {
+// UimaDefaultMessageListenerContainer c = (UimaDefaultMessageListenerContainer) container;
+// }
return ll;
}
- public List<UimaDefaultMessageListenerContainer> getListeners() {
- List<UimaDefaultMessageListenerContainer> ll = null;
+ public List<Listener> getListeners() {
+ List<Listener> ll = null;
if ( listenerMap.containsKey(getController().getComponentName()+getController().hashCode()) ) {
ll = listenerMap.get(getController().getComponentName()+getController().hashCode());
}
return ll;
}
+ /*
public synchronized void setListenerContainer(UimaDefaultMessageListenerContainer jmsL) {
this.messageListener = jmsL;
System.setProperty("BrokerURI", messageListener.getBrokerUrl());
@@ -844,7 +866,13 @@ public class JmsInputChannel implements
}
}
}
-
+*/
+ public synchronized void addListenerContainer(UimaDefaultMessageListenerContainer messageListener) {
+ // Add new message listener to the list
+ registerListener(messageListener);
+
+ }
+ /*
public ActiveMQConnectionFactory getConnectionFactory() {
if (messageListener == null) {
return null;
@@ -852,7 +880,7 @@ public class JmsInputChannel implements
return (ActiveMQConnectionFactory) messageListener.getConnectionFactory();
}
}
-
+*/
public void ackMessage(MessageContext aMessageContext) {
if (aMessageContext != null && sessionAckMode == Session.CLIENT_ACKNOWLEDGE) {
try {
@@ -877,15 +905,7 @@ public class JmsInputChannel implements
}
public String getInputQueueName() {
- if (messageListener != null)
- if (messageListener.getDestination() != null) {
- return messageListener.getDestination().toString();
- } else {
- return messageListener.getDestinationName();// getEndpointName();
- }
- else {
- return "";
- }
+ return endpointName;
}
public ServiceInfo getServiceInfo() {
@@ -914,7 +934,47 @@ public class JmsInputChannel implements
.setBrokerURL(brokerURL);
}
}
+ private void stopListener(final UimaDefaultMessageListenerContainer mL) throws Exception {
+ System.out.println(".... "+getController().getComponentName()+" Stopping Listener Type:"+mL.getType());
+ mL.setTerminating();
+ mL.setAcceptMessagesWhileStopping(false);
+
+ mL.stop();
+ System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Calling destroy()");
+ System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Done Calling destroy()");
+
+ if ( mL.getTaskExecutor() instanceof ThreadPoolTaskExecutor ) {
+ System.out.println(".... "+getController().getComponentName()+" Stopping ThreadPoolTaskExecutor");
+ ThreadPoolTaskExecutor tpe = ((ThreadPoolTaskExecutor)mL.getTaskExecutor());
+ tpe.destroy();
+ } else {
+ System.out.println(".... "+getController().getComponentName()+" ActiveConsumerCount:"+mL.getActiveConsumerCount());
+ if ( mL.getTaskExecutor() != null ) {
+ System.out.println(".... "+getController().getComponentName()+" TaskExecutor Class:"+ mL.getTaskExecutor().getClass().getName());
+ } else {
+ //((ActiveMQDestination)mL.getDestination()).;
+ //mL.getTransport().;
+ // mL.notifyAll();
+ if ( mL.getActiveConsumerCount() > 0 ) {
+ Thread.dumpStack();
+ }
+ System.out.println(".... "+getController().getComponentName()+" Destroying Listener");
+ mL.doDestroy();
+ System.out.println(".... "+getController().getComponentName()+" ActiveConsumerCount:"+mL.getActiveConsumerCount());
+
+ }
+
+
+
+
+ }
+ System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Stopped Task Executor");
+ mL.shutdown();
+
+ System.out.println("Stopped Listener ");
+
+ }
private void stopChannel(UimaDefaultMessageListenerContainer mL, boolean shutdownNow) throws Exception {
String eName = mL.getEndpointName();
if (eName != null) {
@@ -924,9 +984,9 @@ public class JmsInputChannel implements
new Object[] { eName, shutdownNow });
}
}
- mL.delegateStop();
+ // mL.delegateStop();
if (shutdownNow) {
- mL.destroy(shutdownNow);
+ stopListener(mL);
}
String selector = "";
@@ -958,7 +1018,7 @@ public class JmsInputChannel implements
public void stop(boolean shutdownNow) throws Exception {
stop(InputChannel.CloseAllChannels, shutdownNow);
listenerContainerList.clear();
- List<UimaDefaultMessageListenerContainer> ll = getListeners();
+ List<Listener> ll = getListeners();
if ( ll != null ) {
ll.clear();
}
@@ -969,26 +1029,31 @@ public class JmsInputChannel implements
}
}
public void disconnectListenersFromQueue() throws Exception {
- List<UimaDefaultMessageListenerContainer> ll = getListeners();
- for (UimaDefaultMessageListenerContainer listenerObject : ll) {
- stopChannel(listenerObject, false);
+ List<Listener> ll = getListeners();
+ for (Listener listenerObject : ll) {
+ disconnectListenerFromQueue(listenerObject);
}
}
+ public void disconnectListenerFromQueue(Listener listener) throws Exception {
+ stopChannel((UimaDefaultMessageListenerContainer)listener, false);
+ }
public void setTerminating() {
- List<UimaDefaultMessageListenerContainer> ll = getListeners();
- for (UimaDefaultMessageListenerContainer listenerObject : ll) {
- listenerObject.setTerminating();
+ List<Listener> ll = getListeners();
+ for (Listener listenerObject : ll) {
+ ((UimaDefaultMessageListenerContainer)listenerObject).setTerminating();
}
}
public void terminate() {
try {
- List<UimaDefaultMessageListenerContainer> ll = getListeners();
- for (UimaDefaultMessageListenerContainer listenerObject : ll) {
- listenerObject.closeConnection();
+ List<Listener> ll = getListeners();
+ for (Listener listenerObject : ll) {
+ UimaDefaultMessageListenerContainer lc =
+ (UimaDefaultMessageListenerContainer)listenerObject;
+ lc.closeConnection();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- String msg = "................. JmsInputChannel.terminate() - Listener -"+listenerObject.getDestinationName();
+ String msg = "................. JmsInputChannel.terminate() - Listener -"+lc.getDestinationName();
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "terminate",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
@@ -1004,8 +1069,8 @@ public class JmsInputChannel implements
}
public synchronized void stop(int channelsToClose, boolean shutdownNow) throws Exception {
- List<UimaDefaultMessageListenerContainer> listenersToRemove = new ArrayList<UimaDefaultMessageListenerContainer>();
- List<UimaDefaultMessageListenerContainer> ll = getListeners();
+ // List<UimaDefaultMessageListenerContainer> listenersToRemove = new ArrayList<UimaDefaultMessageListenerContainer>();
+ List<Listener> ll = getListeners();
String msg = ">>>>>>>>>>>> JmsInputChannel. stop() - Controller:"+controller.getComponentName()+ " Listener Count:"+ll.size();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stop",
@@ -1013,13 +1078,14 @@ public class JmsInputChannel implements
new Object[] { msg });
}
- for (UimaDefaultMessageListenerContainer listenerObject : ll) {
-
- if (listenerObject != null && doCloseChannel(listenerObject, channelsToClose)) {
- listenerObject.setRecoveryInterval(0); // https://issues.apache.org/jira/browse/UIMA-3437
- listenerObject.setTerminating();
- listenerObject.setAutoStartup(false);
- stopChannel(listenerObject, shutdownNow);
+ for (Listener listenerObject : ll) {
+ UimaDefaultMessageListenerContainer lc =
+ (UimaDefaultMessageListenerContainer)listenerObject;
+ if (listenerObject != null && doCloseChannel(lc, channelsToClose)) {
+ lc.setRecoveryInterval(0); // https://issues.apache.org/jira/browse/UIMA-3437
+ lc.setTerminating();
+ lc.setAutoStartup(false);
+ stopChannel(lc, shutdownNow);
} else {
if (getController() != null) {
@@ -1041,9 +1107,9 @@ public class JmsInputChannel implements
return stopped;
}
- public int getConcurrentConsumerCount() {
- return messageListener.getConcurrentConsumers();
- }
+// public int getConcurrentConsumerCount() {
+// return messageListener.getConcurrentConsumers();
+// }
private void testIfBrokerRunning(String aBrokerUrl) throws Exception {
ActiveMQConnectionFactory f = new ActiveMQConnectionFactory(aBrokerUrl);
@@ -1061,7 +1127,8 @@ public class JmsInputChannel implements
}
}
public void createListenerOnTempQueue(ConnectionFactory cf, boolean isFreeCasDestination ) throws Exception {
- TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName());
+ TempDestinationResolver resolver =
+ new TempDestinationResolver(controller.getComponentName(),"");
UimaDefaultMessageListenerContainer connector = new UimaDefaultMessageListenerContainer(true);
connector.setConnectionFactory(cf);
resolver.setListener(connector);
@@ -1069,9 +1136,9 @@ public class JmsInputChannel implements
connector.setDestinationResolver(resolver);
connector.setController(getController());
connector.setMessageListener(this);
- connector.initializeContainer();
+ //connector.initializeContainer();
connector.getDestination();
- connector.afterPropertiesSet(false);
+ connector.afterPropertiesSet();
UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,getController().getComponentName()+"-JmsInputChannel.createListenerOnTempQueue()-starting new Listener" );
connector.start();
boolean log = true;
@@ -1103,10 +1170,10 @@ public class JmsInputChannel implements
}
if ( isFreeCasDestination ) {
- ((JmsOutputChannel) getController().getOutputChannel())
+ ((JmsOutputChannel) getController().getOutputChannel(ENDPOINT_TYPE.JMS))
.setFreeCasQueue(connector.getListenerEndpoint());
}
- setListenerContainer(connector);
+ addListenerContainer(connector);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
@@ -1116,7 +1183,7 @@ public class JmsInputChannel implements
}
}
public void createListenerForTargetedMessages() throws Exception {
- List<UimaDefaultMessageListenerContainer> listeners =
+ List<Listener> listeners =
getListeners();
// the TargetServiceId property value will become part of a jms selector.
String targetStringSelector = "";
@@ -1130,49 +1197,63 @@ public class JmsInputChannel implements
// find a listener instance which handles Process requests. The targeted
// listener created here will share a Connection Factory and ThreadFactory.
//
- for(UimaDefaultMessageListenerContainer listener : listeners ) {
- // Is this a Process listener instance? Check the selector
- if ( listener.getMessageSelector().endsWith(UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX) ) {
- // this will be a dedicated listener which handles targeted messages
- UimaDefaultMessageListenerContainer targetedListener = new UimaDefaultMessageListenerContainer();
- // setup jms selector
- if ( getController().isCasMultiplier()) {
- targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
- } else {
- targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
- }
-
- // use shared ConnectionFactory
- targetedListener.setConnectionFactory(listener.getConnectionFactory());
- // mark the listener as a 'Targeted' listener
- targetedListener.setTargetedListener();
- targetedListener.setController(getController());
- // there will only be one AMQ delivery thread. Its job will be to
- // add a targeted message to a BlockingQueue. Such thread will block
- // in an enqueue if a dequeue is not available. This will be prevent
- // the overwhelming the service with messages.
- targetedListener.setConcurrentConsumers(1);
- if ( listener.getMessageListener() instanceof PriorityMessageHandler ) {
- // the targeted listener will use the same message handler as the
- // Process listener. This handler will add a message wrapper
- // to enable prioritizing messages.
- targetedListener.setMessageListener(listener.getMessageListener());
+ for(Listener l : listeners ) {
+ UimaDefaultMessageListenerContainer listener =
+ (UimaDefaultMessageListenerContainer) l;
+ if ( listener.getTransport().equals(Transport.JMS)) {
+ // Ignore listeners for replies (message selector = null)
+ if ( listener.getMessageSelector() == null ) {
+ continue;
}
- // Same queue as the Process queue
- targetedListener.setDestination(listener.getDestination());
- registerListener(targetedListener);
- targetedListener.afterPropertiesSet();
- targetedListener.initialize();
- targetedListener.start();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "createListenerForTargetedMessages", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_TARGET_LISTENER__INFO",
- new Object[] {targetedListener.getMessageSelector(), controller.getComponentName() });
- }
- break;
+ // Is this a Process listener instance? Check the selector
+ if ( listener.getMessageSelector().endsWith(UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX) ) {
+ // this will be a dedicated listener which handles targeted messages
+ UimaDefaultMessageListenerContainer targetedListener = new UimaDefaultMessageListenerContainer();
+ // setup jms selector
+ if ( getController().isCasMultiplier()) {
+ targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
+ } else {
+ targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
+ }
+
+ // use shared ConnectionFactory
+ targetedListener.setConnectionFactory(listener.getConnectionFactory());
+ // mark the listener as a 'Targeted' listener
+ targetedListener.setTargetedListener();
+ targetedListener.setController(getController());
+ // there will only be one delivery thread. Its job will be to
+ // add a targeted message to a BlockingQueue. Such thread will block
+ // in an enqueue if a dequeue is not available. This will be prevent
+ // the overwhelming the service with messages.
+ ThreadPoolTaskExecutor threadExecutor = new ThreadPoolTaskExecutor();
+ threadExecutor.setCorePoolSize(1);
+ threadExecutor.setMaxPoolSize(1);
+ targetedListener.setTaskExecutor(threadExecutor);
+ targetedListener.setConcurrentConsumers(1);
+ if ( listener.getMessageListener() instanceof PriorityMessageHandler ) {
+ // the targeted listener will use the same message handler as the
+ // Process listener. This handler will add a message wrapper
+ // to enable prioritizing messages.
+ targetedListener.setMessageListener(listener.getMessageListener());
+ }
+ // Same queue as the Process queue
+ targetedListener.setDestination(listener.getDestination());
+ registerListener(targetedListener);
+ targetedListener.afterPropertiesSet();
+ targetedListener.setType(Type.Target);
+ targetedListener.initialize();
+ targetedListener.start();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "createListenerForTargetedMessages", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_TARGET_LISTENER__INFO",
+ new Object[] {targetedListener.getMessageSelector(), controller.getComponentName() });
+ }
+ break;
+
+ }
+ }
- }
}
}
public void createListener(String aDelegateKey, Endpoint endpointToUpdate) throws Exception {
@@ -1190,7 +1271,7 @@ public class JmsInputChannel implements
newListener.setMessageListener(this);
newListener.setController(getController());
- TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName());
+ TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName(), aDelegateKey);
resolver.setConnectionFactory(f);
resolver.setListener(newListener);
newListener.setDestinationResolver(resolver);
@@ -1212,7 +1293,7 @@ public class JmsInputChannel implements
}
newListener.afterPropertiesSet();
if ( controller != null && controller.isStopped() ) {
- newListener.destroy(true); // shutdownNow
+ stopListener(newListener); // shutdownNow
// we are aborting, the controller has been stopped
return;
}
@@ -1231,11 +1312,13 @@ public class JmsInputChannel implements
}
}
public boolean isListenerActiveOnDestination(Destination destination ) {
- List<UimaDefaultMessageListenerContainer> ll = getListeners();
- for (UimaDefaultMessageListenerContainer mListener : ll ) {
- if ( mListener.getDestination() != null &&
- mListener.getDestination() == destination &&
- mListener.isRunning()) {
+ List<Listener> ll = getListeners();
+ for (Listener mListener : ll ) {
+ UimaDefaultMessageListenerContainer lc =
+ (UimaDefaultMessageListenerContainer)mListener;
+ if ( lc.getDestination() != null &&
+ lc.getDestination() == destination &&
+ lc.isRunning()) {
return true;
}
}
@@ -1252,23 +1335,25 @@ public class JmsInputChannel implements
*
* @return - list of listeners
*/
- private UimaDefaultMessageListenerContainer[] getListenersForEndpoint(String anEndpointName) {
- List<UimaDefaultMessageListenerContainer> ll = getListeners();
+ private Listener[] getListenersForEndpoint(String anEndpointName) {
+ List<Listener> ll = getListeners();
- List<UimaDefaultMessageListenerContainer> listeners = new ArrayList<UimaDefaultMessageListenerContainer>();
- for (UimaDefaultMessageListenerContainer mListener : ll) {
- if (mListener.getDestinationName() != null
- && mListener.getDestinationName().equals(anEndpointName)) {
+ List<Listener> listeners = new ArrayList<>();
+ for (Listener mListener : ll) {
+ UimaDefaultMessageListenerContainer lc =
+ (UimaDefaultMessageListenerContainer)mListener;
+ if (lc.getDestinationName() != null
+ && lc.getDestinationName().equals(anEndpointName)) {
listeners.add(mListener);
- } else if (mListener.getDestination() != null
- && mListener.getDestination().toString().equals(anEndpointName)) {
+ } else if (lc.getDestination() != null
+ && lc.getDestination().toString().equals(anEndpointName)) {
listeners.add(mListener);
}
}
- if (listeners.size() > 0) {
- UimaDefaultMessageListenerContainer[] listenerArray = new UimaDefaultMessageListenerContainer[listeners
- .size()];
+ if (!listeners.isEmpty()) {
+ Listener[] listenerArray = new Listener[listeners.size()];
+
listeners.toArray(listenerArray);
return listenerArray;
}
@@ -1285,34 +1370,36 @@ public class JmsInputChannel implements
return;
}
// Fetch all associated listeners.
- final UimaDefaultMessageListenerContainer[] mListeners = getListenersForEndpoint(anEndpointName);
+ final Listener[] mListeners = getListenersForEndpoint(anEndpointName);
if (mListeners == null) {
return;
}
// Stop each listener
- for (final UimaDefaultMessageListenerContainer mListener : mListeners) {
- if (!mListener.isRunning()) {
+ for (final Listener mListener : mListeners) {
+ UimaDefaultMessageListenerContainer lc =
+ (UimaDefaultMessageListenerContainer)mListener;
+ if (!lc.isRunning()) {
continue; // Already Stopped
}
try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)
- && mListener.getDestination() != null) {
+ && lc.getDestination() != null) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_stop_listener__INFO",
- new Object[] { mListener.getDestination().toString() });
+ new Object[] { lc.getDestination().toString() });
}
- mListener.stop();
+ lc.stop();
if (getController() != null) {
Endpoint endpoint = ((AggregateAnalysisEngineController) getController()).lookUpEndpoint(
aDelegateKey, false);
endpoint.setStatus(Endpoint.FAILED);
- List<UimaDefaultMessageListenerContainer> ll = null;
+ List<Listener> ll = null;
if ( listenerMap.containsKey(getController().getComponentName()+getController().hashCode()) ) {
ll = listenerMap.get(getController().getComponentName()+getController().hashCode());
}
- if (mListener.getConnectionFactory() != null) {
+ if (lc.getConnectionFactory() != null) {
if (getController() instanceof AggregateAnalysisEngineController) {
if (!failedListenerMap.containsKey(aDelegateKey)) {
failedListenerMap.put(aDelegateKey, mListener);
@@ -1345,7 +1432,7 @@ public class JmsInputChannel implements
}
}
public boolean isListenerForDestination(String anEndpointName) {
- UimaDefaultMessageListenerContainer[] mListeners = getListenersForEndpoint(anEndpointName);
+ Listener[] mListeners = getListenersForEndpoint(anEndpointName);
if (mListeners == null) {
return false;
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Mon Feb 26 18:54:11 2018
@@ -60,6 +60,7 @@ import org.apache.uima.aae.UIMAEE_Consta
import org.apache.uima.aae.UimaSerializer;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
@@ -87,7 +88,7 @@ import com.thoughtworks.xstream.XStream;
public class JmsOutputChannel implements OutputChannel {
- private static final Class CLASS_NAME = JmsOutputChannel.class;
+ private static final Class<?> CLASS_NAME = JmsOutputChannel.class;
private static final long INACTIVITY_TIMEOUT = 5; // MINUTES
@@ -159,7 +160,7 @@ public class JmsOutputChannel implements
serverURI = aServerURI;
}
- protected void setFreeCasQueue(Destination destination) {
+ public void setFreeCasQueue(Destination destination) {
freeCASTempQueue = destination;
}
@@ -175,11 +176,12 @@ public class JmsOutputChannel implements
*
* @param connectionFactory
*/
+ /*
public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
this.connectionFactory.setTrustAllPackages(true);
}
-
+*/
public void setServiceInputEndpoint(String anEnpoint) {
serviceInputEndpoint = anEnpoint;
}
@@ -359,11 +361,12 @@ public class JmsOutputChannel implements
Connection conn = brokerConnectionEntry.getConnection();
try {
if ( conn != null)
- if ( conn != null && !((ActiveMQConnection)conn).isClosed()) {
+ if ( conn != null && ((ActiveMQConnection)conn).isClosed()) {
for (Entry<Object, JmsEndpointConnection_impl> endpoints : brokerConnectionEntry.endpointMap
.entrySet()) {
endpoints.getValue().close(); // close session and producer
}
+ brokerConnectionEntry.getConnection().stop();
brokerConnectionEntry.getConnection().close();
brokerConnectionEntry.setConnection(null);
}
@@ -446,6 +449,7 @@ public class JmsOutputChannel implements
try {
connectionSemaphore.acquire();
+ /*
// If sending a Free Cas Request to a remote Cas Multiplier always use the CM's
// broker
if ( anEndpoint.isFreeCasEndpoint() && anEndpoint.isCasMultiplier() && anEndpoint.isReplyEndpoint()) {
@@ -456,6 +460,9 @@ public class JmsOutputChannel implements
brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
}
+ */
+ brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
+
String key = getLookupKey(anEndpoint);
String destination = getDestinationName(anEndpoint);
@@ -540,7 +547,7 @@ public class JmsOutputChannel implements
// Returns InputChannel if the Reply Listener for the delegate has previously failed.
// If the listener hasnt failed the getReplyInputChannel returns null
// InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDelegateKey());
- InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDestination().toString());
+ InputChannel iC = getAnalysisEngineController().getInputChannel();
if ( iC != null ) {
try {
// Create a new Listener, new Temp Queue and associate the listener with the Input Channel
@@ -713,9 +720,12 @@ public class JmsOutputChannel implements
case AsynchAEMessage.Process:
logRequest("UIMAEE_service_sending_process_request__FINE", anEndpoint);
- serializeCasAndSend(getAnalysisEngineController().
- getInProcessCache().
- getCacheEntryForCAS(aCasReferenceId), anEndpoint);
+ CasStateEntry stateEntry =
+ getAnalysisEngineController().getLocalCache().lookupEntry(aCasReferenceId);
+ serializeCasAndSend(stateEntry, anEndpoint);
+// serializeCasAndSend(getAnalysisEngineController().
+// getInProcessCache().
+// getCacheEntryForCAS(aCasReferenceId), anEndpoint);
return; /// <<<<< RETURN - Done here >>>>
@@ -747,7 +757,7 @@ public class JmsOutputChannel implements
}
}
- private void serializeCasAndSend(CacheEntry entry, Endpoint anEndpoint) throws Exception {
+ private void serializeCasAndSend(CasStateEntry entry, Endpoint anEndpoint) throws Exception {
if (anEndpoint.getSerialFormat() == SerialFormat.XMI) {
String serializedCAS = getSerializedCasAndReleaseIt(false, entry.getCasReferenceId(), anEndpoint,
anEndpoint.isRetryEnabled());
@@ -785,7 +795,8 @@ public class JmsOutputChannel implements
}
- public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
+// public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
+ public void sendReply(CasStateEntry entry, Endpoint anEndpoint) throws AsynchAEException {
try {
anEndpoint.setReplyEndpoint(true);
if (anEndpoint.isRemote()) {
@@ -844,14 +855,15 @@ public class JmsOutputChannel implements
// If this service is a Cas Multiplier add to the message a FreeCasQueue.
// The client may need send Stop request to that queue.
if (aCommand == AsynchAEMessage.ServiceInfo
- && getAnalysisEngineController().isCasMultiplier() ) {
- if ( freeCASTempQueue != null ) {
+ && getAnalysisEngineController().isCasMultiplier() ) {
+ if ( freeCASTempQueue != null ) {
// Attach a temp queue to the outgoing message. This a queue where
// Free CAS notifications need to be sent from the client
tm.setJMSReplyTo(freeCASTempQueue);
- }
+
+ }
// new services will receive FreeCas request via a targeted queue
- StringBuffer selector = new StringBuffer().
+ StringBuilder selector = new StringBuilder().
append("TargetServiceId = ").
append("'").append(hostIP).append(":").
append(getAnalysisEngineController().getPID()).
@@ -1044,9 +1056,15 @@ public class JmsOutputChannel implements
}
long msgSize = 0;
+ System.out.println("------------ Service:"+getAnalysisEngineController().getComponentName()+" Sending GetMeta Reply");
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
-
+ Object destination = anEndpoint.getReplyDestination();
+ if ( destination == null ) {
+ destination = anEndpoint.getDestination();
+ }
+ System.out.println(".......... Service:"+getAnalysisEngineController().getComponentName()+" replying with GetMeta to reply queue:"+destination);
+
anEndpoint.setReplyEndpoint(true);
// Initialize JMS connection to given endpoint
@@ -1237,7 +1255,7 @@ public class JmsOutputChannel implements
CAS cas = null;
try {
String serializedCAS = null;
- // Using Cas reference Id retrieve CAS from the shared Cash
+ // Using Cas reference Id retrieve CAS from the shared Cache
cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
if (cas == null) {
@@ -1509,7 +1527,7 @@ public class JmsOutputChannel implements
this.controllerInputEndpoint = controllerInputEndpoint;
}
- private void dispatch(Message aMessage, Endpoint anEndpoint, CacheEntry entry, boolean isRequest,
+ private void dispatch(Message aMessage, Endpoint anEndpoint, CasStateEntry entry, boolean isRequest,
JmsEndpointConnection_impl endpointConnection, long msgSize) throws Exception {
if ( anEndpoint == null ) {
@@ -1627,10 +1645,9 @@ public class JmsOutputChannel implements
}
}
- private void sendCasToRemoteEndpoint(boolean isRequest, Object aSerializedCAS, CacheEntry entry,
+ private void sendCasToRemoteEndpoint(boolean isRequest, Object aSerializedCAS, CasStateEntry casStateEntry,
Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
ServiceShutdownException {
- CasStateEntry casStateEntry = null;
long msgSize = 0;
try {
if (aborting) {
@@ -1639,9 +1656,7 @@ public class JmsOutputChannel implements
// If this is a reply to a client, use the same broker URL that manages this service input queue.
// Otherwise this is a request so use a broker specified in the endpoint object.
String brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
-
- casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
- entry.getCasReferenceId());
+/*
if (casStateEntry == null) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.WARNING,
@@ -1651,12 +1666,12 @@ public class JmsOutputChannel implements
"UIMAJMS_unable_to_send_reply__WARNING",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getDestination(), brokerConnectionURL,
- entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(),
- entry.getCasReferenceId(), 0,
+ casStateEntry.getParentCasReferenceId() == null ? "" : casStateEntry.getParentCasReferenceId(),
+ casStateEntry.getCasReferenceId(), 0,
new Exception("Unable to lookup entry in Local Cache for a given Cas Id") });
return;
}
-
+*/
// Get the connection object for a given endpoint
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
@@ -1689,19 +1704,22 @@ public class JmsOutputChannel implements
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_send_reply__WARNING",
new Object[] { getAnalysisEngineController().getComponentName(),
- anEndpoint.getDestination(), brokerConnectionURL, entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(), entry.getCasReferenceId(), 0, ex });
+ anEndpoint.getDestination(), brokerConnectionURL, casStateEntry.getParentCasReferenceId() == null ? "" : casStateEntry.getParentCasReferenceId(), casStateEntry.getCasReferenceId(), 0, ex });
return;
}
// Add Cas Reference Id to the outgoing JMS Header
- tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
+ tm.setStringProperty(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId());
// Add common properties to the JMS Header
if (isRequest == true) {
populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
} else {
- populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
- tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
+ populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
+ CacheEntry cacheEntry =
+ getAnalysisEngineController().
+ getInProcessCache().getCacheEntryForCAS(casStateEntry.getCasReferenceId());
+ tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, cacheEntry.sentDeltaCas());
}
- // The following is true when the analytic is a CAS Multiplier
+ // The following is true when the service is a CAS Multiplier
if (casStateEntry.isSubordinate() && !isRequest) {
// Override MessageType set in the populateHeaderWithContext above.
// Make the reply message look like a request. This message will contain a new CAS
@@ -1710,10 +1728,10 @@ public class JmsOutputChannel implements
tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
isRequest = true;
// Save the id of the parent CAS
- tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry
+ tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(casStateEntry
.getCasReferenceId()));
// Add a sequence number assigned to this CAS by the controller
- tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
+ tm.setLongProperty(AsynchAEMessage.CasSequence, casStateEntry.getSequenceNumber());
// If this is a Cas Multiplier, add a reference to a special queue where
// the client sends Free Cas Notifications
if (freeCASTempQueue != null) {
@@ -1729,11 +1747,11 @@ public class JmsOutputChannel implements
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_send_cas_to_collocated_service_detail__FINE",
new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
- anEndpoint.getEndpoint(), entry.getCasReferenceId(),
- entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
+ anEndpoint.getEndpoint(), casStateEntry.getCasReferenceId(),
+ casStateEntry.getParentCasReferenceId(), casStateEntry.getParentCasReferenceId() });
}
}
- dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
+ dispatch(tm, anEndpoint, casStateEntry, isRequest, endpointConnection, msgSize);
} catch (JMSException e) {
// Unable to establish connection to the endpoint. Logit and continue
@@ -1757,21 +1775,25 @@ public class JmsOutputChannel implements
private Delegate lookupDelegate(String aDelegateKey) {
if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
- Delegate delegate = ((AggregateAnalysisEngineController) getAnalysisEngineController())
+ return ((AggregateAnalysisEngineController) getAnalysisEngineController())
.lookupDelegate(aDelegateKey);
- return delegate;
}
return null;
}
- private void addCasToOutstandingList(CacheEntry entry, boolean isRequest, String aDelegateKey) {
+ private void addCasToOutstandingList(CasStateEntry casStateEntry, boolean isRequest, String aDelegateKey)
+ throws AsynchAEException {
Delegate delegate = null;
if (isRequest && (delegate = lookupDelegate(aDelegateKey)) != null) {
- delegate.addCasToOutstandingList(entry.getCasReferenceId(), entry.getCas().hashCode(), false); // false=dont start timer thread per CAS
+ CacheEntry cacheEntry =
+ getAnalysisEngineController().
+ getInProcessCache().getCacheEntryForCAS(casStateEntry.getCasReferenceId());
+
+ delegate.addCasToOutstandingList(casStateEntry.getCasReferenceId(), cacheEntry.getCas().hashCode(), false); // false=dont start timer thread per CAS
}
}
- private void removeCasFromOutstandingList(CacheEntry entry, boolean isRequest, String aDelegateKey) {
+ private void removeCasFromOutstandingList(CasStateEntry entry, boolean isRequest, String aDelegateKey) {
Delegate delegate = null;
if (isRequest && (delegate = lookupDelegate(aDelegateKey)) != null) {
delegate.removeCasFromOutstandingList(entry.getCasReferenceId());
@@ -1787,7 +1809,7 @@ public class JmsOutputChannel implements
if (casStateEntry.isSubordinate()) {
// Recurse until the top CAS reference Id is found
- return getTopParentCasReferenceId(casStateEntry.getInputCasReferenceId());
+ return getTopParentCasReferenceId(casStateEntry.getParentCasReferenceId());
}
// Return the top ancestor CAS id
return casStateEntry.getCasReferenceId();
@@ -2097,7 +2119,7 @@ public class JmsOutputChannel implements
private static class RecoveryThread implements Runnable {
Endpoint endpoint;
- CacheEntry entry;
+ CasStateEntry entry;
boolean isRequest;
@@ -2105,7 +2127,7 @@ public class JmsOutputChannel implements
JmsOutputChannel outputChannel;
- public RecoveryThread(JmsOutputChannel channel, Endpoint anEndpoint, CacheEntry anEntry,
+ public RecoveryThread(JmsOutputChannel channel, Endpoint anEndpoint, CasStateEntry anEntry,
boolean isRequest, AnalysisEngineController aController) {
endpoint = anEndpoint;
entry = anEntry;
@@ -2124,9 +2146,8 @@ public class JmsOutputChannel implements
// Mark this delegate as Failed
delegate.getEndpoint().setStatus(Endpoint.FAILED);
// Destroy listener associated with a reply queue for this delegate
- InputChannel ic = controller.getInputChannel(delegate.getEndpoint().getDestination()
- .toString());
- if (ic != null && delegate != null && delegate.getEndpoint() != null) {
+ InputChannel ic = controller.getInputChannel();
+ if (delegate.getEndpoint() != null) {
ic.destroyListener(delegate.getEndpoint().getDestination().toString(), endpoint
.getDelegateKey());
}
@@ -2142,4 +2163,7 @@ public class JmsOutputChannel implements
}
}
+ public ENDPOINT_TYPE getType() {
+ return ENDPOINT_TYPE.JMS;
+ }
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java Mon Feb 26 18:54:11 2018
@@ -35,10 +35,13 @@ public class TempDestinationResolver imp
private String serviceName = "";
+ private String endpoint = "";
+
public TempDestinationResolver() {
}
- public TempDestinationResolver(String name) {
+ public TempDestinationResolver(String name, String endpoint) {
serviceName = name;
+ this.endpoint = endpoint;
}
/**
* This method is called by the Spring listener code. It creates a single temp queue for all
@@ -48,7 +51,9 @@ public class TempDestinationResolver imp
*/
public Destination resolveDestinationName(Session session, String destinationName,
boolean pubSubDomain) throws JMSException {
- synchronized (mutex) {
+ System.out.println("************ resolveDestinationName() Controller:"+serviceName+" Endpoint:"+endpoint+"************************");
+
+ synchronized (mutex) {
if (destination == null) {
destination = session.createTemporaryQueue();
if (listener != null) {