You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/05/07 21:27:44 UTC
svn commit: r1831129 [1/5] - in /uima/uima-as/branches/uima-as-3:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uima/...
Author: cwiklik
Date: Mon May 7 21:27:43 2018
New Revision: 1831129
URL: http://svn.apache.org/viewvc?rev=1831129&view=rev
Log:
UIMA-5501
Added:
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsProcessListener.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTopLevelTestAggregate.xml
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAS.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessServiceInfoResponseCommand.java
Modified:
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/AbstractUimaASDeployer.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/UimaAsServiceDeployer.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_CMAggregateWithCollocatedCM.xml
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_ComplexAggregate.xml
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotator.xml
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelComplexAggregateCM.xml
uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/CasMultiplierAggregateWith2Multipliers.xml
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsPriorityBasedThreadFactory.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java
uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Mon May 7 21:27:43 2018
@@ -181,7 +181,9 @@ public class JmsEndpointConnection_impl
ServiceShutdownException, ConnectException {
synchronized (lock) {
try {
-
+if ( !controller.isPrimitive() && isReplyEndpoint ) {
+ System.out.println("JmsOutputChannel.openChannel()");
+}
// If replying to http request, reply to a queue managed by this service broker using tcp
// protocol
if (isReplyEndpoint && brokerUri.startsWith("http")) {
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Mon May 7 21:27:43 2018
@@ -935,18 +935,26 @@ public class JmsInputChannel implements
}
}
private void stopListener(final UimaDefaultMessageListenerContainer mL) throws Exception {
+ // Thread.currentThread().dumpStack();
System.out.println(".... "+getController().getComponentName()+" Stopping Listener Type:"+mL.getType());
mL.setTerminating();
mL.setAcceptMessagesWhileStopping(false);
mL.stop();
- System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Calling destroy()");
+ System.out.println(".... "+getController().getComponentName()+" Stopping "+mL.getType()+" Listener - Calling destroy()");
System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Done Calling destroy()");
if ( mL.getTaskExecutor() instanceof ThreadPoolTaskExecutor ) {
System.out.println(".... "+getController().getComponentName()+" Stopping ThreadPoolTaskExecutor");
ThreadPoolTaskExecutor tpe = ((ThreadPoolTaskExecutor)mL.getTaskExecutor());
- tpe.destroy();
+
+ try {
+ tpe.shutdown();
+
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+
} else {
System.out.println(".... "+getController().getComponentName()+" ActiveConsumerCount:"+mL.getActiveConsumerCount());
@@ -969,6 +977,7 @@ public class JmsInputChannel implements
}
+
System.out.println(".... "+getController().getComponentName()+" Stopping Listener - Stopped Task Executor");
mL.shutdown();
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Mon May 7 21:27:43 2018
@@ -461,8 +461,15 @@ public class JmsOutputChannel implements
}
*/
- brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
-
+ /*
+ if ( anEndpoint.isFreeCasEndpoint() && anEndpoint.isCasMultiplier() && anEndpoint.isReplyEndpoint()) {
+ brokerConnectionURL = anEndpoint.getServerURI();
+ } else {
+ brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
+ }
+ */
+ brokerConnectionURL = anEndpoint.getServerURI();
+
String key = getLookupKey(anEndpoint);
String destination = getDestinationName(anEndpoint);
@@ -1063,14 +1070,17 @@ public class JmsOutputChannel implements
if ( destination == null ) {
destination = anEndpoint.getDestination();
}
- System.out.println(".......... Service:"+getAnalysisEngineController().getComponentName()+" replying with GetMeta to reply queue:"+destination);
-
+ if ( analysisEngineController instanceof AggregateAnalysisEngineController ) {
+ System.out.println("Aggregate Controller replying with GetMeta");
+ }
+
anEndpoint.setReplyEndpoint(true);
// Initialize JMS connection to given endpoint
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+ System.out.println(".......... Service:"+getAnalysisEngineController().getComponentName()+" replying with GetMeta to reply queue:"+destination+" Broker:"+endpointConnection.getServerUri());
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_produce_txt_msg__FINE",
new Object[] {});
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java Mon May 7 21:27:43 2018
@@ -49,7 +49,7 @@ import org.springframework.jms.listener.
public class PriorityMessageHandler implements SessionAwareMessageListener {
private PriorityBlockingQueue<MessageWrapper> queue =
- new PriorityBlockingQueue<MessageWrapper>();
+ new PriorityBlockingQueue<>();
private Semaphore targetedListenerSemaphore = null;
private Semaphore processListenerSemaphore = null;
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java Mon May 7 21:27:43 2018
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
import org.springframework.jms.support.destination.DestinationResolver;
public class TempDestinationResolver implements DestinationResolver {
@@ -52,15 +53,20 @@ public class TempDestinationResolver imp
public Destination resolveDestinationName(Session session, String destinationName,
boolean pubSubDomain) throws JMSException {
System.out.println("************ resolveDestinationName() Controller:"+serviceName+" Endpoint:"+endpoint+"************************");
-
- synchronized (mutex) {
- if (destination == null) {
- destination = session.createTemporaryQueue();
- if (listener != null) {
- listener.setDestination(destination);
- }
- }
- }
+ try {
+ synchronized (mutex) {
+ // if (destination == null) {
+ if ( listener.getDestination() == null || ((ActiveMQDestination)listener.getDestination()).isTemporary() ) {
+ destination = session.createTemporaryQueue();
+ if (listener != null) {
+ listener.setDestination(destination);
+ }
+ }
+ }
+ } catch( Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
return destination;
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Mon May 7 21:27:43 2018
@@ -79,6 +79,7 @@ import org.springframework.jms.listener.
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.jms.support.JmsUtils;
+import org.springframework.jms.support.destination.CachingDestinationResolver;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -168,6 +169,7 @@ public class UimaDefaultMessageListenerC
return transport;
}
public void setType(Type t) {
+ System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>> Listener.setType() - assigning type:"+t);
this.type = t;
}
public Type getType() {
@@ -180,7 +182,29 @@ public class UimaDefaultMessageListenerC
this();
this.freeCasQueueListener = freeCasQueueListener;
}
-
+ protected void refreshDestination() {
+ String destName = getDestinationName();
+ Destination d = getDestination();
+ if (destName != null) {
+ System.out.println(">>>>>>>>>>>>>>> refreshDestination() - destination:"+d+" DestName:"+destName+" Type:"+type);
+ DestinationResolver destResolver = getDestinationResolver();
+ if (destResolver instanceof CachingDestinationResolver) {
+ ((CachingDestinationResolver) destResolver).removeFromCache(destName);
+ }
+ if( Type.Reply.equals(type) || Type.FreeCAS.equals(type)) {
+ try {
+ ActiveMQConnection c = (ActiveMQConnection)getSharedConnection();
+ Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ getDestinationResolver().resolveDestinationName(s,"",false);
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ }
+ //super.refreshDestination();
+ }
public void setTargetedListener() {
targetedListener = true;
}
@@ -198,39 +222,38 @@ public class UimaDefaultMessageListenerC
* Overriden Spring's method that tries to recover from lost connection. We dont
* want to recover when the service is stopping.
*/
- protected void refreshConnectionUntilSuccessful() {
- // System.out.println("............refreshConnectionUntilSuccessful() called");
- while (isRunning()) {
-
- try {
-
- if (sharedConnectionEnabled()) {
- refreshSharedConnection();
- } else {
- Connection con = createConnection();
- JmsUtils.closeConnection(con);
- }
- // Use UIMA-AS custom Destination Resolver to create a new temp queue for this reply listener
- if( Type.Reply.equals(type) || Type.FreeCAS.equals(type)) {
- getDestinationResolver().resolveDestinationName(getSharedConnection().createSession(false, Session.AUTO_ACKNOWLEDGE),"",false);
- }
- logger.info(getType().name()+" Listener Successfully refreshed JMS Connection to broker: "+getBrokerUrl()+" Endpoint:"+getDestination());
- logListenerFailure = true;
- break;
- }
- catch (Exception ex) {
-
- if (ex instanceof JMSException) {
- if (logListenerFailure && ex.getCause() instanceof ConnectException) {
-
- logger.info(getType().name()+" Listener lost connection to broker: "+getBrokerUrl()+"- Retrying until successfull ...");
- logListenerFailure = false;
- } else {
- invokeExceptionListener((JMSException) ex);
- }
- }
- }
- }
+// protected void refreshConnectionUntilSuccessful() {
+// System.out.println("............refreshConnectionUntilSuccessful() called - Listener Hashcode:"+this.hashCode()+ " Thread Name:"+Thread.currentThread().getName());;
+// while (isRunning()) {
+//
+// try {
+//
+// if (sharedConnectionEnabled()) {
+// refreshSharedConnection();
+// }
+// // Use UIMA-AS custom Destination Resolver to create a new temp queue for this reply listener
+// if( Type.Reply.equals(type) || Type.FreeCAS.equals(type)) {
+// getDestinationResolver().resolveDestinationName(getSharedConnection().createSession(false, Session.AUTO_ACKNOWLEDGE),"",false);
+// Thread.currentThread().dumpStack();
+// }
+// logger.info(getType().name()+" Listener Successfully refreshed JMS Connection to broker: "+getBrokerUrl()+" Endpoint:"+getDestination());
+// logListenerFailure = true;
+// break;
+// }
+// catch (Exception ex) {
+//
+// if (ex instanceof JMSException) {
+// if (logListenerFailure && ex.getCause() instanceof ConnectException) {
+//
+// logger.info(getType().name()+" Listener lost connection to broker: "+getBrokerUrl()+"- Retrying until successfull ...");
+// logListenerFailure = false;
+// } else {
+// ex.printStackTrace();
+// invokeExceptionListener((JMSException) ex);
+// }
+// }
+// }
+// }
/*
boolean doLogFailureMsg = true;
try {
@@ -323,12 +346,12 @@ public class UimaDefaultMessageListenerC
} catch( IllegalStateException e ) {
}
*/
- }
- protected void recoverAfterListenerSetupFailure() {
- if ( !terminating ) {
- super.recoverAfterListenerSetupFailure();
- }
- }
+ //}
+// protected void recoverAfterListenerSetupFailure() {
+// if ( !terminating ) {
+// super.recoverAfterListenerSetupFailure();
+// }
+// }
public void setTerminating() {
terminating = true;
@@ -635,16 +658,18 @@ public class UimaDefaultMessageListenerC
/**
* This method is called by Spring when a listener fails
*/
- protected void handleListenerSetupFailure(Throwable t, boolean alreadyHandled) {
- if ( t.getCause() instanceof InterruptedException ) {
-// System.out.println("............handleListenerFailure(Throwable t, boolean alreadyHandled) called - Cause:"+t);
- return;
- }
- // If shutdown already, nothing to do
- // If controller is stopping no need to recover the connection
- if (!super.isRunning() || awaitingShutdown || terminating || (controller != null && controller.isStopped()) ) {
- return;
- }
+// protected void handleListenerSetupFailure(Throwable t, boolean alreadyHandled) {
+// if ( t.getCause() instanceof InterruptedException ) {
+//// System.out.println("............handleListenerFailure(Throwable t, boolean alreadyHandled) called - Cause:"+t);
+// return;
+// }
+// // If shutdown already, nothing to do
+// // If controller is stopping no need to recover the connection
+// if (!super.isRunning() || awaitingShutdown || terminating || (controller != null && controller.isStopped()) ) {
+// return;
+// }
+//
+
/*
if ( controller != null ) {
controller.changeState(ServiceState.FAILED);
@@ -711,7 +736,7 @@ public class UimaDefaultMessageListenerC
failed = true;
}
*/
- }
+// }
public Endpoint getEndpoint() {
return endpoint;
}
@@ -1120,18 +1145,21 @@ public class UimaDefaultMessageListenerC
if ( isRunning()) {
return;
}
- int consumerThreadCount=-1;
- if ( getTaskExecutor() instanceof ThreadPoolTaskExecutor) {
+ // int consumerThreadCount=-1;
+ /*
+ if ( !Type.ProcessCAS.equals(type) && getTaskExecutor() instanceof ThreadPoolTaskExecutor) {
((ThreadPoolTaskExecutor)getTaskExecutor()).initialize();
// if this listener is a handling Process requests, the prestartAllCoreThreads() below
// will force initialization of AEs if this is a primitive service.
((ThreadPoolTaskExecutor)getTaskExecutor()).getThreadPoolExecutor().prestartAllCoreThreads();
}
+
+ */
super.afterPropertiesSet();
super.initialize();
super.start();
- System.out.println(">>>>>>> Listener Service:"+controller.getComponentName()+" Broker URL:"+getBrokerUrl()+" Endpoint:"+__listenerRef.getEndpoint()+" ConsumerThreadCount:"+consumerThreadCount);
+ System.out.println(">>>>>>> Listener Service:"+controller.getComponentName()+" Broker URL:"+getBrokerUrl()+" Endpoint:"+__listenerRef.getEndpoint()+" ConsumerThreadCount:"+getConcurrentConsumers()+ " Type:"+getType());
}
private Object getPojoListener() {
@@ -1500,14 +1528,26 @@ public class UimaDefaultMessageListenerC
* Called by Spring to inject TaskExecutor
*/
public void setTaskExecutor(TaskExecutor aTaskExecutor) {
- taskExecutor = aTaskExecutor;
+
+ taskExecutor = aTaskExecutor;
+ /*
+ if ( Type.ProcessCAS.equals(getType())) {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(1);
+ executor.setMaxPoolSize(1);
+ executor.setQueueCapacity(1);
+ executor.initialize();
+ super.setTaskExecutor(executor);
+ } else {
+ super.setTaskExecutor(aTaskExecutor);
+ }
+ */
super.setTaskExecutor(aTaskExecutor);
}
-
public TaskExecutor getTaskExecutor() {
- return taskExecutor;
+
+ return taskExecutor;
}
-
/**
* This method initializes ThreadPoolExecutor with a custom ThreadPool. Each thread produced by
* the ThreadPool is used to first initialize an instance of the AE before the thread is added to
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Mon May 7 21:27:43 2018
@@ -109,6 +109,7 @@ import org.apache.uima.resource.Resource
import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
import org.apache.uima.util.Level;
import org.apache.xmlbeans.XmlDocumentProperties;
+import org.apache.xmlbeans.XmlOptions;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
@@ -165,14 +166,25 @@ public class BaseUIMAAsynchronousEngine_
private Thread dispatchThread;
+ private Transport transport;
+
protected volatile boolean stopped = false;
public BaseUIMAAsynchronousEngine_impl() {
+ this(Transport.JMS); // default
+ /*
super();
UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
"UIMA Version " + UIMAFramework.getVersionString() +
" UIMA-AS Version " + UimaAsVersion.getVersionString());
+ */
+ }
+ public BaseUIMAAsynchronousEngine_impl(Transport transport) {
+ super();
+ this.transport = transport;
+ UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
+ "UIMA Version " + UIMAFramework.getVersionString() +
+ " UIMA-AS Version " + UimaAsVersion.getVersionString());
}
-
protected void beforeProcessReply(String casReferenceId) {
if ( service != null ) {
service.removeFromCache(casReferenceId);
@@ -698,13 +710,7 @@ public class BaseUIMAAsynchronousEngine_
// Create a singleton shared connection object
SharedConnection sharedConnection = sharedConnections.get(aBrokerURI);
sharedConnection.setConnectionFactory(connectionFactory);
-/*
- new SharedConnection(
- connectionFactory,
- //new ActiveMQConnectionFactory(aBrokerURI),
- aBrokerURI);
- sharedConnection.setSemaphore(semaphore);
- */
+
// Add AMQ specific connection validator
sharedConnection
.setConnectionValidator(connectionValidator);
@@ -798,7 +804,7 @@ public class BaseUIMAAsynchronousEngine_
private void addPrefetch(ActiveMQConnection aConnection) {
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(5);
- ((ActiveMQConnection) aConnection).setPrefetchPolicy(prefetchPolicy);
+ aConnection.setPrefetchPolicy(prefetchPolicy);
}
protected SharedConnection validateConnection(String aBrokerURI) throws Exception {
@@ -971,13 +977,17 @@ public class BaseUIMAAsynchronousEngine_
reset();
Properties performanceTuningSettings = new Properties();
fetchRequiredProperties(anApplicationContext, performanceTuningSettings);
- transport = (Transport)anApplicationContext.get(UimaAsynchronousEngine.ClientTransport);
+ //transport = (Transport)anApplicationContext.get(UimaAsynchronousEngine.ClientTransport);
if ( Transport.JMS.equals(transport)) {
initializeJMS(anApplicationContext);
} else if ( Transport.Java.equals(transport)) {
if ( service == null ) {
service = UimaAsServiceRegistry.getInstance().lookupByEndpoint(endpoint);
}
+ if ( !(service instanceof AsynchronousUimaASService ) ) {
+ // WRONG SERVICE TYPE FOR THIS CLIENT
+ throw new ResourceInitializationException();
+ }
brokerURI = "java";
initializeLocal(anApplicationContext);
@@ -1145,7 +1155,7 @@ public class BaseUIMAAsynchronousEngine_
}
private AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception {
org.apache.xmlbeans.XmlOptions options = new org.apache.xmlbeans.XmlOptions();
-
+ options.setValidateOnSet();
XMLReader xmlReader = XMLReaderFactory.createXMLReader();
xmlReader.setFeature("http://xml.org/sax/features/external-general-entities", false);
@@ -1165,23 +1175,13 @@ public class BaseUIMAAsynchronousEngine_
provider = UimaAsDirectServiceBuilder.resolvePlaceholder(provider);
System.out.println("...... provider() - "+provider);
- if (Provider.JAVA.get().equals(provider)) {
- return Provider.JAVA;
- } else if (Provider.ACTIVEMQ.get().equals(provider)) {
- return Provider.ACTIVEMQ;
+ if (org.apache.uima.as.deployer.ServiceDeployers.Provider.JAVA.get().equals(provider)) {
+ return org.apache.uima.as.deployer.ServiceDeployers.Provider.JAVA;
+ } else if (org.apache.uima.as.deployer.ServiceDeployers.Provider.ACTIVEMQ.get().equals(provider)) {
+ return org.apache.uima.as.deployer.ServiceDeployers.Provider.ACTIVEMQ;
} else {
- throw new RuntimeException("Invalid provider attribute value in Deployment Descriptor :{"+provider+"} please check <deployment> element. Expected \"java\" or \"activemq\"");
+ throw new IllegalArgumentException("Invalid provider attribute value in Deployment Descriptor :{"+provider+"} please check <deployment> element. Expected \"java\" or \"activemq\"");
}
-
- /*
- if (Provider.JAVA.get().equals(provider)) {
- return Provider.JAVA;
- } else if (Provider.ACTIVEMQ.get().equals(provider)) {
- return Provider.ACTIVEMQ;
- } else {
- throw new RuntimeException("Invalid provider attribute value in Deployment Descriptor :{"+provider+"} please check <deployment> element. Expected \"java\" or \"activemq\"");
- }
- */
}
private Protocol protocol( AnalysisEngineDeploymentDescriptionDocument dd) {
String protocol =
@@ -1190,23 +1190,14 @@ public class BaseUIMAAsynchronousEngine_
protocol = UimaAsDirectServiceBuilder.resolvePlaceholder(protocol);
System.out.println("...... protocol() - "+protocol);
- if (Protocol.JAVA.get().equalsIgnoreCase(protocol) ) {
- return Protocol.JAVA;
- } else if (Protocol.JMS.get().equalsIgnoreCase(protocol) ) {
- return Protocol.JMS;
+ if (org.apache.uima.as.deployer.ServiceDeployers.Protocol.JAVA.get().equalsIgnoreCase(protocol) ) {
+ return org.apache.uima.as.deployer.ServiceDeployers.Protocol.JAVA;
+ } else if (org.apache.uima.as.deployer.ServiceDeployers.Protocol.JMS.get().equalsIgnoreCase(protocol) ) {
+ return org.apache.uima.as.deployer.ServiceDeployers.Protocol.JMS;
} else {
throw new RuntimeException("Invalid protocol attribute value in Deployment Descriptor :{"+protocol+"} please check <deployment> element. Expected \"java\" or \"jms\"");
}
- /*
- if (Protocol.JAVA.get().equals(protocol)) {
- return Protocol.JAVA;
- } else if (Protocol.JMS.get().equals(protocol)) {
- return Protocol.JMS;
- } else {
- throw new RuntimeException("Invalid protocol attribute value in Deployment Descriptor :{"+protocol+"} please check <deployment> element. Expected \"java\" or \"jms\"");
- }
- */
}
/**
* First generates a Spring context from a given deploy descriptor and than deploys the context
@@ -1220,38 +1211,81 @@ public class BaseUIMAAsynchronousEngine_
* @return - a unique spring container id
*
*/
- public String deploy(String deploymentDescriptorPath, Map anApplicationContext) throws Exception {
- AnalysisEngineDeploymentDescriptionDocument dd =
- parseDD(deploymentDescriptorPath);
-
- XmlDocumentProperties dp = dd.documentProperties();
- System.out.println(dp.getSourceName());
-
- // Use factory to create deployer instance for a given protocol and provider
- UimaAsServiceDeployer deployer =
- ServiceDeployers.newDeployer(protocol(dd), provider(dd));
-
- service = deployer.deploy(dd, anApplicationContext);
-
- UimaAsServiceRegistry.getInstance().register(service);
+ public String deploy(String deploymentDescriptorPath, Map anApplicationContext) throws Exception {
+ // parse provided deployment descriptor and produce object representation for it
+ AnalysisEngineDeploymentDescriptionDocument dd = parseDD(deploymentDescriptorPath);
+/*
+ ArrayList<?> validationErrors = new ArrayList<>();
+ XmlOptions validationOptions = new XmlOptions();
+ validationOptions.setErrorListener(validationErrors);
+ validationOptions.setLoadLineNumbers(XmlOptions.LOAD_LINE_NUMBERS_END_ELEMENT);
+ validationOptions.setLoadLineNumbers();
+ boolean isValid = dd.validate(validationOptions);
+ if ( !isValid ) {
+ Iterator<?> iter = validationErrors.iterator();
+ System.out.println("*** *** *** *** *** *** *** *** *** ***");
+ while (iter.hasNext())
+ {
+ System.out.println("*** DD Validation ERROR>> " + iter.next() + "\n");
+ }
+ throw new AsynchAEException("*** ERROR deployment descriptor validation failed");
+ }
+ */
+ XmlDocumentProperties dp = dd.documentProperties();
+ System.out.println(dp.getSourceName());
+
+ String protocolOverride = null;
+ if ( anApplicationContext!= null && anApplicationContext.containsKey(UimaAsynchronousEngine.Protocol) ) {
+ protocolOverride = (String)anApplicationContext.get(UimaAsynchronousEngine.Protocol);
+ }
+ String providerOverride = null;
+ if ( anApplicationContext!= null && anApplicationContext.containsKey(UimaAsynchronousEngine.Provider)) {
+ providerOverride = (String)anApplicationContext.get(UimaAsynchronousEngine.Provider);
+ }
+ UimaAsServiceDeployer deployer;
+ // if client does not override provider and protocol, use
+ // the DD settings
+ if ( protocolOverride == null && providerOverride == null) {
+ // Use factory to create deployer instance for a given
+ // protocol and provider defined in the DD
+ deployer =
+ ServiceDeployers.newDeployer(protocol(dd), provider(dd));
+ } else {
+ Protocol deploymentProtocol = null;
+ try {
+ deploymentProtocol =
+ org.apache.uima.as.deployer.ServiceDeployers.Protocol.valueOf(protocolOverride.toUpperCase());
+ } catch( IllegalArgumentException e) {
+ System.out.println("***\n*** ERROR specified protocol not supported. Only 'java' and 'jms' are expected. You've provided "+protocolOverride+"\n***");
+ throw e;
+ }
+ Provider deploymentProvider = null;
+ try {
+ deploymentProvider =
+ org.apache.uima.as.deployer.ServiceDeployers.Provider.valueOf(providerOverride.toUpperCase());
+
+ } catch( IllegalArgumentException e) {
+ System.out.println("***\n*** ERROR specified provider not supported. Only 'java' and 'activemq' are expected. You've provided "+providerOverride+" \n***");
+ throw e;
- return service.getId();
+ }
+ deployer =
+ ServiceDeployers.newDeployer(deploymentProtocol, deploymentProvider);
+ }
- }
-
- protected UimaASService getServiceReference() {
- return service;
- }
+ service = deployer.deploy(dd, anApplicationContext);
+
+ UimaAsServiceRegistry.getInstance().register(service);
+
+ return service.getId();
+
+ }
- private void disposeContextFiles(String ...contextFiles) {
- for( String contextFile: contextFiles) {
- File file = new File(contextFile);
- if ( file.exists()) {
- file.delete();
- }
- }
- }
- /**
+ protected UimaASService getServiceReference() {
+ return service;
+ }
+
+ /**
*
*/
public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext)
@@ -1334,8 +1368,8 @@ public class BaseUIMAAsynchronousEngine_
*/
}
- public void undeploy(String aSpringContainerId) throws Exception {
- this.undeploy(aSpringContainerId, SpringContainerDeployer.STOP_NOW);
+ public void undeploy(String serviceId) throws Exception {
+ this.undeploy(serviceId, SpringContainerDeployer.STOP_NOW);
}
/**
@@ -1343,124 +1377,36 @@ public class BaseUIMAAsynchronousEngine_
* registered in the local registry under a unique id.
*
*/
- public void undeploy(String aSpringContainerId, int stop_level) throws Exception {
- if (aSpringContainerId == null ) {
+ public void undeploy(String serviceId, int stop_level) throws Exception {
+ if (serviceId == null ) {
return;
}
// UimaEEAdminSpringContext adminContext = null;
final UimaASService deployedService =
- UimaAsServiceRegistry.getInstance().lookupById(aSpringContainerId);
+ UimaAsServiceRegistry.getInstance().lookupById(serviceId);
if ( deployedService == null ) {
throw new InvalidContainerException(
"Spring Container Does Not Contain Valid UimaEEAdminSpringContext Object");
}
switch (stop_level) {
- case SpringContainerDeployer.QUIESCE_AND_STOP:
- //((AnalysisEngineController) ctrer).quiesceAndStop();
- //service.stop();
+ case UimaASService.QUIESCE_AND_STOP:
deployedService.quiesce();
break;
- case SpringContainerDeployer.STOP_NOW:
- // ((AnalysisEngineController) ctrer).terminate();
- //service.stop();
+ case UimaASService.STOP_NOW:
deployedService.stop();
-
-
- break;
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported argument value in the undeploy() call. Please use stop level "
+ + "UimaASService.QUIESCE_AND_STOP" + " OR " + "UimaASService.STOP_NOW"
+ + " as an argument to undeploy() method.");
+
+
}
UimaAsServiceRegistry.getInstance().unregister(deployedService);
- /*
-// if (!springContainerRegistry.containsKey(aSpringContainerId)) {
-// return;
-// // throw new InvalidContainerException("Invalid Spring container Id:" + aSpringContainerId +
-// // ". Unable to undeploy the Spring container");
-// }
-// // Fetch an administrative context which contains a Spring Container
-// adminContext = (UimaEEAdminSpringContext) springContainerRegistry.get(aSpringContainerId);
-// if (adminContext == null) {
-// throw new InvalidContainerException(
-// "Spring Container Does Not Contain Valid UimaEEAdminSpringContext Object");
-// }
- // Fetch instance of the Container from its context
- ApplicationContext ctx = adminContext.getSpringContainer();
- // Query the container for objects that implement
- // ControllerLifecycle interface. These
- // objects are typically of type AnalysisEngineController or
- // UimacppServiceController.
- String[] asyncServiceList = ctx
- .getBeanNamesForType(org.apache.uima.aae.controller.ControllerLifecycle.class);
- // Given a valid list of controllers select the first from the list
- // and
- // initiate a shutdown. We don't care which controller will be
- // invoked. In case of
- // AggregateAnalysisEngineController the terminate event will
- // propagate all the way
- // to the top controller in the hierarchy and the shutdown will take
- // place from there.
- // If the controller is of kind UimecppServiceController or
- // PrimitiveAnalysisController
- // the termination logic will be immediately triggered in the
- // terminate() method.
- if (asyncServiceList != null && asyncServiceList.length > 0) {
- boolean topLevelController = false;
- ControllerLifecycle ctrer = null;
- int indx = 0;
- while (!topLevelController) {
- ctrer = (ControllerLifecycle) ctx.getBean(asyncServiceList[indx++]);
- if (ctrer instanceof UimacppServiceController
- || ((AnalysisEngineController) ctrer).isTopLevelComponent()) {
- topLevelController = true;
- }
- }
- // Send a trigger to initiate shutdown.
- if (ctrer != null) {
- if (ctrer instanceof AnalysisEngineController &&
- ((AnalysisEngineController) ctrer).getControllerLatch() != null ) {
- ((AnalysisEngineController) ctrer).getControllerLatch().release();
- }
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- String msg = "++++++++++++++++++++++ calling terminate()-service:"+((AnalysisEngineController) ctrer).getComponentName();
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
- new Object[] { msg });
- }
- switch (stop_level) {
- case SpringContainerDeployer.QUIESCE_AND_STOP:
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- String msg = "++++++++++++++++++++++ calling quiesceAndStop()";
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
- new Object[] { msg });
- }
- ((AnalysisEngineController) ctrer).quiesceAndStop();
-
- break;
- case SpringContainerDeployer.STOP_NOW:
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- String msg = "++++++++++++++++++++++ calling terminate()";
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
- new Object[] { msg });
- }
- ((AnalysisEngineController) ctrer).terminate();
- break;
- }
- }
- }
- if (ctx instanceof FileSystemXmlApplicationContext) {
- ((FileSystemXmlApplicationContext) ctx).destroy();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- String msg = "---------------------- Destroying Application Context:"+((FileSystemXmlApplicationContext) ctx).getApplicationName();
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
- new Object[] { msg });
- }
- }
- // Remove the container from a local registry
- springContainerRegistry.remove(aSpringContainerId);
- */
+
}
private void initJMX() throws Exception {
// Generate unique identifier
@@ -1715,33 +1661,7 @@ public class BaseUIMAAsynchronousEngine_
"UIMAJMS_exception__WARNING", e);
}
}
- /*
- PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.Stop);
- msg.addProperty(AsynchAEMessage.Destination, cmFreeCasQueue);
- msg.addProperty(AsynchAEMessage.CasReference, casReferenceId);
- try {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_client_sending_stop_to_service__INFO", new Object[] {casReferenceId,cmFreeCasQueue});
- }
- sender.dispatchMessage(msg, this, false);
-
- } catch (Exception ex) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING",
- ex);
- }
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_client_unable_to_send_stop_to_cm__WARNING");
- }
- }
- */
}
protected void dispatchFreeCasRequest(String casReferenceId, Message message) throws Exception {
PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.ReleaseCAS);
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java Mon May 7 21:27:43 2018
@@ -19,24 +19,22 @@
package org.apache.uima.adapter.jms.service;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
import java.io.InvalidClassException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Scanner;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UimaASApplicationExitEvent;
import org.apache.uima.aae.UimaAsVersion;
-import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.client.UimaAS;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
import org.apache.uima.aae.jmx.monitor.BasicUimaJmxMonitorListener;
import org.apache.uima.aae.jmx.monitor.JmxMonitor;
import org.apache.uima.aae.jmx.monitor.JmxMonitorListener;
import org.apache.uima.aae.service.UimaASService;
-import org.apache.uima.aae.service.UimaAsServiceRegistry;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
@@ -60,6 +58,19 @@ public class UIMA_Service implements App
private JmxMonitor monitor = null;
private Thread monitorThread = null;
+
+ private void setDefaultBrokerURL(String[] args) {
+ String brokerURL = getArg("-brokerURL", args);
+ // Check if broker URL is specified on the command line. If it is not, use the default
+ // localhost:61616. In either case, set the System property defaultBrokerURL. It will be used
+ // by Spring Framework to substitute a place holder in Spring xml.
+ if (brokerURL != "") {
+ System.setProperty("defaultBrokerURL", brokerURL);
+ System.out.println(">>> Setting defaultBrokerURL to:" + brokerURL);
+ } else if ( System.getProperty("defaultBrokerURL") == null) {
+ System.setProperty("defaultBrokerURL", "tcp://localhost:61616");
+ }
+ }
/**
* Parse command args, run dd2spring on the deployment descriptors to generate Spring context
@@ -74,99 +85,32 @@ public class UIMA_Service implements App
UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
"UIMA-AS version " + UimaAsVersion.getFullVersionString());
- int nbrOfArgs = args.length;
- String[] springConfigFileArray;
String[] deploymentDescriptors = getMultipleArg("-d", args);
if (deploymentDescriptors.length == 0) {
// allow multiple args for one key
deploymentDescriptors = getMultipleArg2("-dd", args);
}
- /*
- String saxonURL = getArg("-saxonURL", args);
- String xslTransform = getArg("-xslt", args);
- String uimaAsDebug = getArg("-uimaEeDebug", args);
-
- if (nbrOfArgs < 1
- || (args[0].startsWith("-") && (deploymentDescriptors.length == 0
- || saxonURL.equals("") || xslTransform.equals("")))) {
- printUsageMessage();
- return null;
- }
- */
- String brokerURL = getArg("-brokerURL", args);
- // Check if broker URL is specified on the command line. If it is not, use the default
- // localhost:61616. In either case, set the System property defaultBrokerURL. It will be used
- // by Spring Framework to substitute a place holder in Spring xml.
- if (brokerURL != "") {
- System.setProperty("defaultBrokerURL", brokerURL);
- System.out.println(">>> Setting defaultBrokerURL to:" + brokerURL);
- } else if ( System.getProperty("defaultBrokerURL") == null) { // perhaps already set using -D
- System.setProperty("defaultBrokerURL", "tcp://localhost:61616");
- }
+
+ setDefaultBrokerURL(args);
if (System.getProperty(JmsConstants.SessionTimeoutOverride) != null) {
System.out.println(">>> Setting Inactivity Timeout To: "
+ System.getProperty(JmsConstants.SessionTimeoutOverride));
}
- /*
- if (deploymentDescriptors.length == 0) {
- // array of context files passed in
- springConfigFileArray = args;
- } else {
- // create a String array of spring context files
- springConfigFileArray = new String[deploymentDescriptors.length];
-
- Dd2spring aDd2Spring = new Dd2spring();
- for (int dd = 0; dd < deploymentDescriptors.length; dd++) {
- String deploymentDescriptor = deploymentDescriptors[dd];
-
- File springConfigFile = aDd2Spring.convertDd2Spring(deploymentDescriptor, xslTransform,
- saxonURL, uimaAsDebug);
-
- // if any are bad, fail
- if (null == springConfigFile) {
- return null;
- }
- springConfigFileArray[dd] = springConfigFile.getAbsolutePath();
- // get the descriptor to register with the engine controller
- String deployDescriptor = "";
- File afile = null;
- FileInputStream fis = null;
- try {
- afile = new File(deploymentDescriptor);
- fis = new FileInputStream(afile);
- byte[] bytes = new byte[(int) afile.length()];
- fis.read(bytes);
- deployDescriptor = new String(bytes);
- // Log Deployment Descriptor
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "main",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_deploy_desc__FINEST",
- new Object[] { deployDescriptor });
- } catch (IOException e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", e);
- }
- } finally {
- if (fis != null) {
- try {
- fis.close();
- } catch (IOException e) {
- }
- }
- }
- }
- }
-
- return springConfigFileArray;
- */
return deploymentDescriptors;
}
+ /**
+ * @deprecated Spring context files are no longer generated or used in favor of direct parsing of deployment descriptors.
+ * @param springContextFiles
+ * @param listener
+ * @return
+ * @throws Exception
+ */
public SpringContainerDeployer deploy(String[] springContextFiles, ApplicationListener<ApplicationEvent> listener) throws Exception {
- SpringContainerDeployer springDeployer;
+ SpringContainerDeployer springDeployer = null;
+ /*
if ( listener == null ) {
springDeployer = new SpringContainerDeployer(this);
} else {
@@ -193,7 +137,7 @@ public class UIMA_Service implements App
FileSystemXmlApplicationContext context = springDeployer.getSpringContext();
context.addApplicationListener(this);
springDeployer.startListeners();
-
+ */
return springDeployer;
}
@@ -411,15 +355,11 @@ public class UIMA_Service implements App
/**
* The main routine for starting the deployment of a UIMA-AS instance. The args are either: 1 or
- * more "paths" to Spring XML descriptors representing the information needed or some number of
+ * more "paths" to DD XML descriptors representing the information needed or some number of
* parameters, preceeded by a "-" sign. If the first arg doesn't start with a "-" it is presumed
* to be the first format.
*
- * For the 2nd style, the arguments are: -saxonURL a-URL-to-the-saxon-jar usually starting with
- * "file:", -xslt path-to-the-dd2spring.xsl file, -d path-to-UIMA-deployment-descriptor [-d
- * path-to-another-dd ...] these arguments may be in any order)
- *
- * For the 3rd style, like #2 but with multiple dd-files following a single -dd Useful for calling
+ * For the 2nd style, like #2 but with multiple dd-files following a single -dd Useful for calling
* from scripts.
*
* @param args
@@ -427,43 +367,18 @@ public class UIMA_Service implements App
public static void main(String[] args) {
try {
UIMA_Service service = new UIMA_Service();
- /*
- // parse command args and run dd2spring to generate spring context
- // files from deployment descriptors
- String contextFiles[] = service.initialize(args);
- // If no context files generated there is nothing to do
- if (contextFiles == null) {
- return;
- }
- */
- String dd[] = service.initialize(args);
- /*
- // Deploy components defined in Spring context files. This method blocks until
- // the container is fully initialized and all UIMA-AS components are succefully
- // deployed.
- SpringContainerDeployer serviceDeployer = service.deploy(contextFiles);
- if (serviceDeployer == null) {
- System.out.println(">>> Failed to Deploy UIMA Service. Check Logs for Details");
- System.exit(0);
- }
-
- // remove temporary spring context files generated from DD
- for( String contextFile: contextFiles) {
- File file = new File(contextFile);
- if ( file.exists()) {
- file.delete();
- }
- }
- */
- BaseUIMAAsynchronousEngine_impl engine =
- new BaseUIMAAsynchronousEngine_impl();
+ // fetch deployment descriptors from the command line
+ String[] dd = service.initialize(args);
+
+ UimaAsynchronousEngine uimaAS =
+ UimaAS.newInstance(Transport.JMS);
+ List<String> serviceList = new ArrayList<>();
for( String deploymentDescriptorPath : dd ) {
- engine.deploy(deploymentDescriptorPath, new HashMap<>());
+ serviceList.add(uimaAS.deploy(deploymentDescriptorPath, new HashMap<>()));
}
// Add a shutdown hook to catch kill signal and to force quiesce and stop
- ServiceShutdownHook shutdownHook = new ServiceShutdownHook(engine);
-// ServiceShutdownHook shutdownHook = new ServiceShutdownHook(serviceDeployer);
+ ServiceShutdownHook shutdownHook = new ServiceShutdownHook(uimaAS);
Runtime.getRuntime().addShutdownHook(shutdownHook);
// Check if we should start an optional JMX-based monitor that will provide service metrics
// The monitor is enabled by existence of -Duima.jmx.monitor.interval=<number> parameter. By
@@ -476,39 +391,43 @@ public class UIMA_Service implements App
service.startMonitor(Long.parseLong(monitorCheckpointFrequency));
}
-// AnalysisEngineController topLevelControllor = serviceDeployer.getTopLevelController();
String prompt = "Press 'q'+'Enter' to quiesce and stop the service or 's'+'Enter' to stop it now.\nNote: selected option is not echoed on the console.";
- // if (topLevelControllor != null) {
- System.out.println(prompt);
+ System.out.println(prompt);
// Loop forever or until the service is stopped
- while ( engine.isRunning() ) {
-// while (!topLevelControllor.isStopped()) {
+ boolean stop = false;
+ while ( !stop) {
+ Scanner in = null;
+ try {
+ in = new Scanner(System.in);
+ String cmd = in.nextLine();
+ System.out.println("You've Entered .... "+cmd);
+
+ if ( cmd.equalsIgnoreCase("s")) {
+ System.out.println("Calling STOP....");
+ for( String serviceId : serviceList ) {
+ uimaAS.undeploy(serviceId, UimaASService.STOP_NOW);
+ }
+
+ stop = true;
+ // System.exit(0);
+ } else if ( cmd.equalsIgnoreCase("q") ) {
+ for( String serviceId : serviceList ) {
+ System.out.println("Calling QUIT....");
+
+ uimaAS.undeploy(serviceId, UimaASService.QUIESCE_AND_STOP);
+ }
+
+ stop = true;
+
+ }
+ } finally {
+ if ( in != null ) {
+ in.close();
+ }
+ }
+ }
+ System.exit(0);
- if (System.in.available() > 0) {
- int c = System.in.read();
- if (c == 's') {
- service.stopMonitor();
- engine.undeploy();
- //serviceDeployer.undeploy(SpringContainerDeployer.STOP_NOW);
- System.exit(0);
- } else if (c == 'q') {
- engine.undeploy();
- //service.stopMonitor();
- // serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
- System.exit(0);
-
- } else if (Character.isLetter(c) || Character.isDigit(c)) {
- System.out.println(prompt);
- }
- }
- // This is a polling loop. Sleep for 1 sec
-// try {
-// if (!topLevelControllor.isStopped())
-// Thread.sleep(1000);
-// } catch (InterruptedException ex) {
-// }
- } // while
- //}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
@@ -520,13 +439,14 @@ public class UIMA_Service implements App
static class ServiceShutdownHook extends Thread {
public SpringContainerDeployer serviceDeployer;
- BaseUIMAAsynchronousEngine_impl engine;
+
+ private UimaAsynchronousEngine client;
public ServiceShutdownHook(SpringContainerDeployer serviceDeployer) {
this.serviceDeployer = serviceDeployer;
}
- public ServiceShutdownHook(BaseUIMAAsynchronousEngine_impl engine) {
- this.engine = engine;
+ public ServiceShutdownHook(UimaAsynchronousEngine client) {
+ this.client = client;
}
public void run() {
try {
@@ -538,7 +458,7 @@ public class UIMA_Service implements App
"UIMAJMS_caught_signal__INFO", new Object[] { "TopLevelService" });
// "UIMAJMS_caught_signal__INFO", new Object[] { topLevelController.getComponentName() });
// serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
- engine.undeploy();
+ client.undeploy();
Runtime.getRuntime().halt(0);
//}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java Mon May 7 21:27:43 2018
@@ -18,6 +18,7 @@
*/
package org.apache.uima.adapter.jms.service;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -26,9 +27,12 @@ import java.util.concurrent.ThreadFactor
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.AsynchAECasManager_impl;
import org.apache.uima.aae.InProcessCache;
import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.UimaAsPriorityBasedThreadFactory;
import org.apache.uima.aae.UimaAsThreadFactory;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.controller.AnalysisEngineController;
@@ -44,14 +48,17 @@ import org.apache.uima.aae.handler.input
import org.apache.uima.aae.service.AbstractUimaASService;
import org.apache.uima.aae.service.ScaleoutSpecification;
import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
+import org.apache.uima.adapter.jms.activemq.PriorityMessageHandler;
import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
import org.apache.uima.adapter.jms.service.builder.ActiveMQFactory;
import org.apache.uima.adapter.jms.service.builder.JmsMessageListenerBuilder;
import org.apache.uima.as.client.Listener.Type;
import org.apache.uima.resource.ResourceManager;
import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.util.Level;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class UimaASJmsService extends AbstractUimaASService
@@ -189,31 +196,76 @@ implements UimaASService {
}
return true;
}
- private UimaDefaultMessageListenerContainer createListener(Type type, int scaleout) throws Exception{
+ private UimaDefaultMessageListenerContainer createListener(Type type, int consumerCount) throws Exception{
if ( inputChannel == null ) {
withInputChannel();
}
if ( outputChannel == null ) {
withOutputChannel();
- outputChannel.setServerURI(getBrokerURL());
}
- ThreadPoolTaskExecutor threadExecutor = new ThreadPoolTaskExecutor();
- if (controller.isPrimitive() && Type.ProcessCAS.equals(type)) {
+ PriorityMessageHandler h = null;
+
+ ThreadPoolTaskExecutor jmsListenerThreadExecutor =
+ new ThreadPoolTaskExecutor();
+
+
+ if ( Type.ProcessCAS.equals(type)) {
+ outputChannel.setServerURI(getBrokerURL());
+/*
// Create a Custom Thread Factory. Provide it with an instance of
// PrimitiveController so that every thread can call it to initialize
// the next available instance of a AE.
- ThreadFactory tf = new UimaAsThreadFactory().
- withThreadGroup(Thread.currentThread().getThreadGroup()).
- withPrimitiveController((PrimitiveAnalysisEngineController) controller).
- withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads).
- withInitedThreadsLatch(latchToCountNumberOfInitedThreads);
- ((UimaAsThreadFactory)tf).setDaemon(true);
- // This ThreadExecutor will use custom thread factory instead of defult one
- ((ThreadPoolTaskExecutor) threadExecutor).setThreadFactory(tf);
- }
- threadExecutor.setCorePoolSize(scaleout);
- threadExecutor.setMaxPoolSize(scaleout);
+// ThreadFactory tf = new UimaAsThreadFactory().
+// withThreadGroup(Thread.currentThread().getThreadGroup()).
+// withPrimitiveController((PrimitiveAnalysisEngineController) controller).
+// withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads).
+// withInitedThreadsLatch(latchToCountNumberOfInitedThreads);
+// ((UimaAsThreadFactory)tf).setDaemon(true);
+
+ */
+ if ( controller.isPrimitive() ) {
+ h = new PriorityMessageHandler(consumerCount);
+ ThreadPoolTaskExecutor threadExecutor =
+ new ThreadPoolTaskExecutor();
+ controller.setThreadFactory(threadExecutor);
+
+ latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerCount);
+ // Create a Custom Thread Factory. Provide it with an instance of
+ // PrimitiveController so that every thread can call it to initialize
+ // the next available instance of a AE.
+ ThreadFactory tf =
+ new UimaAsPriorityBasedThreadFactory(Thread.currentThread().
+ getThreadGroup(), controller, latchToCountNumberOfTerminatedThreads)
+ .withQueue(h.getQueue()).withChannel(controller.getInputChannel(ENDPOINT_TYPE.JMS));
+
+
+ ((UimaAsPriorityBasedThreadFactory)tf).setDaemon(true);
+ // This ThreadExecutor will use custom thread factory instead of default one
+ threadExecutor.setThreadFactory(tf);
+ threadExecutor.setCorePoolSize(consumerCount);
+ threadExecutor.setMaxPoolSize(consumerCount);
+
+ // Initialize the thread pool
+ threadExecutor.initialize();
+
+ // Make sure all threads are started. This forces each thread to call
+ // PrimitiveController to initialize the next instance of AE
+ threadExecutor.getThreadPoolExecutor().prestartAllCoreThreads();
+ // This ThreadExecutor will use custom thread factory instead of default one
+// threadExecutor.setThreadFactory(tf);
+
+
+ }
+
+ }
+ jmsListenerThreadExecutor.setCorePoolSize(consumerCount);
+ jmsListenerThreadExecutor.setMaxPoolSize(consumerCount);
+ jmsListenerThreadExecutor.initialize();
+
+
+// threadExecutor.setCorePoolSize(consumerCount);
+// threadExecutor.setMaxPoolSize(consumerCount);
// destination can be NULL if this listener is meant for a
// a temp queue. Such destinations are created on demand
@@ -232,13 +284,15 @@ implements UimaASService {
listenerBuilder.withController(controller)
.withType(type)
.withConectionFactory(factory)
- .withThreadPoolExecutor(threadExecutor)
- .withConsumerCount(scaleout)
+ .withThreadPoolExecutor(jmsListenerThreadExecutor)
+ .withConsumerCount(consumerCount)
.withInputChannel(inputChannel)
+ .withPriorityMessageHandler(h)
.withSelector(getSelector(type))
.withDestination(destination)
.build();
messageListener.setReceiveTimeout(500);
+// messageListener.setMessageListener(h);
return messageListener;
}
public HandlerBase getMessageHandler(AnalysisEngineController controller) {
@@ -261,14 +315,14 @@ implements UimaASService {
}
return metaHandler;
}
- public UimaASJmsService build(int scaleout) throws Exception {
+ public UimaASJmsService build(int consumerCount) throws Exception {
// First create Connection Factory. This is needed by
// JMS listeners.
createConnectionFactory();
// counts number of initialized threads
- latchToCountNumberOfInitedThreads = new CountDownLatch(scaleout);
+ latchToCountNumberOfInitedThreads = new CountDownLatch(consumerCount);
// counts number of terminated threads
- latchToCountNumberOfTerminatedThreads = new CountDownLatch(scaleout);
+ latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerCount);
// Add one instance of JmsOutputChannel
if ( controller.getOutputChannel(ENDPOINT_TYPE.JMS) == null ) {
withOutputChannel();
@@ -296,9 +350,69 @@ implements UimaASService {
// listener to handle process CAS requests
UimaDefaultMessageListenerContainer processListener
- = createListener(Type.ProcessCAS, scaleout);
+ = createListener(Type.ProcessCAS, consumerCount);
inputChannel.addListenerContainer(processListener);
+
+
+
+
+
+ String targetStringSelector = "";
+ if ( System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty) != null ) {
+ targetStringSelector = System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty);
+ } else {
+ // the default selector is IP:PID
+ String ip = InetAddress.getLocalHost().getHostAddress();
+ targetStringSelector = ip+":"+controller.getPID();
+ }
+ UimaDefaultMessageListenerContainer targetedListener =
+ new UimaDefaultMessageListenerContainer();
+ targetedListener.setType(Type.Target);
+ // setup jms selector
+ if ( controller.isCasMultiplier()) {
+ targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
+ } else {
+ targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)");
+ }
+
+ // use shared ConnectionFactory
+ targetedListener.setConnectionFactory(processListener.getConnectionFactory());
+ // mark the listener as a 'Targeted' listener
+ targetedListener.setTargetedListener();
+ targetedListener.setController(controller);
+ // there will only be one delivery thread. Its job will be to
+ // add a targeted message to a BlockingQueue. Such thread will block
+ // in an enqueue if a dequeue is not available. This will be prevent
+ // the overwhelming the service with messages.
+ ThreadPoolTaskExecutor threadExecutor = new ThreadPoolTaskExecutor();
+ threadExecutor.setCorePoolSize(1);
+ threadExecutor.setMaxPoolSize(1);
+ targetedListener.setTaskExecutor(threadExecutor);
+ targetedListener.setConcurrentConsumers(1);
+ if ( processListener.getMessageListener() instanceof PriorityMessageHandler ) {
+ // the targeted listener will use the same message handler as the
+ // Process listener. This handler will add a message wrapper
+ // to enable prioritizing messages.
+ targetedListener.setMessageListener(processListener.getMessageListener());
+ }
+ // Same queue as the Process queue
+ targetedListener.setDestination(processListener.getDestination());
+ //registerListener(targetedListener);
+ // targetedListener.afterPropertiesSet();
+ threadExecutor.initialize();
+
+ //targetedListener.initialize();
+ //targetedListener.start();
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(),
+ "createListenerForTargetedMessages", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_TARGET_LISTENER__INFO",
+ new Object[] {targetedListener.getMessageSelector(), controller.getComponentName() });
+ }
+
+ inputChannel.addListenerContainer(targetedListener);
+
listeners.add(processListener);
// listener to handle GetMeta requests
UimaDefaultMessageListenerContainer getMetaListener
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java?rev=1831129&r1=1831128&r2=1831129&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsMessageListenerBuilder.java Mon May 7 21:27:43 2018
@@ -19,6 +19,7 @@
package org.apache.uima.adapter.jms.service.builder;
import javax.jms.Destination;
+import javax.jms.MessageListener;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
@@ -35,6 +36,7 @@ import org.apache.uima.aae.error.ErrorHa
import org.apache.uima.adapter.jms.activemq.ConcurrentMessageListener;
import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
+import org.apache.uima.adapter.jms.activemq.PriorityMessageHandler;
import org.apache.uima.adapter.jms.activemq.TempDestinationResolver;
import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
import org.apache.uima.as.client.Listener.Type;
@@ -53,6 +55,7 @@ public class JmsMessageListenerBuilder {
private ThreadPoolTaskExecutor threadExecutor=null;
private Type type;
private TempDestinationResolver tempQueueDestinationResolver = null;
+ private PriorityMessageHandler priorityHandler = null;
public static void main(String[] args) {
try {
@@ -168,7 +171,7 @@ public class JmsMessageListenerBuilder {
jmsGetMetaMessageListener.initialize();
jmsGetMetaMessageListener.start();
-
+// !!!!!!!!!!!!!! WHY replyListener not added to input channel like the two above?
replyListener.afterPropertiesSet();
replyListener.initialize();
replyListener.start();
@@ -227,6 +230,11 @@ public class JmsMessageListenerBuilder {
this.selector = selector;
return this;
}
+ public JmsMessageListenerBuilder withPriorityMessageHandler(PriorityMessageHandler priorityHandler ) {
+ this.priorityHandler = priorityHandler;
+ return this;
+ }
+
public JmsMessageListenerBuilder withDestination(Destination destination ) {
this.destination = destination;
return this;
@@ -258,7 +266,7 @@ public class JmsMessageListenerBuilder {
return (endpoint != null && endpoint.isRemote() && endpoint.isCasMultiplier() );
}
public UimaDefaultMessageListenerContainer build() throws Exception{
- UimaDefaultMessageListenerContainer listener =
+ UimaDefaultMessageListenerContainer listenerContainer =
new UimaDefaultMessageListenerContainer();
/*
*
@@ -267,17 +275,21 @@ public class JmsMessageListenerBuilder {
*/
// make sure all required properties are set
validate();
+ if ( type != null ) {
+ listenerContainer.setType(type);
+ }
+
if ( threadExecutor != null ) {
threadExecutor.setThreadNamePrefix(controller.getComponentName()+"-"+type.name()+"Listener-Thread");
- listener.setTaskExecutor(threadExecutor);
+ listenerContainer.setTaskExecutor(threadExecutor);
}
- listener.setConcurrentConsumers(consumerCount);
- listener.setController(controller);
+ listenerContainer.setConcurrentConsumers(consumerCount);
+ listenerContainer.setController(controller);
if ( selector != null ) {
- listener.setMessageSelector(selector);
+ listenerContainer.setMessageSelector(selector);
}
if (isRemoteCasMultiplier(endpoint) ) {
@@ -291,16 +303,24 @@ public class JmsMessageListenerBuilder {
// is ConcurrentMessageListener which imposes order of replies (parent last) before delegating
// msgs to the inputchannel. When stopping the service, all listeners must be registered with
// an inputchannel which is responsible for shutting down all listeners.
- ((JmsInputChannel)inputChannel).registerListener(listener);
- listener.setMessageListener(concurrentListener);
+ ((JmsInputChannel)inputChannel).registerListener(listenerContainer);
+ listenerContainer.setMessageListener(concurrentListener);
concurrentListener.setAnalysisEngineController(controller);
} else {
- ((JmsInputChannel)inputChannel).registerListener(listener);
- listener.setMessageListener(inputChannel);
+ ((JmsInputChannel)inputChannel).registerListener(listenerContainer);
+ // Message priority handler is an intermediary object between JMS Message Listener
+ // and an InputChannel. Its main role is to intercept messages and add them to
+ // the priority queue shared with an InputChannel. This is done to support processing
+ // of targeted messages ahead of regular priority (process) msgs.
+ if ( priorityHandler != null ) {
+ listenerContainer.setMessageListener(priorityHandler);
+ } else {
+ listenerContainer.setMessageListener(inputChannel);
+ }
}
- listener.setTargetEndpoint(endpoint);
- listener.setConnectionFactory(connectionFactory);
+ listenerContainer.setTargetEndpoint(endpoint);
+ listenerContainer.setConnectionFactory(connectionFactory);
// is this listener processing replies from a remote service. This can
// only be true if the controller is an aggregate. Primitive controller
// can only handle requests from remote services. An aggregate can send
@@ -309,25 +329,22 @@ public class JmsMessageListenerBuilder {
String e = Type.FreeCAS.equals(type) ? "FreeCASEndpoint" :endpoint.getDelegateKey();
TempDestinationResolver resolver = new
TempDestinationResolver(controller.getComponentName(), e);
- resolver.setListener(listener);
+ resolver.setListener(listenerContainer);
resolver.setConnectionFactory(connectionFactory);
- listener.setDestinationResolver(resolver);
- listener.setDestinationName("");
+ listenerContainer.setDestinationResolver(resolver);
+ listenerContainer.setDestinationName("");
if ( Type.FreeCAS.equals(type)) {
- listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For FreeCas Listener");
+ listenerContainer.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For FreeCas Listener");
} else {
- listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For Delegate:"+endpoint.getDelegateKey());
+ listenerContainer.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener For Delegate:"+endpoint.getDelegateKey());
}
} else if ( destination != null ) {
- listener.setDestinationName(((ActiveMQDestination)destination).getPhysicalName());
- listener.setDestination(destination);
- listener.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener");
+ listenerContainer.setDestinationName(((ActiveMQDestination)destination).getPhysicalName());
+ listenerContainer.setDestination(destination);
+ listenerContainer.setBeanName(controller.getComponentName()+"-"+type.name()+"Listener");
}
- if ( type != null ) {
- listener.setType(type);
- }
- return listener;
+ return listenerContainer;
}
}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsProcessListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsProcessListener.java?rev=1831129&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsProcessListener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/JmsProcessListener.java Mon May 7 21:27:43 2018
@@ -0,0 +1,36 @@
+package org.apache.uima.adapter.jms.service.builder;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
+
+public class JmsProcessListener {
+
+ AnalysisEngineController controller;
+
+ JmsProcessListener(AnalysisEngineController controller) {
+ this.controller = controller;
+ }
+ public void create() throws Exception {
+ JmsMessageListenerBuilder listenerBuilder =
+ new JmsMessageListenerBuilder();
+/*
+ UimaDefaultMessageListenerContainer messageListener =
+ listenerBuilder.withController(controller)
+ .withType(type)
+ .withConectionFactory(factory)
+ .withThreadPoolExecutor(threadExecutor)
+ .withConsumerCount(consumerCount)
+ .withInputChannel(inputChannel)
+ .withPriorityMessageHandler(h)
+ .withSelector(getSelector(type))
+ .withDestination(destination)
+ .build();
+ messageListener.setReceiveTimeout(500);
+ */
+ }
+ public static void main(String[] args) {
+ // TODO Auto-generated method stub
+
+ }
+
+}