You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC
svn commit: r1825401 [2/11] - in /uima/uima-as/branches/uima-as-3:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uima...
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Mon Feb 26 18:54:11 2018
@@ -54,9 +54,11 @@ import org.apache.uima.aae.UimaAsPriorit
//import org.apache.uima.aae.UimaASCredentials;
import org.apache.uima.aae.UimaAsThreadFactory;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
//import org.apache.uima.aae.UimaAsThreadFactory.UsedFor;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
@@ -68,6 +70,7 @@ import org.apache.uima.aae.message.Async
import org.apache.uima.aae.message.MessageWrapper;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.JmsOutputChannel.BrokerConnectionEntry;
+import org.apache.uima.as.client.Listener;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Level;
import org.springframework.core.task.TaskExecutor;
@@ -80,12 +83,15 @@ import org.springframework.jms.support.d
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class UimaDefaultMessageListenerContainer extends DefaultMessageListenerContainer implements
- ExceptionListener {
+ Listener, ExceptionListener {
private static final Class<?> CLASS_NAME = UimaDefaultMessageListenerContainer.class;
public static final String PROCESS_SELECTOR_SUFFIX = "(Command=2000 OR Command=2002)";
public static final String CM_PROCESS_SELECTOR_SUFFIX = "(Command=2000 OR Command=2002 OR Command=2005)";
public static final String GETMETA_SELECTOR_SUFFIX = "(Command=2001)";
+ private Transport transport = Transport.JMS;
+ private Type type = Type.Unknown;
+
public static final int HIGH_PRIORITY = 9;
private String destinationName = "";
@@ -153,12 +159,23 @@ public class UimaDefaultMessageListenerC
UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING);
__listenerRef = this;
setRecoveryInterval(400); // increase connection recovery to 30 sec
- setAcceptMessagesWhileStopping(true);
+ //setAcceptMessagesWhileStopping(true);
setExceptionListener(this);
threadGroup = new ThreadGroup("ListenerThreadGroup_"
+ Thread.currentThread().getThreadGroup().getName());
}
-
+ /**
+  * Returns the transport associated with this listener.
+  *
+  * @return the current value of the {@code transport} field
+  */
+ public Transport getTransport() {
+   return this.transport;
+ }
+ /**
+  * Records the kind of listener this container represents.
+  *
+  * @param t the listener type to store
+  */
+ public void setType(Type t) {
+   type = t;
+ }
+ /**
+  * Returns the kind of listener this container represents.
+  *
+  * @return the current value of the {@code type} field
+  */
+ public Type getType() {
+   return this.type;
+ }
+ /**
+  * Reports an exception handed to this listener container. The failure is written
+  * to the UIMA logger at WARNING level rather than dumped to stderr, so it ends up
+  * in the same log as the rest of this class's diagnostics.
+  *
+  * @param t the exception to report
+  */
+ @Override
+ protected void handleListenerException(Throwable t) {
+   if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+     UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+             "handleListenerException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+             "UIMAJMS_exception__WARNING", t);
+   }
+ }
public UimaDefaultMessageListenerContainer(boolean freeCasQueueListener) {
this();
this.freeCasQueueListener = freeCasQueueListener;
@@ -183,7 +200,38 @@ public class UimaDefaultMessageListenerC
*/
protected void refreshConnectionUntilSuccessful() {
// System.out.println("............refreshConnectionUntilSuccessful() called");
+ while (isRunning()) {
+ try {
+
+ if (sharedConnectionEnabled()) {
+ refreshSharedConnection();
+ } else {
+ Connection con = createConnection();
+ JmsUtils.closeConnection(con);
+ }
+ // Use UIMA-AS custom Destination Resolver to create a new temp queue for this reply listener
+ if( Type.Reply.equals(type) || Type.FreeCAS.equals(type)) {
+ getDestinationResolver().resolveDestinationName(getSharedConnection().createSession(false, Session.AUTO_ACKNOWLEDGE),"",false);
+ }
+ logger.info(getType().name()+" Listener Successfully refreshed JMS Connection to broker: "+getBrokerUrl()+" Endpoint:"+getDestination());
+ logListenerFailure = true;
+ break;
+ }
+ catch (Exception ex) {
+
+ if (ex instanceof JMSException) {
+ if (logListenerFailure && ex.getCause() instanceof ConnectException) {
+
+ logger.info(getType().name()+" Listener lost connection to broker: "+getBrokerUrl()+"- Retrying until successfull ...");
+ logListenerFailure = false;
+ } else {
+ invokeExceptionListener((JMSException) ex);
+ }
+ }
+ }
+ }
+ /*
boolean doLogFailureMsg = true;
try {
// Only one listener thread should enter to recover lost connection.
@@ -244,30 +292,11 @@ public class UimaDefaultMessageListenerC
}
}
- /*
- if ( getMessageListener() instanceof JmsInputChannel ) {
- System.out.println("------------------- Creating new listener for the temp queue");
- try {
- ((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory());
- System.out.println("------------------- New listener on temp queue is ready");
- } catch( Exception e) {
- System.out.println("------------------- Error while creating new listener on temp queue");
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", e);
- }
-
- }
- }
-*/
+
}
- /*
- String delegateKey = ((AggregateAnalysisEngineController) controller)
- .lookUpDelegateKey(endpoint.getEndpoint());
- */
+
//}
break;
}
@@ -293,6 +322,7 @@ public class UimaDefaultMessageListenerC
}
} catch( IllegalStateException e ) {
}
+ */
}
protected void recoverAfterListenerSetupFailure() {
if ( !terminating ) {
@@ -323,6 +353,7 @@ public class UimaDefaultMessageListenerC
/**
* Stops this Listener
*/
+ /*
private void handleListenerFailure() {
// If shutdown already, nothing to do
if (awaitingShutdown) {
@@ -363,12 +394,13 @@ public class UimaDefaultMessageListenerC
}
}
}
-
+*/
/**
* Handles failure on a temp queue
*
* @param t
*/
+ /*
private void handleTempQueueFailure(Throwable t) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
if ( controller != null ) {
@@ -463,7 +495,7 @@ public class UimaDefaultMessageListenerC
} else {
}
}
-
+*/
private ErrorHandler fetchGetMetaErrorHandler() {
ErrorHandler handler = null;
Iterator it = controller.getErrorHandlerChain().iterator();
@@ -483,6 +515,7 @@ public class UimaDefaultMessageListenerC
*
* @param t
*/
+ /*
private void handleQueueFailure(Throwable t) {
// System.out.println("............handleQueueFailure() called");
final String endpointName = (getDestination() == null) ? ""
@@ -598,7 +631,7 @@ public class UimaDefaultMessageListenerC
}
}
-
+*/
/**
* This method is called by Spring when a listener fails
*/
@@ -609,9 +642,10 @@ public class UimaDefaultMessageListenerC
}
// If shutdown already, nothing to do
// If controller is stopping no need to recover the connection
- if (awaitingShutdown || terminating || (controller != null && controller.isStopped()) ) {
+ if (!super.isRunning() || awaitingShutdown || terminating || (controller != null && controller.isStopped()) ) {
return;
}
+ /*
if ( controller != null ) {
controller.changeState(ServiceState.FAILED);
}
@@ -676,10 +710,12 @@ public class UimaDefaultMessageListenerC
}
failed = true;
}
+ */
}
/**
 * Returns the endpoint held by this listener.
 *
 * @return the current value of the {@code endpoint} field
 */
public Endpoint getEndpoint() {
return endpoint;
}
+ /*
private void terminate(Throwable t) {
// ****************************************
// terminate the service
@@ -695,7 +731,7 @@ public class UimaDefaultMessageListenerC
controller.stop();
}
}
-
+
protected void handleListenerException(Throwable t) {
// System.out.println("............handleListenerException(Throwable t)");
@@ -703,17 +739,7 @@ public class UimaDefaultMessageListenerC
if (awaitingShutdown) {
return;
}
- /*
- String endpointName = (getDestination() == null) ? ""
- : ((ActiveMQDestination) getDestination()).getPhysicalName();
-
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "handleListenerException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_jms_listener_failed_WARNING",
- new Object[] { endpointName, getBrokerUrl(), t });
- }
- */
+
super.handleListenerException(t);
}
@@ -731,14 +757,16 @@ public class UimaDefaultMessageListenerC
super.setConnectionFactory(connectionFactory);
}
-
+*/
public boolean isGetMetaListener() {
-
+ return Type.GetMeta.equals(type);
+ /*
return getMessageSelector() != null
&& __listenerRef.getMessageSelector().endsWith(GETMETA_SELECTOR_SUFFIX);
// && __listenerRef.getMessageSelector().endsWith("(Command=2001)");
+ */
}
-
+ /*
private boolean isActiveMQDestination() {
return getDestination() != null && getDestination() instanceof ActiveMQDestination;
}
@@ -762,24 +790,26 @@ public class UimaDefaultMessageListenerC
}
}
}
-
+*/
/**
* Intercept Spring call to increment number of consumer threads. If the value > 1, don't
* propagate to Spring. A new listener will be injected and it will use provided number of
* consumer threads.
**/
+ /*
public void setConcurrentConsumers(int concurrentConsumers) {
cc = concurrentConsumers;
if (this.freeCasQueueListener) {
super.setConcurrentConsumers(concurrentConsumers);
}
}
-
+*/
/**
* Intercept Spring call to inject application Pojo listener. Don't propagate the listener up to
* Spring just yet. If more than one consumer thread is used, a different listener will be
* injected.
**/
+ /*
public void setMessageListener(Object messageListener) {
ml = messageListener;
if (this.freeCasQueueListener || targetedListener ) {
@@ -791,10 +821,12 @@ public class UimaDefaultMessageListenerC
public void afterPropertiesSet() {
afterPropertiesSet(true);
}
+ */
/**
* Called by Spring and some Uima AS components when all properties have been set. This method
* spins a thread in which the listener is initialized.
*/
+ /*
public void afterPropertiesSet(final boolean propagate) {
if (endpoint != null) {
// Override the prefetch size. The dd2spring always sets this to 1 which
@@ -938,12 +970,13 @@ public class UimaDefaultMessageListenerC
});
t.start();
}
-
+*/
/**
* Inject instance of this listener into the InputChannel
*
* @throws Exception
*/
+ /*
private void connectWithInputChannel() throws Exception {
Object pojoListener = getPojoListener();
@@ -961,7 +994,7 @@ public class UimaDefaultMessageListenerC
((ModifiableListener) pojoListener).setListener(__listenerRef);
}
}
-
+*/
public String getDestinationName() {
return destinationName;
@@ -975,7 +1008,9 @@ public class UimaDefaultMessageListenerC
}
public String getBrokerUrl() {
- return ((ActiveMQConnectionFactory) connectionFactory).getBrokerURL();
+ return ((ActiveMQConnectionFactory)super.getConnectionFactory()).getBrokerURL();
+
+// return ((ActiveMQConnectionFactory) connectionFactory).getBrokerURL();
}
/*
@@ -984,28 +1019,17 @@ public class UimaDefaultMessageListenerC
* deployment descriptor and create a new one with rewritten Broker URL. We will inject the
* prefetch policy to the new CF based on what is found in the CF in the deployment descriptor.
*/
-
+/*
public void setConnectionFactory(ConnectionFactory aConnectionFactory) {
connectionFactory = aConnectionFactory;
- /*
- if ( System.getProperty("uima.as.broker.credentials.file") != null ) {
- UimaASCredentials credentials = new UimaASCredentials();
- try {
- credentials.readCredentials(System.getProperty("uima.as.broker.credentials.file"));
- } catch( IOException e) {
- throw new RuntimeException(e);
- }
- ((ActiveMQConnectionFactory)connectionFactory).setUserName(credentials.getUsername());
- ((ActiveMQConnectionFactory)connectionFactory).setPassword(credentials.getPassword());
- }
- */
+
ConnectionFactoryIniter cfIniter =
new ConnectionFactoryIniter((ActiveMQConnectionFactory)connectionFactory);
cfIniter.whiteListPackages();
((ActiveMQConnectionFactory)connectionFactory).setTrustAllPackages(true);
super.setConnectionFactory(connectionFactory);
}
-
+*/
public void setDestinationResolver(DestinationResolver resolver) {
((TempDestinationResolver) resolver).setListener(this);
super.setDestinationResolver(resolver);
@@ -1057,8 +1081,17 @@ public class UimaDefaultMessageListenerC
public void setDestination(Destination aDestination) {
super.setDestination(aDestination);
+
+ System.out.println("............................. "+endpoint+" Destination:"+aDestination.toString());
+ if ( Type.FreeCAS.equals(type)) {
+ ((JmsOutputChannel)controller.getOutputChannel(ENDPOINT_TYPE.JMS)).setFreeCasQueue(aDestination);
+ }
if (endpoint != null) {
endpoint.setDestination(aDestination);
+ if (aDestination instanceof TemporaryQueue ) {
+ endpoint.setTempReplyDestination(true);
+ }
+ /*
// Get the prefetch size. If > 1, it has been previously overriden. The override is done in
// the code since dd2spring alwys sets the prefetch on a reply queue to 1. This may slow down
// a throughput of a service.
@@ -1079,10 +1112,28 @@ public class UimaDefaultMessageListenerC
((JmsInputChannel) pojoListener).setListenerContainer(this);
}
}
+ */
endpoint.setServerURI(getBrokerUrl());
}
}
+ public void start() {
+ if ( isRunning()) {
+ return;
+ }
+ int consumerThreadCount=-1;
+ if ( getTaskExecutor() instanceof ThreadPoolTaskExecutor) {
+ ((ThreadPoolTaskExecutor)getTaskExecutor()).initialize();
+ // if this listener is a handling Process requests, the prestartAllCoreThreads() below
+ // will force initialization of AEs if this is a primitive service.
+ ((ThreadPoolTaskExecutor)getTaskExecutor()).getThreadPoolExecutor().prestartAllCoreThreads();
+ }
+ super.afterPropertiesSet();
+ super.initialize();
+ super.start();
+
+ System.out.println(">>>>>>> Listener Service:"+controller.getComponentName()+" Broker URL:"+getBrokerUrl()+" Endpoint:"+__listenerRef.getEndpoint()+" ConsumerThreadCount:"+consumerThreadCount);
+ }
private Object getPojoListener() {
Object pojoListener = null;
if (ml != null) {
@@ -1098,6 +1149,7 @@ public class UimaDefaultMessageListenerC
}
public void onException(JMSException arg0) {
+ /*
if (awaitingShutdown) {
return;
}
@@ -1121,7 +1173,7 @@ public class UimaDefaultMessageListenerC
if ( getDestination() != null && ((ActiveMQDestination)getDestination()).isTemporary() ) {
handleTempQueueFailure(arg0);
}
-
+ */
}
public void setTargetEndpoint(Endpoint anEndpoint) {
@@ -1129,7 +1181,8 @@ public class UimaDefaultMessageListenerC
}
public boolean isFreeCasQueueListener() {
- return freeCasQueueListener;
+ return Type.FreeCAS.equals(type);
+ //return freeCasQueueListener;
}
protected void setModifiedTaskExecutor(TaskExecutor taskExecutor) {
@@ -1185,6 +1238,7 @@ public class UimaDefaultMessageListenerC
"UIMAJMS_exception__WARNING", t);
}
}
+ /*
public void shutdownTaskExecutor(ThreadPoolExecutor tpe, boolean stopImmediate) throws InterruptedException {
tpe.awaitTermination(50, TimeUnit.MILLISECONDS);
@@ -1203,10 +1257,12 @@ public class UimaDefaultMessageListenerC
public void destroy() {
destroy(true);
}
+ */
/**
* Spins a shutdown thread and stops Sprint and ActiveMQ threads.
*
*/
+ /*
public void destroy(final boolean stopImmediate) {
if (awaitingShutdown) {
@@ -1432,26 +1488,9 @@ public class UimaDefaultMessageListenerC
super.setTaskExecutor(es);
}
}
- /*
- else {
- UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
- tf.setDaemon(true);
- if ( isFreeCasQueueListener()) {
- tf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread");
- } else if ( isGetMetaListener() ) {
- tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
- } else if ( getDestination() != null && getMessageSelector() != null ) {
- tf.setThreadNamePrefix(controller.getComponentName() + " Process Thread");
- } else if ( endpoint != null && endpoint.isTempReplyDestination() ) {
- tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
- } else {
- throw new Exception("Unknown Context Detected in setUimaASThreadPoolExecutor()");
- }
-
- }
- */
+
}
-
+*/
private boolean isPrimitiveService() {
return controller != null && controller instanceof PrimitiveAnalysisEngineController &&
controller.getInputChannel() != null;
@@ -1462,6 +1501,7 @@ public class UimaDefaultMessageListenerC
*/
public void setTaskExecutor(TaskExecutor aTaskExecutor) {
taskExecutor = aTaskExecutor; // keep a reference in this class's own field
+ super.setTaskExecutor(aTaskExecutor); // also hand the executor to the superclass
}
public TaskExecutor getTaskExecutor() {
@@ -1476,6 +1516,7 @@ public class UimaDefaultMessageListenerC
*
* @throws Exception
*/
+ /*
private void initializeTaskExecutor(int consumers) throws Exception {
// TaskExecutor is only used with primitives
if (controller instanceof PrimitiveAnalysisEngineController) {
@@ -1493,10 +1534,18 @@ public class UimaDefaultMessageListenerC
threadPoolExecutor.prestartAllCoreThreads();
}
}
+ */
/**
 * Stops the listener by invoking {@code stop()} on the superclass directly.
 */
public void delegateStop() {
super.stop();
}
+ /*
public void stop() throws JmsException {
destroy();
}
+ */
+ @Override
+ public String getName() {
+ // TODO Auto-generated method stub: no listener name is provided yet, so this always returns null
+ return null; // NOTE(review): callers must tolerate null - confirm against the interface this overrides
+ }
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Mon Feb 26 18:54:11 2018
@@ -44,6 +44,7 @@ import org.apache.uima.adapter.jms.JmsCo
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.adapter.jms.message.PendingMessageImpl;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.util.Level;
@@ -284,11 +285,11 @@ public class ActiveMQMessageSender exten
// instead of a temp queue. Regular queues can be recovered in case of
// a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +.
// Code in JmsOutputChannel will add the selector if the service is a CM.
- if (pm.get(AsynchAEMessage.TargetingSelector) != null) {
- selector = (String)pm.get(AsynchAEMessage.TargetingSelector);
+ if (pm.getPropertyAsString(AsynchAEMessage.TargetingSelector) != null) {
+ selector = (String)pm.getPropertyAsString(AsynchAEMessage.TargetingSelector);
}
if ( selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS || pm.getMessageType() == AsynchAEMessage.Stop) ) {
- d = (Destination)pm.get(AsynchAEMessage.Destination);
+ d = (Destination)pm.getProperty(AsynchAEMessage.Destination);
} else {
d = jmsSession.createQueue(destinationName);
@@ -313,7 +314,7 @@ public class ActiveMQMessageSender exten
}
if (casProcessRequest) {
cacheEntry = (ClientRequest) engine.getCache().get(
- pm.get(AsynchAEMessage.CasReference));
+ pm.getPropertyAsString(AsynchAEMessage.CasReference));
if (cacheEntry != null) {
// CAS cas = cacheEntry.getCAS();
// enable logging
@@ -356,7 +357,7 @@ public class ActiveMQMessageSender exten
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_failed_cache_lookup__WARNING",
new Object[] {
- pm.get(AsynchAEMessage.CasReference),
+ pm.getPropertyAsString(AsynchAEMessage.CasReference),
UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
.getIntProperty(AsynchAEMessage.Command)),
UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
@@ -403,7 +404,7 @@ public class ActiveMQMessageSender exten
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_calling_onBeforeMessageSend__FINE",
new Object[] {
- pm.get(AsynchAEMessage.CasReference),
+ pm.getPropertyAsString(AsynchAEMessage.CasReference),
String.valueOf(cacheEntry.getCAS().hashCode())
});
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Mon Feb 26 18:54:11 2018
@@ -20,10 +20,15 @@
package org.apache.uima.adapter.jms.client;
import java.io.File;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -74,13 +79,24 @@ import org.apache.uima.aae.error.UimaASM
import org.apache.uima.aae.jmx.JmxManager;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.UIMAMessage;
+import org.apache.uima.aae.service.AsynchronousUimaASService;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.UimaAsServiceRegistry;
+import org.apache.uima.aae.service.builder.UimaAsDirectServiceBuilder;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.ConnectionFactoryIniter;
import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext;
import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.adapter.jms.message.PendingMessageImpl;
import org.apache.uima.adapter.jms.service.Dd2spring;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
+import org.apache.uima.as.client.DirectMessage;
+import org.apache.uima.as.deployer.ServiceDeployers;
+import org.apache.uima.as.deployer.ServiceDeployers.Protocol;
+import org.apache.uima.as.deployer.ServiceDeployers.Provider;
+import org.apache.uima.as.deployer.UimaAsServiceDeployer;
+import org.apache.uima.as.dispatcher.LocalDispatcher;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.impl.UimaVersion;
@@ -90,11 +106,15 @@ import org.apache.uima.resource.Resource
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceManager;
import org.apache.uima.resource.ResourceProcessException;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
import org.apache.uima.util.Level;
+import org.apache.xmlbeans.XmlDocumentProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.support.FileSystemXmlApplicationContext;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.XMLReaderFactory;
public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineCommon_impl
implements UimaAsynchronousEngine, MessageListener, ControllerCallbackListener, ApplicationListener<ApplicationEvent>{
@@ -138,7 +158,12 @@ public class BaseUIMAAsynchronousEngine_
protected static Lock globalLock = new ReentrantLock();
- //private String serviceTargetSelector = null;
+ protected UimaASService service = null;
+
+ ExecutorService consumerService = null;
+ private int casPoolSize = 1;
+
+ private Thread dispatchThread;
protected volatile boolean stopped = false;
public BaseUIMAAsynchronousEngine_impl() {
@@ -148,7 +173,11 @@ public class BaseUIMAAsynchronousEngine_
" UIMA-AS Version " + UimaAsVersion.getVersionString());
}
-
+ /**
+  * Hook run before a reply is processed. When a service is attached to this
+  * client, the CAS identified by the given id is removed from that service's cache.
+  *
+  * @param casReferenceId id of the CAS whose reply is about to be processed
+  */
+ protected void beforeProcessReply(String casReferenceId) {
+   if (service == null) {
+     // no attached service, nothing to evict
+     return;
+   }
+   service.removeFromCache(casReferenceId);
+ }
/**
 * Creates a new, empty ActiveMQ text message.
 *
 * @return a freshly constructed {@code ActiveMQTextMessage}
 * @throws ResourceInitializationException declared by the signature; this body does not throw it
 */
protected TextMessage createTextMessage() throws ResourceInitializationException {
return new ActiveMQTextMessage();
}
@@ -352,20 +381,30 @@ public class BaseUIMAAsynchronousEngine_
}
public void stop() {
try {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
- Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
- CLASS_NAME.getName(), "stop",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_stopping_as_client_INFO");
- }
- stopConnection();
+ if ( brokerURI != null && !brokerURI.equals("java")) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
+ Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
+ CLASS_NAME.getName(), "stop",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_stopping_as_client_INFO");
+ }
+ stopConnection();
+ }
+
super.doStop();
if (!running) {
return;
}
running = false;
+ if ( consumerService != null ) {
+ DirectMessage poisonPillMsg = new DirectMessage().withCommand(AsynchAEMessage.Stop);
+ ((AsynchronousUimaASService)service).getReplyQueue().put(poisonPillMsg);
+ // stop thread pool for local queue listener
+ // consumerService.shutdown();
+ consumerService.shutdown();
+ }
if (super.serviceDelegate != null) {
// Cancel all timers and purge lists
super.serviceDelegate.cleanup();
@@ -378,6 +417,9 @@ public class BaseUIMAAsynchronousEngine_
// stopConnection();
// Undeploy all containers
undeploy();
+ if ( dispatchThread != null ) {
+ dispatchThread.interrupt();
+ }
clientCache.clear();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
Level.INFO)) {
@@ -407,7 +449,7 @@ public class BaseUIMAAsynchronousEngine_
}
}
- protected void setCPCMessage(Message msg) throws Exception {
+ public void setCPCMessage(Message msg) throws Exception {
msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
@@ -419,6 +461,183 @@ public class BaseUIMAAsynchronousEngine_
((TextMessage) msg).setText("");
}
}
+ /**
+  * Tells whether this client talks to its service over JMS.
+  * <p>
+  * The comparison is written constant-first so that a transport that has not been
+  * set yet yields {@code false} instead of a NullPointerException.
+  *
+  * @return {@code true} if the configured transport is {@code Transport.JMS}
+  */
+ protected boolean isServiceRemote() {
+   return Transport.JMS.equals(transport);
+ }
+ /**
+  * Starts a single-threaded executor that consumes replies from the attached
+  * service's reply queue and passes each one to {@code onMessage}.
+  * <p>
+  * The consumer loop ends when the client is no longer running, when a message
+  * carrying the Stop command is taken from the queue, when the thread is
+  * interrupted, or when handling a message throws.
+  *
+  * @param anApplicationContext client configuration; not read by this method
+  */
+ private void startLocalConsumer(Map anApplicationContext) {
+   consumerService = Executors.newFixedThreadPool(1);
+   consumerService.execute(new Runnable() {
+     public void run() {
+       try {
+         while (running) {
+           DirectMessage message =
+                   ((AsynchronousUimaASService) service).getReplyQueue().take();
+           if (message.getAsInt(AsynchAEMessage.Command) == AsynchAEMessage.Stop) {
+             System.out.println("BaseUIMAAsynchronousEngine_impl.startLocalConsumer().run() - Direct Consumer Recv'd Stop Msg - Terminating");
+             return;
+           }
+           System.out.println("Client Direct Local Consumer() Recv'd Reply");
+           onMessage(message);
+         }
+       } catch (InterruptedException e) {
+         // Restore the interrupt flag so code further up the stack can see the interruption.
+         Thread.currentThread().interrupt();
+         System.out.println("BaseUIMAAsynchronousEngine_impl.startLocalConsumer().run() - Stopped Direct Consumer");
+       } catch (Exception e) {
+         // Report through the client logger instead of stderr; the consumer stops after this.
+         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                   "startLocalConsumer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                   "UIMAJMS_exception__WARNING", e);
+         }
+       }
+     }
+   });
+ }
+ /**
+  * Sets up the in-process (non-JMS) transport: marks the client as running, starts
+  * the local reply consumer and starts a dispatcher thread that is handed the
+  * shared {@code pendingMessageQueue}. Does nothing if the dispatcher thread
+  * already exists.
+  *
+  * @param anApplicationContext client configuration, passed on to the reply consumer
+  * @throws ResourceInitializationException if no service is attached to this client
+  */
+ private void initializeLocal(Map anApplicationContext) throws ResourceInitializationException {
+   if (dispatchThread != null) {
+     return;
+   }
+   // Both the reply consumer and the dispatcher use the service; fail here rather
+   // than with a NullPointerException inside a background thread.
+   if (service == null) {
+     throw new ResourceInitializationException(new IllegalStateException(
+             "No local UIMA-AS service found for endpoint: " + endpoint));
+   }
+   // make sure we are in the running state. The local consumer depends on it
+   running = true;
+
+   // start message consumer to handle replies
+   startLocalConsumer(anApplicationContext);
+   // start dispatcher in its own thread. It will fetch messages from a shared 'pendingMessageQueue'
+   LocalDispatcher dispatcher = new LocalDispatcher(this, service, pendingMessageQueue);
+   dispatchThread = new Thread(dispatcher);
+   dispatchThread.start();
+ }
+ /**
+  * Sets up the JMS transport from the supplied application context: validates that
+  * the broker URI and endpoint are present, reads the optional serialization
+  * strategy and broker credentials, resolves placeholders, and creates the shared
+  * broker connection.
+  *
+  * @param anApplicationContext client configuration; must contain
+  *          {@code UimaAsynchronousEngine.ServerUri} and {@code UimaAsynchronousEngine.ENDPOINT}
+  * @throws ResourceInitializationException if a required key is missing, or if creating
+  *           the shared connection fails
+  */
+ private void initializeJMS(Map anApplicationContext) throws ResourceInitializationException {
+   if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ServerUri)) {
+     throw new ResourceInitializationException();
+   }
+   if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ENDPOINT)) {
+     throw new ResourceInitializationException();
+   }
+   if (anApplicationContext.containsKey(UimaAsynchronousEngine.SERIALIZATION_STRATEGY)) {
+     final String serializationStrategy = (String) anApplicationContext.get(UimaAsynchronousEngine.SERIALIZATION_STRATEGY);
+     // change this to support compressed filtered as the default
+     setSerialFormat((serializationStrategy.equalsIgnoreCase("xmi")) ? SerialFormat.XMI : SerialFormat.BINARY);
+     clientSideJmxStats.setSerialization(getSerialFormat());
+   }
+   if (anApplicationContext.containsKey(UimaAsynchronousEngine.userName)) {
+     amqUser = (String) anApplicationContext.get(UimaAsynchronousEngine.userName);
+   }
+   if (anApplicationContext.containsKey(UimaAsynchronousEngine.password)) {
+     amqPassword = (String) anApplicationContext.get(UimaAsynchronousEngine.password);
+   }
+
+   brokerURI = (String) anApplicationContext.get(UimaAsynchronousEngine.ServerUri);
+   endpoint = (String) anApplicationContext.get(UimaAsynchronousEngine.ENDPOINT);
+
+   // Check if a placeholder is passed in instead of actual broker URL or endpoint.
+   // The placeholder has the syntax ${placeholderName} and may be embedded in text.
+   // A system property with placeholderName must exist for successful placeholder resolution.
+   // Throws ResourceInitializationException if placeholder is not in the System properties.
+   brokerURI = replacePlaceholder(brokerURI);
+   endpoint = replacePlaceholder(endpoint);
+   // Check if sharedConnection exists. If not create a new one. The sharedConnection
+   // is static and shared by all instances of UIMA AS client in a jvm. The check
+   // is made in a critical section by first acquiring a global static semaphore to
+   // prevent a race condition.
+   try {
+     createSharedConnection(brokerURI);
+     running = true;
+     // This is done to give the broker enough time to 'finalize' creation of
+     // temp reply queue. It's been observed (on MAC OS only) that AMQ
+     // broker QueueSession.createTemporaryQueue() call is not synchronous. Meaning,
+     // return from createTemporaryQueue() does not guarantee immediate availability
+     // of the temp queue. It seems like this operation is asynchronous, causing:
+     // "InvalidDestinationException: Cannot publish to a deleted Destination..."
+     // on the service side when it tries to reply to the client.
+     // NOTE(review): wait() needs the caller to hold this object's monitor - confirm
+     // that every caller of this method is synchronized on the client instance.
+     wait(100);
+   } catch (InterruptedException e) {
+     // Do not fail initialization, but keep the interrupt visible to the caller.
+     Thread.currentThread().interrupt();
+   } catch (Exception e) {
+     state = ClientState.FAILED;
+     notifyOnInitializationFailure(e);
+     throw new ResourceInitializationException(e);
+   }
+ }
+ /**
+  * Reads the client settings from the application context into this instance:
+  * resource manager, CAS initial heap size, CAS pool size, the process / getMeta /
+  * CPC timeouts, application name, timer-per-CAS flag, broker URI and endpoint.
+  * <p>
+  * The broker URI and endpoint are resolved before the endpoint name is published
+  * to the client-side JMX statistics, so the statistics carry the endpoint from
+  * this context and not a value left over from earlier state.
+  *
+  * @param anApplicationContext client configuration map
+  * @param performanceTuningSettings receives the CAS initial heap size when the context has one
+  * @throws ResourceInitializationException declared for placeholder resolution failures
+  */
+ private void fetchRequiredProperties(Map anApplicationContext, Properties performanceTuningSettings) throws ResourceInitializationException {
+   ResourceManager rm = null;
+   if (anApplicationContext.containsKey(Resource.PARAM_RESOURCE_MANAGER)) {
+     rm = (ResourceManager) anApplicationContext.get(Resource.PARAM_RESOURCE_MANAGER);
+   } else {
+     rm = UIMAFramework.newDefaultResourceManager();
+   }
+
+   if (anApplicationContext.containsKey(UIMAFramework.CAS_INITIAL_HEAP_SIZE)) {
+     String cas_initial_heap_size = (String) anApplicationContext
+             .get(UIMAFramework.CAS_INITIAL_HEAP_SIZE);
+     performanceTuningSettings.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, cas_initial_heap_size);
+   }
+   asynchManager = new AsynchAECasManager_impl(rm);
+
+   brokerURI = (String) anApplicationContext.get(UimaAsynchronousEngine.ServerUri);
+   endpoint = (String) anApplicationContext.get(UimaAsynchronousEngine.ENDPOINT);
+
+   // Check if a placeholder is passed in instead of actual broker URL or endpoint.
+   // The placeholder has the syntax ${placeholderName} and may be embedded in text.
+   // A system property with placeholderName must exist for successful placeholder resolution.
+   // Throws ResourceInitializationException if placeholder is not in the System properties.
+   brokerURI = replacePlaceholder(brokerURI);
+   endpoint = replacePlaceholder(endpoint);
+
+   // Publish the endpoint name only after it has been read and resolved above.
+   clientSideJmxStats.setEndpointName(endpoint);
+
+   if (anApplicationContext.containsKey(UimaAsynchronousEngine.CasPoolSize)) {
+     casPoolSize = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CasPoolSize))
+             .intValue();
+     clientSideJmxStats.setCasPoolSize(casPoolSize);
+   }
+
+   if (anApplicationContext.containsKey(UimaAsynchronousEngine.Timeout)) {
+     processTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.Timeout))
+             .intValue();
+   }
+
+   if (anApplicationContext.containsKey(UimaAsynchronousEngine.GetMetaTimeout)) {
+     metadataTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.GetMetaTimeout))
+             .intValue();
+   }
+
+   if (anApplicationContext.containsKey(UimaAsynchronousEngine.CpcTimeout)) {
+     cpcTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CpcTimeout))
+             .intValue();
+   }
+   if (anApplicationContext.containsKey(UimaAsynchronousEngine.ApplicationName)) {
+     applicationName = (String) anApplicationContext.get(UimaAsynchronousEngine.ApplicationName);
+   }
+
+   if (anApplicationContext.containsKey(UimaAsynchronousEngine.TimerPerCAS)) {
+     timerPerCAS = ((Boolean) anApplicationContext.get(UimaAsynchronousEngine.TimerPerCAS))
+             .booleanValue();
+   }
+
+   if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
+     UIMAFramework.getLogger(CLASS_NAME)
+             .logrb(
+                     Level.CONFIG,
+                     CLASS_NAME.getName(),
+                     "initialize",
+                     JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                     "UIMAJMS_init_uimaee_client__CONFIG",
+                     new Object[] { brokerURI, 0, casPoolSize, processTimeout, metadataTimeout,
+                         cpcTimeout, timerPerCAS });
+   }
+ }
protected void setFreeCasMessage(Message msg, String aCasReferenceId, String selector) throws Exception {
msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
@@ -750,7 +969,27 @@ public class BaseUIMAAsynchronousEngine_
throw new ResourceInitializationException(new UIMA_IllegalStateException());
}
reset();
- Properties performanceTuningSettings = null;
+ Properties performanceTuningSettings = new Properties();
+ fetchRequiredProperties(anApplicationContext, performanceTuningSettings);
+ transport = (Transport)anApplicationContext.get(UimaAsynchronousEngine.ClientTransport);
+ if ( Transport.JMS.equals(transport)) {
+ initializeJMS(anApplicationContext);
+ } else if ( Transport.Java.equals(transport)) {
+ if ( service == null ) {
+ service = UimaAsServiceRegistry.getInstance().lookupByEndpoint(endpoint);
+ }
+ brokerURI = "java";
+ initializeLocal(anApplicationContext);
+
+ } else if ( transport == null ){
+ throw new IllegalArgumentException("Client Transport Not Specified - Add Transport.JMS or Transport.Java to ApplicationContext");
+ } else {
+ throw new IllegalArgumentException("Unsupported Client Transport Specified - "+transport.toString()+" Expected Transport.JMS or Transport.Java");
+ }
+// Properties performanceTuningSettings = new Properties();
+// fetchRequiredProperties(anApplicationContext, performanceTuningSettings);
+
+ //Properties performanceTuningSettings = null;
if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ServerUri)) {
throw new ResourceInitializationException();
@@ -847,33 +1086,7 @@ public class BaseUIMAAsynchronousEngine_
super.serviceDelegate.setCasProcessTimeout(processTimeout);
super.serviceDelegate.setGetMetaTimeout(metadataTimeout);
try {
- // Generate unique identifier
- String uuid = UUIDGenerator.generate();
- // JMX does not allow ':' in the ObjectName so replace these with underscore
- uuid = uuid.replaceAll(":", "_");
- uuid = uuid.replaceAll("-", "_");
- applicationName += "_" + uuid;
- jmxManager = new JmxManager("org.apache.uima");
- clientSideJmxStats.setApplicationName(applicationName);
- clientJmxObjectName = new ObjectName("org.apache.uima:name=" + applicationName);
- jmxManager.registerMBean(clientSideJmxStats, clientJmxObjectName);
-
- // Check if sharedConnection exists. If not create a new one. The sharedConnection
- // is static and shared by all instances of UIMA AS client in a jvm. The check
- // is made in a critical section by first acquiring a global static semaphore to
- // prevent a race condition.
- createSharedConnection(brokerURI);
- running = true;
- // This is done to give the broker enough time to 'finalize' creation of
- // temp reply queue. It's been observed (on MAC OS only) that AMQ
- // broker QueueSession.createTemporaryQueue() call is not synchronous. Meaning,
- // return from createTemporaryQueue() does not guarantee immediate availability
- // of the temp queue. It seems like this operation is asynchronous, causing:
- // "InvalidDestinationException: Cannot publish to a deleted Destination..."
- // on the service side when it tries to reply to the client.
- try {
- wait(100);
- } catch( InterruptedException e) {}
+ initJMX();
sendMetaRequest();
waitForMetadataReply();
if (abort || !running) {
@@ -902,7 +1115,8 @@ public class BaseUIMAAsynchronousEngine_
}
}
initialized = true;
- remoteService = true;
+ //remoteService = true;
+ remoteService = isServiceRemote();
// running = true;
for (int i = 0; listeners != null && i < listeners.size(); i++) {
@@ -929,7 +1143,71 @@ public class BaseUIMAAsynchronousEngine_
super.acquireCpcReadySemaphore();
state = ClientState.RUNNING;
}
-
+ private AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception {
+ org.apache.xmlbeans.XmlOptions options = new org.apache.xmlbeans.XmlOptions();
+
+
+ XMLReader xmlReader = XMLReaderFactory.createXMLReader();
+ xmlReader.setFeature("http://xml.org/sax/features/external-general-entities", false);
+ xmlReader.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
+ xmlReader.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd",false);
+ xmlReader.setFeature("http://apache.org/xml/features/disallow-doctype-decl",true);
+ options.setLoadUseXMLReader(xmlReader);
+
+ return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath), options);
+// return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath));
+ }
+
+ private Provider provider( AnalysisEngineDeploymentDescriptionDocument dd) {
+ String provider =
+ dd.getAnalysisEngineDeploymentDescription().getDeployment().getProvider();
+
+ provider = UimaAsDirectServiceBuilder.resolvePlaceholder(provider);
+ System.out.println("...... provider() - "+provider);
+
+ if (Provider.JAVA.get().equals(provider)) {
+ return Provider.JAVA;
+ } else if (Provider.ACTIVEMQ.get().equals(provider)) {
+ return Provider.ACTIVEMQ;
+ } else {
+ throw new RuntimeException("Invalid provider attribute value in Deployment Descriptor :{"+provider+"} please check <deployment> element. Expected \"java\" or \"activemq\"");
+ }
+
+ /*
+ if (Provider.JAVA.get().equals(provider)) {
+ return Provider.JAVA;
+ } else if (Provider.ACTIVEMQ.get().equals(provider)) {
+ return Provider.ACTIVEMQ;
+ } else {
+ throw new RuntimeException("Invalid provider attribute value in Deployment Descriptor :{"+provider+"} please check <deployment> element. Expected \"java\" or \"activemq\"");
+ }
+ */
+ }
+ private Protocol protocol( AnalysisEngineDeploymentDescriptionDocument dd) {
+ String protocol =
+ dd.getAnalysisEngineDeploymentDescription().getDeployment().getProtocol();
+
+ protocol = UimaAsDirectServiceBuilder.resolvePlaceholder(protocol);
+
+ System.out.println("...... protocol() - "+protocol);
+ if (Protocol.JAVA.get().equalsIgnoreCase(protocol) ) {
+ return Protocol.JAVA;
+ } else if (Protocol.JMS.get().equalsIgnoreCase(protocol) ) {
+ return Protocol.JMS;
+ } else {
+ throw new RuntimeException("Invalid protocol attribute value in Deployment Descriptor :{"+protocol+"} please check <deployment> element. Expected \"java\" or \"jms\"");
+ }
+
+ /*
+ if (Protocol.JAVA.get().equals(protocol)) {
+ return Protocol.JAVA;
+ } else if (Protocol.JMS.get().equals(protocol)) {
+ return Protocol.JMS;
+ } else {
+ throw new RuntimeException("Invalid protocol attribute value in Deployment Descriptor :{"+protocol+"} please check <deployment> element. Expected \"java\" or \"jms\"");
+ }
+ */
+ }
/**
* First generates a Spring context from a given deploy descriptor and than deploys the context
* into a Spring Container.
@@ -942,34 +1220,29 @@ public class BaseUIMAAsynchronousEngine_
* @return - a unique spring container id
*
*/
- public String deploy(String aDeploymentDescriptor, Map anApplicationContext) throws Exception {
- String springContext = null;
- try {
- springContext = generateSpringContext(aDeploymentDescriptor, anApplicationContext);
-
- SpringContainerDeployer springDeployer = new SpringContainerDeployer(springContainerRegistry, this);
+ public String deploy(String deploymentDescriptorPath, Map anApplicationContext) throws Exception {
+ AnalysisEngineDeploymentDescriptionDocument dd =
+ parseDD(deploymentDescriptorPath);
+
+ XmlDocumentProperties dp = dd.documentProperties();
+ System.out.println(dp.getSourceName());
+
+ // Use factory to create deployer instance for a given protocol and provider
+ UimaAsServiceDeployer deployer =
+ ServiceDeployers.newDeployer(protocol(dd), provider(dd));
+
+ service = deployer.deploy(dd, anApplicationContext);
+
+ UimaAsServiceRegistry.getInstance().register(service);
- String id = springDeployer.deploy(springContext);
- if ( springDeployer.isInitialized() ) {
- springDeployer.startListeners();
- }
- return id;
- } catch (Exception e) {
- running = true;
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "main", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", e);
- }
+ return service.getId();
- throw e;
- } finally {
- String uimaAsDebug = (String) anApplicationContext.get(UimaAsynchronousEngine.UimaEeDebug);
- if ( springContext != null && (null == uimaAsDebug || uimaAsDebug.equals("") ) ) {
- disposeContextFiles(springContext);
- }
- }
}
+
+ protected UimaASService getServiceReference() {
+ return service;
+ }
+
private void disposeContextFiles(String ...contextFiles) {
for( String contextFile: contextFiles) {
File file = new File(contextFile);
@@ -983,6 +1256,7 @@ public class BaseUIMAAsynchronousEngine_
*/
public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext)
throws Exception {
+ /*
if (aDeploymentDescriptorList == null) {
throw new ResourceConfigurationException(UIMA_IllegalArgumentException.ILLEGAL_ARGUMENT,
new Object[] { "Null", "DeploymentDescriptorList", "deploy()" });
@@ -1018,11 +1292,29 @@ public class BaseUIMAAsynchronousEngine_
// disposeContextFiles(springContextFiles);
}
-
+*/
+ return "";
}
public void undeploy() throws Exception {
- Iterator containerIterator = springContainerRegistry.keySet().iterator();
+ Iterator<Entry<String, List<UimaASService>>> iterator =
+ UimaAsServiceRegistry.getInstance().getServiceList().entrySet().iterator();
+ // Need a separate list to hold service ids to prevent ConcurrentModificationException
+ List<String> serviceIdList = new ArrayList<String>();
+ while( iterator.hasNext() ) {
+ Iterator<UimaASService> listIterator = iterator.next().getValue().iterator();
+ while( listIterator.hasNext()) {
+ UimaASService service = listIterator.next();
+ serviceIdList.add(service.getId());
+ }
+ }
+ // Now undeploy all services
+ for( String serviceId : serviceIdList ) {
+ undeploy(serviceId);
+ }
+ /*
+
+ Iterator containerIterator = springContainerRegistry.keySet().iterator();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg ="undeploying "+springContainerRegistry.size()+" Containers";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
@@ -1039,6 +1331,7 @@ public class BaseUIMAAsynchronousEngine_
}
undeploy(containerId);
}
+ */
}
public void undeploy(String aSpringContainerId) throws Exception {
@@ -1055,18 +1348,41 @@ public class BaseUIMAAsynchronousEngine_
return;
}
- UimaEEAdminSpringContext adminContext = null;
- if (!springContainerRegistry.containsKey(aSpringContainerId)) {
- return;
- // throw new InvalidContainerException("Invalid Spring container Id:" + aSpringContainerId +
- // ". Unable to undeploy the Spring container");
- }
- // Fetch an administrative context which contains a Spring Container
- adminContext = (UimaEEAdminSpringContext) springContainerRegistry.get(aSpringContainerId);
- if (adminContext == null) {
+// UimaEEAdminSpringContext adminContext = null;
+ final UimaASService deployedService =
+ UimaAsServiceRegistry.getInstance().lookupById(aSpringContainerId);
+ if ( deployedService == null ) {
throw new InvalidContainerException(
"Spring Container Does Not Contain Valid UimaEEAdminSpringContext Object");
}
+ switch (stop_level) {
+ case SpringContainerDeployer.QUIESCE_AND_STOP:
+ //((AnalysisEngineController) ctrer).quiesceAndStop();
+ //service.stop();
+ deployedService.quiesce();
+
+ break;
+ case SpringContainerDeployer.STOP_NOW:
+ // ((AnalysisEngineController) ctrer).terminate();
+ //service.stop();
+ deployedService.stop();
+
+
+ break;
+ }
+ UimaAsServiceRegistry.getInstance().unregister(deployedService);
+ /*
+// if (!springContainerRegistry.containsKey(aSpringContainerId)) {
+// return;
+// // throw new InvalidContainerException("Invalid Spring container Id:" + aSpringContainerId +
+// // ". Unable to undeploy the Spring container");
+// }
+// // Fetch an administrative context which contains a Spring Container
+// adminContext = (UimaEEAdminSpringContext) springContainerRegistry.get(aSpringContainerId);
+// if (adminContext == null) {
+// throw new InvalidContainerException(
+// "Spring Container Does Not Contain Valid UimaEEAdminSpringContext Object");
+// }
// Fetch instance of the Container from its context
ApplicationContext ctx = adminContext.getSpringContainer();
// Query the container for objects that implement
@@ -1144,8 +1460,21 @@ public class BaseUIMAAsynchronousEngine_
}
// Remove the container from a local registry
springContainerRegistry.remove(aSpringContainerId);
+ */
}
+ private void initJMX() throws Exception {
+ // Generate unique identifier
+ String uuid = UUIDGenerator.generate();
+ // JMX does not allow ':' in the ObjectName so replace these with underscore
+ uuid = uuid.replaceAll(":", "_");
+ uuid = uuid.replaceAll("-", "_");
+ applicationName += "_" + uuid;
+ jmxManager = new JmxManager("org.apache.uima");
+ clientSideJmxStats.setApplicationName(applicationName);
+ clientJmxObjectName = new ObjectName("org.apache.uima:name=" + applicationName);
+ jmxManager.registerMBean(clientSideJmxStats, clientJmxObjectName);
+ }
/**
* Use dd2spring to generate Spring context file from a given deployment descriptor file.
*
@@ -1340,9 +1669,56 @@ public class BaseUIMAAsynchronousEngine_
// private void stopProducingCases(ClientRequest clientCachedRequest) {
private void stopProducingCases(String casReferenceId, Destination cmFreeCasQueue) {
- PendingMessage msg = new PendingMessage(AsynchAEMessage.Stop);
- msg.put(AsynchAEMessage.Destination, cmFreeCasQueue);
- msg.put(AsynchAEMessage.CasReference, casReferenceId);
+ try {
+// if (clientCachedRequest.getFreeCasNotificationQueue() != null) {
+ if (cmFreeCasQueue != null) {
+ TextMessage msg = createTextMessage();
+ msg.setText("");
+ msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+// msg.setStringProperty(AsynchAEMessage.CasReference, clientCachedRequest.getCasReferenceId());
+ msg.setStringProperty(AsynchAEMessage.CasReference, casReferenceId);
+ msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+ msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Stop);
+ msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+ try {
+ MessageProducer msgProducer = getMessageProducer(cmFreeCasQueue);
+ if (msgProducer != null) {
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_sending_stop_to_service__INFO", new Object[] {casReferenceId,cmFreeCasQueue});
+ }
+ // Send STOP message to Cas Multiplier Service
+ msgProducer.send(msg);
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_unable_to_send_stop_to_cm__WARNING");
+ }
+ }
+
+ } catch (Exception ex) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING",
+ ex);
+ }
+ }
+ }
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", e);
+ }
+ }
+ /*
+ PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.Stop);
+ msg.addProperty(AsynchAEMessage.Destination, cmFreeCasQueue);
+ msg.addProperty(AsynchAEMessage.CasReference, casReferenceId);
try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
@@ -1365,64 +1741,17 @@ public class BaseUIMAAsynchronousEngine_
}
}
- /*
- try {
-// if (clientCachedRequest.getFreeCasNotificationQueue() != null) {
- if (cmFreeCasQueue != null) {
- TextMessage msg = createTextMessage();
- msg.setText("");
- msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
-// msg.setStringProperty(AsynchAEMessage.CasReference, clientCachedRequest.getCasReferenceId());
- msg.setStringProperty(AsynchAEMessage.CasReference, casReferenceId);
- msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
- msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Stop);
- msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
- try {
- MessageProducer msgProducer = getMessageProducer(cmFreeCasQueue);
- if (msgProducer != null) {
-
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_client_sending_stop_to_service__INFO", new Object[] {casReferenceId,cmFreeCasQueue});
- }
- // Send STOP message to Cas Multiplier Service
- msgProducer.send(msg);
- } else {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_client_unable_to_send_stop_to_cm__WARNING");
- }
- }
-
- } catch (Exception ex) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING",
- ex);
- }
- }
- }
- } catch (Exception e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", e);
- }
- }
- */
+ */
}
protected void dispatchFreeCasRequest(String casReferenceId, Message message) throws Exception {
- PendingMessage msg = new PendingMessage(AsynchAEMessage.ReleaseCAS);
+ PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.ReleaseCAS);
// if ( message.getStringProperty(AsynchAEMessage.TargetingSelector) != null ) {
// msg.put(AsynchAEMessage.TargetingSelector,message.getStringProperty(AsynchAEMessage.TargetingSelector) );
// } else {
// msg.put(AsynchAEMessage.Destination, message.getJMSReplyTo());
// }
- msg.put(AsynchAEMessage.Destination, message.getJMSReplyTo());
- msg.put(AsynchAEMessage.CasReference, casReferenceId);
+ msg.addProperty(AsynchAEMessage.Destination, message.getJMSReplyTo());
+ msg.addProperty(AsynchAEMessage.CasReference, casReferenceId);
sender.dispatchMessage(msg, this, false);
}
protected MessageSender getDispatcher() {
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java Mon Feb 26 18:54:11 2018
@@ -23,6 +23,10 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InvalidClassException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UimaASApplicationExitEvent;
@@ -31,8 +35,11 @@ import org.apache.uima.aae.controller.An
import org.apache.uima.aae.jmx.monitor.BasicUimaJmxMonitorListener;
import org.apache.uima.aae.jmx.monitor.JmxMonitor;
import org.apache.uima.aae.jmx.monitor.JmxMonitorListener;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.UimaAsServiceRegistry;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Level;
import org.springframework.context.ApplicationEvent;
@@ -74,6 +81,7 @@ public class UIMA_Service implements App
// allow multiple args for one key
deploymentDescriptors = getMultipleArg2("-dd", args);
}
+ /*
String saxonURL = getArg("-saxonURL", args);
String xslTransform = getArg("-xslt", args);
String uimaAsDebug = getArg("-uimaEeDebug", args);
@@ -84,6 +92,7 @@ public class UIMA_Service implements App
printUsageMessage();
return null;
}
+ */
String brokerURL = getArg("-brokerURL", args);
// Check if broker URL is specified on the command line. If it is not, use the default
// localhost:61616. In either case, set the System property defaultBrokerURL. It will be used
@@ -99,6 +108,7 @@ public class UIMA_Service implements App
System.out.println(">>> Setting Inactivity Timeout To: "
+ System.getProperty(JmsConstants.SessionTimeoutOverride));
}
+ /*
if (deploymentDescriptors.length == 0) {
// array of context files passed in
springConfigFileArray = args;
@@ -149,8 +159,10 @@ public class UIMA_Service implements App
}
}
}
+
return springConfigFileArray;
-
+ */
+ return deploymentDescriptors;
}
public SpringContainerDeployer deploy(String[] springContextFiles, ApplicationListener<ApplicationEvent> listener) throws Exception {
SpringContainerDeployer springDeployer;
@@ -415,6 +427,7 @@ public class UIMA_Service implements App
public static void main(String[] args) {
try {
UIMA_Service service = new UIMA_Service();
+ /*
// parse command args and run dd2spring to generate spring context
// files from deployment descriptors
String contextFiles[] = service.initialize(args);
@@ -422,6 +435,9 @@ public class UIMA_Service implements App
if (contextFiles == null) {
return;
}
+ */
+ String dd[] = service.initialize(args);
+ /*
// Deploy components defined in Spring context files. This method blocks until
// the container is fully initialized and all UIMA-AS components are succefully
// deployed.
@@ -430,6 +446,7 @@ public class UIMA_Service implements App
System.out.println(">>> Failed to Deploy UIMA Service. Check Logs for Details");
System.exit(0);
}
+
// remove temporary spring context files generated from DD
for( String contextFile: contextFiles) {
File file = new File(contextFile);
@@ -437,8 +454,16 @@ public class UIMA_Service implements App
file.delete();
}
}
+ */
+ BaseUIMAAsynchronousEngine_impl engine =
+ new BaseUIMAAsynchronousEngine_impl();
+
+ for( String deploymentDescriptorPath : dd ) {
+ engine.deploy(deploymentDescriptorPath, new HashMap<>());
+ }
// Add a shutdown hook to catch kill signal and to force quiesce and stop
- ServiceShutdownHook shutdownHook = new ServiceShutdownHook(serviceDeployer);
+ ServiceShutdownHook shutdownHook = new ServiceShutdownHook(engine);
+// ServiceShutdownHook shutdownHook = new ServiceShutdownHook(serviceDeployer);
Runtime.getRuntime().addShutdownHook(shutdownHook);
// Check if we should start an optional JMX-based monitor that will provide service metrics
// The monitor is enabled by existence of -Duima.jmx.monitor.interval=<number> parameter. By
@@ -451,21 +476,25 @@ public class UIMA_Service implements App
service.startMonitor(Long.parseLong(monitorCheckpointFrequency));
}
- AnalysisEngineController topLevelControllor = serviceDeployer.getTopLevelController();
+// AnalysisEngineController topLevelControllor = serviceDeployer.getTopLevelController();
String prompt = "Press 'q'+'Enter' to quiesce and stop the service or 's'+'Enter' to stop it now.\nNote: selected option is not echoed on the console.";
- if (topLevelControllor != null) {
+ // if (topLevelControllor != null) {
System.out.println(prompt);
// Loop forever or until the service is stopped
- while (!topLevelControllor.isStopped()) {
+ while ( engine.isRunning() ) {
+// while (!topLevelControllor.isStopped()) {
+
if (System.in.available() > 0) {
int c = System.in.read();
if (c == 's') {
service.stopMonitor();
- serviceDeployer.undeploy(SpringContainerDeployer.STOP_NOW);
+ engine.undeploy();
+ //serviceDeployer.undeploy(SpringContainerDeployer.STOP_NOW);
System.exit(0);
} else if (c == 'q') {
- service.stopMonitor();
- serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
+ engine.undeploy();
+ //service.stopMonitor();
+ // serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
System.exit(0);
} else if (Character.isLetter(c) || Character.isDigit(c)) {
@@ -473,13 +502,13 @@ public class UIMA_Service implements App
}
}
// This is a polling loop. Sleep for 1 sec
- try {
- if (!topLevelControllor.isStopped())
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- }
+// try {
+// if (!topLevelControllor.isStopped())
+// Thread.sleep(1000);
+// } catch (InterruptedException ex) {
+// }
} // while
- }
+ //}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
@@ -491,21 +520,28 @@ public class UIMA_Service implements App
static class ServiceShutdownHook extends Thread {
public SpringContainerDeployer serviceDeployer;
-
+ BaseUIMAAsynchronousEngine_impl engine;
+
public ServiceShutdownHook(SpringContainerDeployer serviceDeployer) {
this.serviceDeployer = serviceDeployer;
}
-
+ public ServiceShutdownHook(BaseUIMAAsynchronousEngine_impl engine) {
+ this.engine = engine;
+ }
public void run() {
try {
- AnalysisEngineController topLevelController = serviceDeployer.getTopLevelController();
- if (topLevelController != null && !topLevelController.isStopped() ) {
+
+ //AnalysisEngineController topLevelController = serviceDeployer.getTopLevelController();
+ //if (topLevelController != null && !topLevelController.isStopped() ) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_caught_signal__INFO", new Object[] { topLevelController.getComponentName() });
- serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
+ "UIMAJMS_caught_signal__INFO", new Object[] { "TopLevelService" });
+// "UIMAJMS_caught_signal__INFO", new Object[] { topLevelController.getComponentName() });
+ // serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
+ engine.undeploy();
+
Runtime.getRuntime().halt(0);
- }
+ //}
} catch( Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.adapter.jms.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.uima.aae.AsynchAECasManager_impl;
+import org.apache.uima.aae.InProcessCache;
+import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.UimaAsThreadFactory;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
+import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.aae.handler.HandlerBase;
+import org.apache.uima.aae.handler.input.MetadataRequestHandler_impl;
+import org.apache.uima.aae.handler.input.MetadataResponseHandler_impl;
+import org.apache.uima.aae.handler.input.ProcessRequestHandler_impl;
+import org.apache.uima.aae.handler.input.ProcessResponseHandler;
+import org.apache.uima.aae.service.AbstractUimaASService;
+import org.apache.uima.aae.service.ScaleoutSpecification;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
+import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
+import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer;
+import org.apache.uima.adapter.jms.service.builder.ActiveMQFactory;
+import org.apache.uima.adapter.jms.service.builder.JmsMessageListenerBuilder;
+import org.apache.uima.as.client.Listener.Type;
+import org.apache.uima.resource.ResourceManager;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+public class UimaASJmsService extends AbstractUimaASService
+implements UimaASService {
+ // Connection factory handed to every listener built by createListener();
+ // created on first use by createConnectionFactory().
+ ActiveMQConnectionFactory factory = null;
+ // Broker URL given to the connection factory and to the output channel.
+ private String brokerURL;
+ // Name of the queue the ProcessCAS and GetMeta listeners consume from.
+ private String queueName;
+
+ // Created by withOutputChannel() or taken from the controller in build().
+ private JmsOutputChannel outputChannel;
+ // Created by withInputChannel() or taken from the controller in build().
+ private JmsInputChannel inputChannel;
+ // Counts terminated threads; sized to the scaleout passed to build().
+ private CountDownLatch latchToCountNumberOfTerminatedThreads;
+ // Counts initialized threads; sized to the scaleout passed to build().
+ private CountDownLatch latchToCountNumberOfInitedThreads;
+
+ // Set through withErrorHandlerChain(); not read anywhere else in this class.
+ ErrorHandlerChain errorHandlerChain;
+ // Every listener container created by build(), in creation order.
+ private List<UimaDefaultMessageListenerContainer> listeners =
+ new ArrayList<>();
+
+ /**
+  * Stand-alone launcher that wires a primitive analysis engine controller to a
+  * JMS service and starts it.
+  * <p>
+  * Optional positional arguments override the built-in development defaults:
+  * {@code args[0]} is the input queue name, {@code args[1]} the path to the
+  * analysis engine descriptor and {@code args[2]} the broker URL. When an
+  * argument is absent or blank the original hard-coded value is used, so
+  * running without arguments behaves as before.
+  * <p>
+  * Any exception raised while building or starting the service is printed to
+  * standard error and otherwise ignored.
+  *
+  * @param args optional queue name, descriptor path and broker URL, in that order
+  */
+ public static void main(String[] args) {
+   try {
+     // Command-line values win; otherwise fall back to the development defaults.
+     String queueName = argOrDefault(args, 0, "PersonTitleAnnotatorQueue");
+     String analysisEngineDescriptor = argOrDefault(args, 1,
+         "C:/uima/releases/testing/uima/uima-as/2.9.0/target/uima-as-2.9.1-SNAPSHOT-bin/apache-uima-as-2.9.1-SNAPSHOT/examples/descriptors/analysis_engine/PersonTitleAnnotator.xml");
+     String brokerURL = argOrDefault(args, 2, "tcp://localhost:61616");
+
+     UimaASJmsService service = new UimaASJmsService();
+
+     // NOTE(review): this specification is configured but not handed to
+     // anything below - confirm whether it is still needed.
+     ScaleoutSpecification spec = new ScaleoutSpecification();
+     spec.withProcessScaleout(4).withGetMetaScaleout(1).withFreeCasScaleout(1);
+
+     // This launcher installs no error handlers.
+     ErrorHandlerChain errorHandlerChain = null;
+
+     InProcessCache inProcessCache = new InProcessCache();
+
+     ResourceManager resourceManager = UimaClassFactory.produceResourceManager();
+
+     AsynchAECasManager_impl casManager = new AsynchAECasManager_impl(resourceManager);
+     casManager.setCasPoolSize(4);
+
+     PrimitiveAnalysisEngineController_impl controller =
+         new PrimitiveAnalysisEngineController_impl(null, queueName, analysisEngineDescriptor, casManager, inProcessCache, 1, 4);
+
+     controller.setErrorHandlerChain(errorHandlerChain);
+
+     service.withConttroller(controller)
+         .withErrorHandlerChain(errorHandlerChain)
+         .withBrokerURL(brokerURL)
+         .withInputChannel()
+         .withInputQueue(queueName)
+         .withOutputChannel()
+         .build(4);
+
+     service.start();
+
+   } catch( Exception e) {
+     e.printStackTrace();
+   }
+
+ }
+
+ /**
+  * Returns {@code args[index]} when it is present and not blank, otherwise
+  * {@code fallback}.
+  */
+ private static String argOrDefault(String[] args, int index, String fallback) {
+   if ( args == null || index >= args.length
+       || args[index] == null || args[index].trim().isEmpty() ) {
+     return fallback;
+   }
+   return args[index];
+ }
+ /**
+  * Stores the controller this service operates on.
+  * The misspelled method name is part of the public interface and is kept as is.
+  *
+  * @param controller the analysis engine controller to use
+  * @return this service, for call chaining
+  */
+ public UimaASJmsService withConttroller(AnalysisEngineController controller) {
+ this.controller = controller;
+ return this;
+ }
+
+ /**
+  * Stores the in-process cache for this service.
+  *
+  * @param cache the cache instance to use
+  * @return this service, for call chaining
+  */
+ public UimaASJmsService withInProcessCache(InProcessCache cache) {
+ this.inProcessCache = cache;
+ return this;
+ }
+ /**
+  * Stores the resource specifier later returned by getResourceSpecifier().
+  *
+  * @param resourceSpecifier the specifier to remember
+  * @return this service, for call chaining
+  */
+ public UimaASJmsService withResourceSpecifier(ResourceSpecifier resourceSpecifier) {
+ this.resourceSpecifier = resourceSpecifier;
+ return this;
+ }
+
+ /**
+  * Stores the broker URL used when the connection factory and the output
+  * channel are set up.
+  *
+  * @param brokerURL the broker URL, e.g. {@code tcp://localhost:61616}
+  * @return this service, for call chaining
+  */
+ public UimaASJmsService withBrokerURL(String brokerURL) {
+ this.brokerURL = brokerURL;
+ return this;
+ }
+ /**
+  * Stores the service name later returned by getName().
+  *
+  * @param name the service name
+  * @return this service, for call chaining
+  */
+ public UimaASJmsService withName(String name) {
+ this.name = name;
+ return this;
+ }
+ /**
+  * Stores the name of the input queue; it is also what getEndpoint() returns.
+  *
+  * @param queueName the input queue name
+  * @return this service, for call chaining
+  */
+ public UimaASJmsService withInputQueue(String queueName) {
+ this.queueName = queueName;
+ return this;
+ }
+ /**
+  * Stores the error handler chain on this service.
+  *
+  * @param errorHandlerChain the chain to remember; may be null
+  * @return this service, for call chaining
+  */
+ public UimaASJmsService withErrorHandlerChain(ErrorHandlerChain errorHandlerChain) {
+ this.errorHandlerChain = errorHandlerChain;
+ return this;
+ }
+
+ /**
+  * Replaces the input channel with a fresh REQUEST_REPLY JmsInputChannel.
+  *
+  * @return this service, for call chaining
+  */
+ private UimaASJmsService withInputChannel(){
+ inputChannel = new JmsInputChannel(ChannelType.REQUEST_REPLY);
+ return this;
+ }
+ /**
+  * Replaces the output channel with a fresh JmsOutputChannel and hands it the
+  * current controller (whatever value the controller field holds at call time).
+  *
+  * @return this service, for call chaining
+  */
+ private UimaASJmsService withOutputChannel() {
+ outputChannel = new JmsOutputChannel();
+ outputChannel.setController(controller);
+
+ return this;
+ }
+ /**
+  * Creates the ActiveMQ connection factory on first use, with a queue
+  * prefetch of 0. Does nothing when a factory already exists.
+  */
+ private void createConnectionFactory() {
+   if ( factory != null ) {
+     return;
+   }
+   factory = ActiveMQFactory.newConnectionFactory(brokerURL, 0);
+   // NOTE(review): trusting all packages removes ActiveMQ's restriction on
+   // which classes may be deserialized from ObjectMessage payloads - confirm
+   // this is acceptable for the brokers this service connects to.
+   factory.setTrustAllPackages(true);
+ }
+ /**
+  * @return the broker URL set through withBrokerURL(), or null if none was set
+  */
+ public String getBrokerURL() {
+ return brokerURL;
+ }
+ /**
+  * Maps a listener type to the JMS message selector its listener should use.
+  *
+  * @param type the listener type
+  * @return the selector expression, or null when the listener should not
+  *         filter messages
+  */
+ private String getSelector(Type type) {
+   switch(type) {
+   case ProcessCAS:
+     return "Command=2000 OR Command=2002";
+   case GetMeta:
+     return "Command=2001";
+   default:
+     // FreeCAS, Unknown, Reply: OK to return NULL, no selector will be used
+     return null;
+   }
+ }
+ /**
+  * Tells whether a listener of the given type is treated as a temp-queue
+  * listener: every type except ProcessCAS and GetMeta.
+  *
+  * @param type the listener type
+  * @return false for ProcessCAS and GetMeta, true otherwise
+  */
+ private boolean isTempQueueListener(Type type) {
+   return !( Type.ProcessCAS.equals(type) || Type.GetMeta.equals(type) );
+ }
+ /**
+  * Builds one JMS listener container of the requested type.
+  * <p>
+  * Missing input/output channels are created first. The listener gets its own
+  * thread pool whose core and maximum size both equal {@code scaleout}; for the
+  * ProcessCAS listener of a primitive controller that pool uses a
+  * UimaAsThreadFactory bound to the controller and to the two thread-counting
+  * latches.
+  *
+  * @param type     the kind of requests the listener handles
+  * @param scaleout number of pool threads and of consumers
+  * @return the configured listener container, with a receive timeout of 500
+  * @throws Exception if the listener cannot be built
+  */
+ private UimaDefaultMessageListenerContainer createListener(Type type, int scaleout) throws Exception{
+   if ( inputChannel == null ) {
+     withInputChannel();
+   }
+   if ( outputChannel == null ) {
+     withOutputChannel();
+     outputChannel.setServerURI(getBrokerURL());
+   }
+
+   ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+   boolean primitiveProcessListener =
+       controller.isPrimitive() && Type.ProcessCAS.equals(type);
+   if ( primitiveProcessListener ) {
+     // Create a custom thread factory. Provide it with an instance of
+     // PrimitiveController so that every thread can call it to initialize
+     // the next available instance of an AE.
+     ThreadFactory threadFactory = new UimaAsThreadFactory().
+         withThreadGroup(Thread.currentThread().getThreadGroup()).
+         withPrimitiveController((PrimitiveAnalysisEngineController) controller).
+         withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads).
+         withInitedThreadsLatch(latchToCountNumberOfInitedThreads);
+     ((UimaAsThreadFactory) threadFactory).setDaemon(true);
+     // The executor uses the custom thread factory instead of the default one.
+     executor.setThreadFactory(threadFactory);
+   }
+   executor.setCorePoolSize(scaleout);
+   executor.setMaxPoolSize(scaleout);
+
+   // The destination can be NULL if this listener is meant for a temp
+   // queue. Such destinations are created on demand using the destination
+   // resolver which is plugged into the listener. The resolver creates a
+   // temp queue lazily on listener startup.
+   ActiveMQDestination destination =
+       isTempQueueListener(type) ? null : new ActiveMQQueue(queueName);
+
+   UimaDefaultMessageListenerContainer container =
+       new JmsMessageListenerBuilder()
+           .withController(controller)
+           .withType(type)
+           .withConectionFactory(factory)
+           .withThreadPoolExecutor(executor)
+           .withConsumerCount(scaleout)
+           .withInputChannel(inputChannel)
+           .withSelector(getSelector(type))
+           .withDestination(destination)
+           .build();
+   container.setReceiveTimeout(500);
+   return container;
+ }
+ /**
+  * Assembles the chain of message handlers for the given controller.
+  * <p>
+  * The chain always starts with a metadata-request handler that delegates to a
+  * process-request handler. For a non-primitive controller it continues with a
+  * metadata-response handler and then a process-response handler.
+  *
+  * @param controller the controller every handler in the chain is bound to
+  * @return the first handler of the chain
+  */
+ public HandlerBase getMessageHandler(AnalysisEngineController controller) {
+   MetadataRequestHandler_impl metaRequest =
+       new MetadataRequestHandler_impl("MetadataRequestHandler");
+   metaRequest.setController(controller);
+
+   ProcessRequestHandler_impl processRequest =
+       new ProcessRequestHandler_impl("ProcessRequestHandler");
+   processRequest.setController(controller);
+   metaRequest.setDelegate(processRequest);
+
+   if ( controller.isPrimitive() ) {
+     // A primitive controller only needs the two request handlers.
+     return metaRequest;
+   }
+
+   MetadataResponseHandler_impl metaResponse =
+       new MetadataResponseHandler_impl("MetadataResponseHandler");
+   metaResponse.setController(controller);
+   processRequest.setDelegate(metaResponse);
+
+   ProcessResponseHandler processResponse =
+       new ProcessResponseHandler("ProcessResponseHandler");
+   processResponse.setController(controller);
+   metaResponse.setDelegate(processResponse);
+
+   return metaRequest;
+ }
+ /**
+  * Wires the service together: connection factory, thread-counting latches,
+  * output and input channels, the message handler chain and the JMS listeners.
+  * <p>
+  * A ProcessCAS listener with {@code scaleout} consumers and a GetMeta listener
+  * with one consumer are always created; a FreeCAS listener with one consumer
+  * is added when the controller is a CAS multiplier. Channels already
+  * registered on the controller for the JMS endpoint type are reused.
+  * <p>
+  * The configuration is checked up front so that a missing setting is reported
+  * by name instead of failing somewhere inside listener construction.
+  *
+  * @param scaleout number of threads/consumers for the ProcessCAS listener;
+  *                 must be at least 1
+  * @return this service, for call chaining
+  * @throws IllegalStateException    if no controller or input queue was set,
+  *                                  or no broker URL was set and no connection
+  *                                  factory exists yet
+  * @throws IllegalArgumentException if {@code scaleout} is less than 1
+  * @throws Exception                if a listener cannot be created
+  */
+ public UimaASJmsService build(int scaleout) throws Exception {
+   if ( controller == null ) {
+     throw new IllegalStateException("No controller configured; call withConttroller() before build()");
+   }
+   if ( queueName == null ) {
+     throw new IllegalStateException("No input queue configured; call withInputQueue() before build()");
+   }
+   if ( factory == null && brokerURL == null ) {
+     throw new IllegalStateException("No broker URL configured; call withBrokerURL() before build()");
+   }
+   if ( scaleout < 1 ) {
+     throw new IllegalArgumentException("scaleout must be at least 1, got " + scaleout);
+   }
+
+   // First create Connection Factory. This is needed by
+   // JMS listeners.
+   createConnectionFactory();
+   // counts number of initialized threads
+   latchToCountNumberOfInitedThreads = new CountDownLatch(scaleout);
+   // counts number of terminated threads
+   latchToCountNumberOfTerminatedThreads = new CountDownLatch(scaleout);
+
+   // Add one instance of JmsOutputChannel, unless the controller has one.
+   if ( controller.getOutputChannel(ENDPOINT_TYPE.JMS) == null ) {
+     withOutputChannel();
+     outputChannel.setServerURI(brokerURL);
+     outputChannel.setServiceInputEndpoint(queueName);
+     controller.addOutputChannel(outputChannel);
+   } else {
+     outputChannel = (JmsOutputChannel)controller.getOutputChannel(ENDPOINT_TYPE.JMS);
+     outputChannel.setServiceInputEndpoint(queueName);
+   }
+
+   // Add one instance of JmsInputChannel, unless the controller has one.
+   if ( controller.getInputChannel(ENDPOINT_TYPE.JMS) == null ) {
+     withInputChannel(); // one input channel instance
+     controller.setInputChannel(inputChannel);
+   } else {
+     inputChannel = (JmsInputChannel)controller.getInputChannel(ENDPOINT_TYPE.JMS);
+   }
+
+   inputChannel.setController(controller);
+   inputChannel.setMessageHandler(getMessageHandler(controller));
+
+   // Create service JMS listeners to handle Process, GetMeta and optional
+   // FreeCas requests.
+   addServiceListener(Type.ProcessCAS, scaleout);
+   addServiceListener(Type.GetMeta, 1);
+   if ( controller.isCasMultiplier()) {
+     addServiceListener(Type.FreeCAS, 1);
+   }
+
+   return this;
+ }
+
+ /**
+  * Creates a listener of the given type, attaches it to the input channel and
+  * remembers it in the listeners list.
+  */
+ private void addServiceListener(Type type, int consumers) throws Exception {
+   UimaDefaultMessageListenerContainer listener = createListener(type, consumers);
+   inputChannel.addListenerContainer(listener);
+   listeners.add(listener);
+ }
+ /**
+  * Delegates to the controller's quiesceAndStop().
+  *
+  * @throws Exception if the controller call fails
+  */
+ public void quiesce() throws Exception {
+ controller.quiesceAndStop();
+ }
+
+ /**
+  * Stops the service by calling terminate() on the controller.
+  * <p>
+  * The per-listener shutdown that follows is commented out and does not run,
+  * so this method does not itself stop the listeners or their thread pools.
+  *
+  * @throws Exception if the controller call fails
+  */
+ public void stop() throws Exception {
+ // controller.stop();
+
+ controller.terminate();
+/*
+ for( UimaDefaultMessageListenerContainer listener : listeners ) {
+ listener.setTerminating();
+ listener.stop();
+ // wait for all process threads to exit
+ if ( controller.isPrimitive() && Type.ProcessCAS.equals(listener.getType())) {
+ latchToCountNumberOfTerminatedThreads.await();
+ }
+ if ( listener.getTaskExecutor() != null ) {
+ if ( listener.getTaskExecutor() instanceof ThreadPoolTaskExecutor ) {
+ ThreadPoolTaskExecutor threadExecutor =
+ (ThreadPoolTaskExecutor)listener.getTaskExecutor();
+ threadExecutor.getThreadPoolExecutor().shutdownNow();
+ threadExecutor.shutdown();
+ }
+ }
+// listener.closeConnection();
+// listener.destroy();
+ System.out.println("Stopped Process Listener ....");
+ }
+*/
+ }
+ /**
+  * @return the input queue name set through withInputQueue()
+  */
+ @Override
+ public String getEndpoint() {
+ return queueName;
+ }
+ /**
+  * @return the resource specifier held by this service
+  */
+ public ResourceSpecifier getResourceSpecifier( ){
+ return resourceSpecifier;
+ }
+ /**
+  * @return the value of the id field (not declared in this class; presumably
+  *         inherited from AbstractUimaASService)
+  */
+ @Override
+ public String getId() {
+ return id;
+ }
+ /**
+  * @return the service name set through withName()
+  */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/ActiveMQFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/ActiveMQFactory.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/ActiveMQFactory.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/ActiveMQFactory.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.adapter.jms.service.builder;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ * Static factory methods for the ActiveMQ objects used when building JMS
+ * services: queues, prefetch policies and connection factories.
+ */
+public class ActiveMQFactory {
+ /** Not instantiable; the class only offers static factory methods. */
+ private ActiveMQFactory() {
+ }
+
+ /**
+  * Creates a queue destination.
+  *
+  * @param withName the queue name
+  * @return a new ActiveMQQueue for that name
+  */
+ public static ActiveMQQueue newQueue(String withName) {
+   ActiveMQQueue queue = new ActiveMQQueue(withName);
+   return queue;
+ }
+
+ /**
+  * Creates a prefetch policy with the given queue prefetch.
+  *
+  * @param howMany the queue prefetch value to set
+  * @return a new prefetch policy
+  */
+ public static ActiveMQPrefetchPolicy newPrefetchPolicy(int howMany) {
+   ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
+   policy.setQueuePrefetch(howMany);
+   return policy;
+ }
+
+ /**
+  * Creates a connection factory for the given broker.
+  *
+  * @param broker   the broker URL
+  * @param prefetch the queue prefetch applied through the factory's prefetch policy
+  * @return a new connection factory
+  */
+ public static ActiveMQConnectionFactory newConnectionFactory(String broker, int prefetch) {
+   ActiveMQPrefetchPolicy policy = newPrefetchPolicy(prefetch);
+   ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
+   connectionFactory.setBrokerURL(broker);
+   connectionFactory.setPrefetchPolicy(policy);
+   return connectionFactory;
+ }
+}