You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/08/22 20:52:43 UTC
svn commit: r688172 [1/2] - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src:
main/java/org/apache/uima/adapter/jms/activemq/
main/java/org/apache/uima/adapter/jms/client/
main/java/org/apache/uima/adapter/jms/service/ test/java/org/apach...
Author: eae
Date: Fri Aug 22 11:52:42 2008
New Revision: 688172
URL: http://svn.apache.org/viewvc?rev=688172&view=rev
Log:
UIMA-1147 commit Jerry's work (patches) merging the post1st branch into the trunk
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/multiplier/SimpleCasGenerator.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/resources/descriptors/multiplier/SimpleCasGenerator.xml
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java Fri Aug 22 11:52:42 2008
@@ -23,11 +23,13 @@
import java.net.BindException;
import java.net.ServerSocket;
+import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.ManagementContext;
//import org.apache.activemq.memory.UsageListener;
import org.apache.uima.UIMAFramework;
import org.apache.uima.adapter.jms.JmsConstants;
@@ -40,7 +42,7 @@
{
private static final Class CLASS_NAME = BrokerDeployer.class;
private static final int BASE_JMX_PORT = 1200;
- private static final int MAX_PORT_THRESHOLD = 1400;
+ private static final int MAX_PORT_THRESHOLD = 200;
private static BrokerService service;
private Object semaphore = new Object();
@@ -104,16 +106,36 @@
String connectorList = "";
service.setPersistent(false);
-
int startPort = BASE_JMX_PORT;
+ if ( System.getProperties().containsKey("com.sun.management.jmxremote.port") )
+ {
+ startPort = Integer.parseInt(System.getProperty("com.sun.management.jmxremote.port"));
+
+ }
while( startPort < MAX_PORT_THRESHOLD && !openPort(startPort))
{
startPort++;
}
- if ( startPort < MAX_PORT_THRESHOLD )
+ if ( startPort < (startPort+MAX_PORT_THRESHOLD ) )
{
- service.setUseJmx(true);
- service.getManagementContext().setConnectorPort(startPort);
+ MBeanServer jmxServer = null;
+ // Check if the MBeanServer is available. If it is, plug it into the
+ // local Broker. We only need one MBeanServer in the JVM
+ if ( (jmxServer = ManagementContext.findTigerMBeanServer()) != null)
+ {
+ System.out.println(">>> Found TigerMBeanServer Running. Attaching Broker to Tiger.");
+ service.getManagementContext().setMBeanServer(jmxServer);
+ // Specify JMX Port
+ service.getManagementContext().setConnectorPort(startPort);
+ }
+ else
+ {
+ service.getManagementContext().setConnectorPort(startPort);
+ service.setUseJmx(true);
+ }
+
+ System.setProperty("com.sun.management.jmxremote.port", String.valueOf(startPort));
+
System.out.println("JMX Console connect URI: service:jmx:rmi:///jndi/rmi://localhost:"+startPort+"/jmxrmi");
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
"startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jmx_uri__CONFIG",
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Fri Aug 22 11:52:42 2008
@@ -27,6 +27,7 @@
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
@@ -34,7 +35,11 @@
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.advisory.ConsumerEvent;
+import org.apache.activemq.advisory.ConsumerListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.uima.UIMAFramework;
@@ -47,7 +52,7 @@
import org.apache.uima.util.Level;
import org.springframework.util.Assert;
-public class JmsEndpointConnection_impl
+public class JmsEndpointConnection_impl implements ConsumerListener
{
private static final Class CLASS_NAME = JmsEndpointConnection_impl.class;
@@ -90,16 +95,15 @@
connectionMap = aConnectionMap;
serverUri = anEndpoint.getServerURI();
if ( anEndpoint.isReplyEndpoint() &&
- anEndpoint.getDestination() != null &&
- anEndpoint.getDestination() instanceof ActiveMQDestination )
- {
- endpoint = ((ActiveMQDestination)anEndpoint.getDestination()).getPhysicalName();
- }
- else
- {
- endpoint = anEndpoint.getEndpoint();
- }
- endpointName = endpoint;
+ anEndpoint.getDestination() != null &&
+ anEndpoint.getDestination() instanceof ActiveMQDestination )
+ {
+ endpoint = ((ActiveMQDestination)anEndpoint.getDestination()).getPhysicalName();
+ }
+ else
+ {
+ endpoint = anEndpoint.getEndpoint();
+ }
isReplyEndpoint = anEndpoint.isReplyEndpoint();
anEndpoint.remove();
delegateEndpoint = anEndpoint;
@@ -108,6 +112,7 @@
{
controller = aController;
}
+
public boolean isRetryEnabled()
{
return retryEnabled;
@@ -117,6 +122,7 @@
{
this.retryEnabled = retryEnabled;
}
+
public boolean isOpen()
{
@@ -127,7 +133,7 @@
return ((ActiveMQSession) producerSession).isRunning();
}
- public void setInactivityTimeout(long aTimeout)
+ public void setInactivityTimeout(long aTimeout)
{
inactivityTimeout = aTimeout;
}
@@ -184,10 +190,6 @@
}
}
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- // System.out.println("Creating New Connection To Endpoint::"+getEndpoint());
- // ConsumerEventSource evSource = new ConsumerEventSource(conn, (ActiveMQDestination)destination);
- // evSource.setConsumerListener(this);
- // evSource.start();
conn.start();
if ( controller != null )
{
@@ -198,7 +200,6 @@
}
if ( controller != null && controller.getInputChannel() != null)
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connection_open_to_endpoint__FINE", new Object[] { controller.getComponentName(), getEndpoint(), serverUri });
-
}
catch ( Exception e)
{
@@ -208,10 +209,14 @@
}
public synchronized void open() throws AsynchAEException, ServiceShutdownException
{
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "open", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open__FINE",
+ new Object[] { endpoint, serverUri });
if ( !connectionAborted )
{
openChannel();
}
+
}
public synchronized void abort()
@@ -357,31 +362,28 @@
{
return connectionCreationTimestamp;
}
- private synchronized void startTimer(long aConnectionCreationTimestamp, String componentName, String endpointName)
- {
+ private synchronized void startTimer(long aConnectionCreationTimestamp)
+ {
final long cachedConnectionCreationTimestamp = aConnectionCreationTimestamp;
- final JmsEndpointConnection_impl __instance = this;
- final String thisEndpoint = endpointName;
Date timeToRun = new Date(System.currentTimeMillis() + inactivityTimeout);
if (timer != null)
{
timer.cancel();
}
- if (componentName != null)
+ if (controller != null)
{
- timer = new Timer("Controller:" + componentName + ":TimerThread-JmsEndpointConnection_impl:" + endpointName + ":" + System.nanoTime());
+ timer = new Timer("Controller:" + controller.getComponentName() + ":TimerThread-JmsEndpointConnection_impl:" + endpoint + ":" + System.nanoTime());
}
else
{
- timer = new Timer("TimerThread-JmsEndpointConnection_impl:" + endpointName + ":" + System.nanoTime());
+ timer = new Timer("TimerThread-JmsEndpointConnection_impl:" + endpoint + ":" + System.nanoTime());
}
timer.schedule(new TimerTask() {
public void run()
{
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "startTimer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_inactivity_timer_expired_INFO", new Object[] { Thread.currentThread().getName(), inactivityTimeout, thisEndpoint });
-// if (connectionCreationTimestamp <= cachedConnectionCreationTimestamp)
- if (getConnectionCreationTimeout() <= cachedConnectionCreationTimestamp)
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "startTimer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_inactivity_timer_expired_INFO", new Object[] { Thread.currentThread().getName(), inactivityTimeout, endpoint });
+ if (connectionCreationTimestamp <= cachedConnectionCreationTimestamp)
{
try
{
@@ -395,8 +397,6 @@
}
}
}
-// timer.cancel();
-// timer.purge();
cancelTimer();
}
}, timeToRun);
@@ -416,11 +416,6 @@
{
timer = null;
}
-// if (timer != null)
-// {
-// timer.cancel();
-// timer = null;
-// }
}
public synchronized boolean send(Message aMessage, boolean startTimer) //throws AsynchAEException
@@ -455,7 +450,7 @@
}
if (startTimer)
{
- startTimer(connectionCreationTimestamp, controller.getComponentName(), endpointName);
+ startTimer(connectionCreationTimestamp);
}
lastException = null;
return true;
@@ -464,6 +459,11 @@
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_ableto_send_msg_INFO", new Object[] { controller.getComponentName(), destinationName, i+1, 10 });
lastException = e;
+ // If the controller has been stopped no need to send messages
+ if ( controller.isStopped())
+ {
+ return true;
+ }
}
try
{
@@ -481,6 +481,13 @@
return false;
}
+ public void onConsumerEvent(ConsumerEvent arg0)
+ {
+ if (controller != null)
+ {
+ controller.handleDelegateLifeCycleEvent(getEndpoint(), arg0.getConsumerCount());
+ }
+ }
protected synchronized void finalize() throws Throwable
{
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Fri Aug 22 11:52:42 2008
@@ -24,10 +24,15 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
+import mx4j.tools.adaptor.http.GetAttributeCommandProcessor;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.InputChannel;
@@ -40,6 +45,7 @@
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.message.JmsMessageContext;
import org.apache.uima.util.Level;
@@ -86,6 +92,8 @@
private List listenerContainerList = new ArrayList();
+ private Object mux = new Object();
+
public AnalysisEngineController getController()
{
return controller;
@@ -174,6 +182,32 @@
}
return false;
}
+ private boolean isRemoteRequest( Message aMessage ) throws Exception
+ {
+
+ // Dont do checkpoints if a message was sent from a Cas Multiplier
+ if ( aMessage.propertyExists(AsynchAEMessage.CasSequence))
+ {
+ return false;
+ }
+
+ Map properties = ((ActiveMQMessage)aMessage).getProperties();
+ if ( properties.containsKey(AsynchAEMessage.MessageType) &&
+ properties.containsKey(AsynchAEMessage.Command) &&
+ properties.containsKey(UIMAMessage.ServerURI))
+ {
+ int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+ int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+ boolean isRemote = aMessage.getStringProperty(UIMAMessage.ServerURI).startsWith("vm") == false;
+ if ( isRemote && msgType == AsynchAEMessage.Request &&
+ (command == AsynchAEMessage.Process ||
+ command == AsynchAEMessage.CollectionProcessComplete) )
+ {
+ return true;
+ }
+ }
+ return false;
+ }
/**
* Validate command contained in the header of the JMS Message
@@ -265,7 +299,11 @@
{
int command = aMessage.getIntProperty(AsynchAEMessage.Command);
int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
-
+ if ( isStopped() || getController() == null || getController().getInProcessCache() == null )
+ {
+ // Shutting down
+ return true;
+ }
if ( command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response )
{
String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
@@ -398,32 +436,33 @@
return false;
}
- private synchronized long computeIdleTime()
+ private boolean isCheckpointWorthy( Message aMessage ) throws Exception
{
- try
+ synchronized( mux )
{
- boolean isAggregate = getController() instanceof AggregateAnalysisEngineController;
- if ( isAggregate || !((PrimitiveAnalysisEngineController)getController()).isMultiplier() )
+ // Dont do checkpoints if a message was sent from a Cas Multiplier
+ if ( aMessage.propertyExists(AsynchAEMessage.CasSequence))
{
- long lastReplyTime = getController().getReplyTime();
- if ( lastReplyTime > 0 )
+ return false;
+ }
+ Map properties = ((ActiveMQMessage)aMessage).getProperties();
+ if ( properties.containsKey(AsynchAEMessage.MessageType) &&
+ properties.containsKey(AsynchAEMessage.Command) &&
+ properties.containsKey(UIMAMessage.ServerURI))
+ {
+ int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+ int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+ if ( msgType == AsynchAEMessage.Request &&
+ (command == AsynchAEMessage.Process ||
+ command == AsynchAEMessage.CollectionProcessComplete) )
{
- long t = System.nanoTime();
- long delta = t-(long)lastReplyTime;
- getController().saveIdleTime(delta, "", true);
- return delta;
+ return true;
}
}
+ return false;
+
}
- catch( Exception e)
- {
- e.printStackTrace();
- }
- return 0;
}
-
-
-
/**
* Receives Messages from the JMS Provider. It checks the message header
* to determine the type of message received. Based on the type,
@@ -436,38 +475,49 @@
*/
public void onMessage(Message aMessage, Session aJmsSession )
{
+ if ( isStopped() )
+ {
+ return;
+ }
+
+
try
{
// wait until message handlers are plugged in
msgHandlerLatch.await();
}
catch( InterruptedException e) {}
-
try
{
// wait until the controller is plugged in
controllerLatch.await();
}
catch( InterruptedException e) {}
+ long idleTime = 0;
+ boolean doCheckpoint = false;
+
+ String eN = endpointName;
+ if ( getController() != null )
+ {
+ eN = getController().getComponentName();
+ if (eN == null )
+ {
+ eN = "";
+ }
+ }
+
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_recvd_msg__FINE",
- new Object[] { endpointName });
+ new Object[] { eN });
JmsMessageContext messageContext = null;
- long idleTime = 0;
+
+ int requestType = 0;
try
{
- if ( isProcessRequest(aMessage) )
- {
- // Compute the time between waiting for this request and update
- // this service performance stats.
- idleTime = computeIdleTime();
- }
-
// Wrap JMS Message in MessageContext
messageContext = new JmsMessageContext( aMessage, endpointName );
- messageContext.getEndpoint().setIdleTime(idleTime);
if ( jmsSession == null )
{
jmsSession = aJmsSession;
@@ -502,9 +552,6 @@
}
if ( validMessage(aMessage) )
{
-
-
-
String command = decodeIntToString(AsynchAEMessage.Command, aMessage.getIntProperty(AsynchAEMessage.Command) );
String messageType = decodeIntToString(AsynchAEMessage.MessageType, aMessage.getIntProperty(AsynchAEMessage.MessageType) );
if ( ackMessageNow(aMessage))
@@ -541,8 +588,34 @@
new Object[] { controller.getComponentName(), msgFrom, messageType, command, casRefId });
}
}
+ else
+ {
+ }
// Delegate processing of the message contained in the MessageContext to the
// chain of handlers
+ try
+ {
+ if ( isRemoteRequest( aMessage ))
+ {
+ // Compute the idle time waiting for this request
+ idleTime = getController().getIdleTime();
+
+ // This idle time is reported to the client thus save it in the endpoint
+ // object. This value will be fetched and added to the outgoing reply.
+ messageContext.getEndpoint().setIdleTime(idleTime);
+ }
+ }
+ catch( Exception e ) {}
+
+ // Determine if this message is a request and either GetMeta, CPC, or Process
+ doCheckpoint = isCheckpointWorthy( aMessage );
+ requestType = aMessage.getIntProperty(AsynchAEMessage.Command);
+ // Checkpoint
+ if ( doCheckpoint )
+ {
+ getController().beginProcess(requestType);
+ }
+
if ( handler != null )
{
@@ -563,6 +636,14 @@
controller.getErrorHandlerChain().handle(t, HandlerBase.populateErrorContext( messageContext ), controller);
}
+ finally
+ {
+ // Call the end checkpoint for non-aggregates. For primitives the CAS has been fully processed if we are here
+ if ( doCheckpoint && getController() instanceof PrimitiveAnalysisEngineController )
+ {
+ getController().endProcess(requestType);
+ }
+ }
}
public int getSessionAckMode()
@@ -575,10 +656,11 @@
}
public synchronized void setListenerContainer(UimaDefaultMessageListenerContainer messageListener)
{
- this.messageListener = messageListener;
- System.setProperty("BrokerURI", messageListener.getBrokerUrl());
- brokerURL = messageListener.getBrokerUrl();
- listenerContainerList.add(messageListener);
+ this.messageListener = messageListener;
+ System.setProperty("BrokerURI", messageListener.getBrokerUrl());
+ brokerURL = messageListener.getBrokerUrl();
+ listenerContainerList.add(messageListener);
+ this.messageListener = messageListener;
if ( getController() != null )
{
try
@@ -588,6 +670,17 @@
} catch( Exception e) {}
}
}
+ public ActiveMQConnectionFactory getConnectionFactory()
+ {
+ if (messageListener == null )
+ {
+ return null;
+ }
+ else
+ {
+ return (ActiveMQConnectionFactory)messageListener.getConnectionFactory();
+ }
+ }
public void ackMessage( MessageContext aMessageContext )
{
if ( aMessageContext != null && sessionAckMode == Session.CLIENT_ACKNOWLEDGE )
@@ -680,22 +773,5 @@
{
return stopped;
}
-/*
- private void spinThreadForListenerShutdown(final UimaDefaultMessageListenerContainer listenerContainer)
- {
- new Thread("Shutdown Thread For Listener:"+listenerContainer.getEndpointName()) {
- public void run()
- {
- try
- {
- listenerContainer.stop();
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
- "spinThreadForListenerShutdown.run()", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stop_listener__INFO",
- new Object[] { listenerContainer.getEndpointName() });
- }
- catch( Exception e) { e.printStackTrace();}
- }
- }.start();
- }
-*/
+
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Fri Aug 22 11:52:42 2008
@@ -22,7 +22,9 @@
import java.io.ByteArrayOutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -50,6 +52,7 @@
import org.apache.uima.aae.error.ErrorContext;
import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.error.UimaEEServiceException;
+import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.UIMAMessage;
@@ -88,6 +91,7 @@
private boolean aborting = false;
+ private Destination freeCASTempQueue;
/**
* Sets the ActiveMQ Broker URI
*/
@@ -95,6 +99,10 @@
{
serverURI = aServerURI;
}
+ protected void setFreeCasQueue( Destination destination)
+ {
+ freeCASTempQueue = destination;
+ }
public String getServerURI()
{
return System.getProperty("BrokerURI");
@@ -165,7 +173,7 @@
public String serializeCAS(boolean isReply, CAS aCAS, String aCasReferenceId, String aSerializerKey) throws Exception
{
- long start = System.nanoTime();
+ long start = getAnalysisEngineController().getCpuTime();
String serializedCas = null;
@@ -187,10 +195,10 @@
else
{
serSharedData = cacheEntry.getDeserSharedData();//new XmiSerializationSharedData();
- if (serSharedData == null) {
- serSharedData = new XmiSerializationSharedData();
- cacheEntry.setXmiSerializationData(serSharedData);
- }
+ if (serSharedData == null) {
+ serSharedData = new XmiSerializationSharedData();
+ cacheEntry.setXmiSerializationData(serSharedData);
+ }
serializedCas = UimaSerializer.serializeCasToXmi(aCAS, serSharedData);
int maxOutgoingXmiId = serSharedData.getMaxXmiId();
// Save High Water Mark in case a merge is needed
@@ -229,9 +237,8 @@
LongNumericStatistic statistic;
if ( (statistic = getAnalysisEngineController().getMonitor().getLongNumericStatistic("",Monitor.TotalSerializeTime)) != null )
{
- statistic.increment(System.nanoTime() - start);
+ statistic.increment(getAnalysisEngineController().getCpuTime() - start);
}
-
return serializedCas;
}
@@ -386,9 +393,10 @@
endpointConnection.send(tm, true);
if ( aCommand == AsynchAEMessage.ReleaseCAS )
{
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "collectionProcessComplete",
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendRequest",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_release_cas_req__FINE", new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint(),aCasReferenceId });
}
+
}
catch( JMSException e)
{
@@ -423,18 +431,20 @@
{
JmsEndpointConnection_impl endpointConnection =
getEndpointConnection(anEndpoint);
-
+
TextMessage tm = endpointConnection.produceTextMessage(null);
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
tm.setText(""); // Need this to prevent the Broker from throwing an exception when sending a message to C++ service
populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
-/*
- if ( anEndpoint.getServerURI().toLowerCase().startsWith("http") )
+
+ // For remotes add a special property to the message. This property
+ // will be echoed back by the service. This property enables matching
+ // the reply with the right endpoint object managed by the aggregate.
+ if ( anEndpoint.isRemote() )
{
- tm.setBooleanProperty(AsynchAEMessage.RemoveEndpoint, true);
+ tm.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getServerURI());
}
-*/
boolean startTimer = false;
// Start timer for endpoints that are remote and are managed by a different broker
// than this service. If an endpoint contains a destination object, the outgoing
@@ -462,6 +472,34 @@
"sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_releasecas_request__endpoint__FINEST",
new Object[] {getAnalysisEngineController().getName(), endpointConnection.getEndpoint()});
}
+ else if ( aCommand == AsynchAEMessage.GetMeta )
+ {
+ if ( anEndpoint.getDestination() != null )
+ {
+ String replyQueueName = ((ActiveMQDestination)anEndpoint.getDestination()).getPhysicalName().replaceAll(":","_");
+ if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController )
+ {
+ String delegateKey =
+ ((AggregateAnalysisEngineController)getAnalysisEngineController()).lookUpDelegateKey(anEndpoint.getEndpoint());
+ ServiceInfo serviceInfo =((AggregateAnalysisEngineController)getAnalysisEngineController()).getDelegateServiceInfo(delegateKey);
+ if (serviceInfo != null )
+ {
+ serviceInfo.setReplyQueueName(replyQueueName);
+ serviceInfo.setServiceKey(delegateKey);
+ System.out.println("Service:"+delegateKey+" Directed to Reply To:"+replyQueueName);
+ }
+ }
+ }
+ else if ( !anEndpoint.isRemote())
+ {
+ ServiceInfo serviceInfo =((AggregateAnalysisEngineController)getAnalysisEngineController()).getServiceInfo();
+ if (serviceInfo != null )
+ {
+ serviceInfo.setReplyQueueName(controllerInputEndpoint);
+ System.out.println("Aggregate Service Reply Queue:"+getAnalysisEngineController().getComponentName()+" Reply To:"+controllerInputEndpoint);
+ }
+ }
+ }
else
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
@@ -494,26 +532,7 @@
{
if (anEndpoint.isRemote())
{
- long t1 = System.nanoTime();
String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled());
-/*
- if ( analysisEngineController instanceof AggregateAnalysisEngineController )
- {
- String delegateKey
- = ((AggregateAnalysisEngineController)analysisEngineController).
- lookUpDelegateKey(anEndpoint.getEndpoint());
- if ( delegateKey != null)
- {
- long timeToSerialize = System.nanoTime() - t1;
-// ((AggregateAnalysisEngineController)analysisEngineController).
-// incrementCasSerializationTime(delegateKey, timeToSerialize);
-
- analysisEngineController.
- getServicePerformance().
- incrementCasSerializationTime(timeToSerialize);
- }
- }
-*/
if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
@@ -589,13 +608,8 @@
}
}
}
-// catch ( AsynchAEException e)
-// {
-// throw e;
-// }
catch ( Exception e)
{
-// throw new AsynchAEException(e);
// Handle the error
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
@@ -614,9 +628,6 @@
{
// Serializes CAS and releases it back to CAS Pool
String serializedCAS = getSerializedCas(true, aNewCasReferenceId, anEndpoint, anEndpoint.isRetryEnabled());
-
-
-
sendCasToRemoteEndpoint(false, serializedCAS, anInputCasReferenceId, aNewCasReferenceId, anEndpoint, false, sequence);
}
else
@@ -643,6 +654,41 @@
}
+ public void sendReply( CacheEntry entry, Endpoint anEndpoint ) throws AsynchAEException
+ {
+ try
+ {
+ anEndpoint.setReplyEndpoint(true);
+ if ( anEndpoint.isRemote() )
+ {
+ // Serializes CAS and releases it back to CAS Pool
+ String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
+ sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false);
+ }
+ else
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_reply_with_sequence__FINE",
+ new Object[] { anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getCasSequence() });
+ sendCasToCollocatedDelegate(false, entry.getInputCasReferenceId(), entry.getCasReferenceId(), anEndpoint, false, entry.getCasSequence());
+ }
+ }
+ catch( ServiceShutdownException e)
+ {
+ e.printStackTrace();
+ }
+ catch (AsynchAEException e)
+ {
+ throw e;
+ }
+
+ catch (Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
public void sendReply( int aCommand, Endpoint anEndpoint ) throws AsynchAEException
{
anEndpoint.setReplyEndpoint(true);
@@ -670,7 +716,7 @@
{
// Unable to establish connection to the endpoint. Logit and continue
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
+ "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
new Object[] { e});
}
@@ -798,7 +844,7 @@
{
// Unable to establish connection to the endpoint. Logit and continue
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
}
catch( ServiceShutdownException e)
@@ -808,7 +854,7 @@
catch (AsynchAEException e)
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
}
catch (Exception e)
@@ -916,7 +962,7 @@
else
{
CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
- long t1 = System.nanoTime();
+ long t1 = getAnalysisEngineController().getCpuTime();
// Serialize CAS for remote Delegates
String serializer = anEndpoint.getSerializer();
if ( serializer == null || serializer.trim().length() == 0)
@@ -924,7 +970,10 @@
serializer = "xmi";
}
serializedCAS = serializeCAS(isReply, cas, aCasReferenceId, serializer);
- long timeToSerializeCas = System.nanoTime()-t1;
+ long timeToSerializeCas = getAnalysisEngineController().getCpuTime()-t1;
+
+ getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
+
entry.incrementTimeToSerializeCAS(timeToSerializeCas);
casStats.incrementCasSerializationTime(timeToSerializeCas);
getAnalysisEngineController().getServicePerformance().
@@ -949,37 +998,6 @@
CAS cas = null;
try
{
-
-
-/*
- String serializedCAS = null;
- // Using Cas reference Id retrieve CAS from the shared Cash
- cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
-
- if ( cas == null )
- {
- serializedCAS = getAnalysisEngineController().getInProcessCache().getSerializedCAS( aCasReferenceId );
- }
- else
- {
- CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
- long t1 = System.nanoTime();
- // Serialize CAS for remote Delegates
- String serializer = anEndpoint.getSerializer();
- if ( serializer == null || serializer.trim().length() == 0)
- {
- serializer = "xmi";
- }
- serializedCAS = serializeCAS(isReply, cas, aCasReferenceId, serializer);
- entry.incrementTimeToSerializeCAS(System.nanoTime()-t1);
- if ( cacheSerializedCas )
- {
- getAnalysisEngineController().getInProcessCache().saveSerializedCAS(aCasReferenceId, serializedCAS);
- }
- }
- return serializedCAS;
-*/
-
return getSerializedCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas);
}
catch( Exception e)
@@ -1010,7 +1028,6 @@
{
if ( anEndpoint.isFinal() )
{
- //aTextMessage.setLongProperty(AsynchAEMessage.TotalTimeSpentInAnalytic, (System.nanoTime()-anEndpoint.getEntryTime()));
aTextMessage.setLongProperty("SENT-TIME", System.nanoTime());
}
@@ -1028,14 +1045,13 @@
getAnalysisEngineController().getCasStatistics(aCasReferenceId);
aTextMessage.setLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats.getRawCasSerializationTime());
-// aTextMessage.setLongProperty(AsynchAEMessage.TimeWaitingForCAS, entry.getTimeWaitingForCAS());
aTextMessage.setLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats.getRawCasDeserializationTime());
aTextMessage.setLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
- aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, anEndpoint.getIdleTime() );
-
- String lookupKey = getAnalysisEngineController().getName();//getInProcessCache().getMessageAccessorByReference(aCasReferenceId).getEndpointName();
+ long iT =getAnalysisEngineController().getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
+ aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, iT );
+ String lookupKey = getAnalysisEngineController().getName();
long arrivalTime = getAnalysisEngineController().getTime( aCasReferenceId, lookupKey); //serviceInputEndpoint);
- long timeInService = System.nanoTime()-arrivalTime;
+ long timeInService = getAnalysisEngineController().getCpuTime()-arrivalTime;
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"populateStats", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timein_service__FINEST",
new Object[] { serviceInputEndpoint, (double)timeInService/(double)1000000 });
@@ -1109,7 +1125,7 @@
"populateHeaderWithRequestContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
new Object[] {getAnalysisEngineController().getComponentName(), anEndpoint.getServerURI(), anEndpoint.getEndpoint()});
}
- else
+ else // collocated
{
aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
}
@@ -1142,6 +1158,10 @@
{
aMessage.setStringProperty(AsynchAEMessage.ServerIP,hostIP);
}
+ if ( anEndpoint.getEndpointServer() != null )
+ {
+ aMessage.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getEndpointServer());
+ }
}
else
{
@@ -1291,17 +1311,21 @@
// produced by the CAS Multiplier. The client will treat this CAS
// differently from the input CAS.
tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-/*
- // Add the top ancestor of this CAS.
- addTopCASParentReferenceId(tm, anInputCasReferenceId);
-*/
tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
// Add a sequence number assigned to this CAS by the controller
tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
+ isRequest = true;
+ // Add the name of the FreeCas Queue
+// if ( secondaryInputEndpoint != null )
+// {
+// tm.setStringProperty(AsynchAEMessage.MessageFrom, secondaryInputEndpoint);
+// }
- if ( secondaryInputEndpoint != null )
+ if ( freeCASTempQueue != null )
{
- tm.setStringProperty(AsynchAEMessage.MessageFrom, secondaryInputEndpoint);
+ // Attach a temp queue to the outgoing message. This a queue where
+ // Free CAS notifications need to be sent from the client
+ tm.setJMSReplyTo(freeCASTempQueue);
}
if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
{
@@ -1309,7 +1333,7 @@
if ( cacheEntry != null )
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
+ "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId, cacheEntry.getInputCasReferenceId() });
}
}
@@ -1344,11 +1368,140 @@
// ----------------------------------------------------
endpointConnection.send(tm, startConnectionTimer);
- if ( getAnalysisEngineController().isTopLevelComponent() )
+// if ( getAnalysisEngineController().isTopLevelComponent() )
+// {
+// getAnalysisEngineController().getInProcessCache().dumpContents(getAnalysisEngineController().getComponentName());
+// }
+ if ( !isRequest )
{
- getAnalysisEngineController().getInProcessCache().dumpContents();
+ addIdleTime(tm);
+ }
+ }
+ catch( JMSException e)
+ {
+ // Unable to establish connection to the endpoint. Logit and continue
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
+
+ }
+
+ catch( ServiceShutdownException e)
+ {
+ throw e;
+ }
+ catch( AsynchAEException e)
+ {
+ throw e;
+ }
+ catch( Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
+ private void sendCasToRemoteEndpoint( boolean isRequest, String aSerializedCAS, CacheEntry entry, Endpoint anEndpoint, boolean startTimer )
+ throws AsynchAEException, ServiceShutdownException
+ {
+
+ try
+ {
+ if ( aborting )
+ {
+ return;
+ }
+ // Get the connection object for a given endpoint
+ JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+ // Create empty JMS Text Message
+ TextMessage tm = endpointConnection.produceTextMessage("");
+
+ // Save Serialized CAS in case we need to re-send it for analysis
+ if ( anEndpoint.isRetryEnabled() && getAnalysisEngineController().getInProcessCache().getSerializedCAS(entry.getCasReferenceId()) == null)
+ {
+ getAnalysisEngineController().getInProcessCache().saveSerializedCAS(entry.getCasReferenceId(), aSerializedCAS);
+ }
+
+ tm.setText(aSerializedCAS);
+ tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
+ // Add Cas Reference Id to the outgoing JMS Header
+ tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
+ // Add common properties to the JMS Header
+ if ( isRequest == true )
+ {
+ populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
+ }
+ else
+ {
+ populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
+ }
+ // The following is true when the analytic is a CAS Multiplier
+ if ( entry.isSubordinate() && !isRequest )
+ {
+ // Override MessageType set in the populateHeaderWithContext above.
+ // Make the reply message look like a request. This message will contain a new CAS
+ // produced by the CAS Multiplier. The client will treat this CAS
+ // differently from the input CAS.
+ tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+
+ isRequest = true;
+ // Save the id of the parent CAS
+ tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry.getCasReferenceId()));
+ // Add a sequence number assigned to this CAS by the controller
+ tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
+ // If this is a Cas Multiplier, add a reference to a special queue where
+ // the client sends Free Cas Notifications
+ if ( freeCASTempQueue != null )
+ {
+ // Attach a temp queue to the outgoing message. This is a queue where
+ // Free CAS notifications need to be sent from the client
+ tm.setJMSReplyTo(freeCASTempQueue);
+ }
+ if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
+ new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
+ }
+ }
+
+ // Add stats
+ populateStats(tm, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
+ if ( startTimer)
+ {
+ // Start a timer for this request. The amount of time to wait
+ // for response is provided in configuration for the endpoint
+ anEndpoint.startProcessRequestTimer(entry.getCasReferenceId());
+ }
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
+ new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
+
+ // By default start a timer associated with a connection to the endpoint. Once a connection is established with an
+ // endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
+ // the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
+ // managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
+ // by anEndpoint.getDestination != null, we dont start the timer.
+ boolean startConnectionTimer = true;
+
+ if ( anEndpoint.getDestination() != null || !isRequest )
+ {
+ startConnectionTimer = false;
+ }
+
+ // ----------------------------------------------------
+ // Send Request Messsage to the Endpoint
+ // ----------------------------------------------------
+ endpointConnection.send(tm, startConnectionTimer);
+
+// if ( getAnalysisEngineController().isTopLevelComponent() )
+// {
+// getAnalysisEngineController().getInProcessCache().dumpContents(getAnalysisEngineController().getComponentName());
+// }
+ if ( !isRequest )
+ {
+ addIdleTime(tm);
}
- addIdleTime(tm);
}
catch( JMSException e)
{
@@ -1373,6 +1526,24 @@
}
}
+
+ private String getTopParentCasReferenceId( String casReferenceId ) throws Exception
+ {
+ if ( !getAnalysisEngineController().getInProcessCache().entryExists(casReferenceId) )
+ {
+ return null;
+ }
+
+ CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
+ if ( entry.isSubordinate() )
+ {
+ // Recurse until the top CAS reference Id is found
+ return getTopParentCasReferenceId(entry.getInputCasReferenceId());
+ }
+ // Return the top ancestor CAS id
+ return entry.getCasReferenceId();
+ }
+
private boolean isProcessReply( Message aMessage )
{
try
@@ -1389,14 +1560,14 @@
private void addIdleTime( Message aMessage )
{
-
+/*
if ( isProcessReply(aMessage ) &&
( getAnalysisEngineController() instanceof AggregateAnalysisEngineController ||
- !((PrimitiveAnalysisEngineController)getAnalysisEngineController()).isMultiplier() ) )
- {
- long t = System.nanoTime();
- getAnalysisEngineController().saveReplyTime(t, "");
- }
+ !getAnalysisEngineController().isCasMultiplier() ) )
+
+*/
+ long t = System.nanoTime();
+ getAnalysisEngineController().saveReplyTime(t, "");
}
private void sendCasToCollocatedDelegate(boolean isRequest, String anInputCasReferenceId, String aNewCasReferenceId, Endpoint anEndpoint, boolean startTimer, long sequence)
throws AsynchAEException, ServiceShutdownException
@@ -1449,6 +1620,8 @@
// produced by the CAS Multiplier. The client will treat this CAS
// differently from the input CAS.
tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+ isRequest = true;
+
if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
{
CacheEntry cacheEntry = getCacheEntry(aNewCasReferenceId);
@@ -1485,7 +1658,10 @@
// Send Request Messsage to Delegate
// ----------------------------------------------------
endpointConnection.send(tm, startTimer);
- addIdleTime(tm);
+ if ( !isRequest )
+ {
+ addIdleTime(tm);
+ }
}
catch( JMSException e)
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java Fri Aug 22 11:52:42 2008
@@ -24,6 +24,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIDGenerator;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
@@ -37,6 +39,8 @@
import org.apache.uima.util.Level;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
+import org.springframework.jms.support.destination.DestinationResolver;
+
public class SpringContainerDeployer implements ControllerCallbackListener {
private static final Class CLASS_NAME = SpringContainerDeployer.class;
@@ -45,19 +49,101 @@
private boolean serviceInitializationException;
private Object serviceMonitor = new Object();
private ConcurrentHashMap springContainerRegistry=null;
-
+ private FileSystemXmlApplicationContext context = null;
public SpringContainerDeployer(){
}
public SpringContainerDeployer( ConcurrentHashMap aSpringContainerRegistry ) {
springContainerRegistry = aSpringContainerRegistry;
}
+ private UimaDefaultMessageListenerContainer produceListenerConnector(ActiveMQConnectionFactory cf)
+ {
+ DestinationResolver resolver = new TempDestinationResolver();
+ UimaDefaultMessageListenerContainer connector =
+ new UimaDefaultMessageListenerContainer(true);
+
+ connector.setConnectionFactory(cf);
+ connector.setConcurrentConsumers(1);
+ connector.setDestinationResolver(resolver);
+ connector.initialize();
+ connector.start();
+ while( connector.getListenerEndpoint() == null )
+ {
+ synchronized(connector)
+ {
+ try
+ {
+ connector.wait(50);
+ }
+ catch( InterruptedException e) {}
+ }
+ }
+ return connector;
+ }
+ private ActiveMQConnectionFactory getTopLevelQueueConnectionFactory( ApplicationContext ctx )
+ {
+ ActiveMQConnectionFactory factory = null;
+ String[] inputChannelBeanIds = ctx.getBeanNamesForType(org.apache.uima.adapter.jms.activemq.JmsInputChannel.class);
+ String beanId = null;
+ for( int i=0; i < inputChannelBeanIds.length; i++)
+ {
+ JmsInputChannel inputChannel = (JmsInputChannel)ctx.getBean(inputChannelBeanIds[i]);
+ if ( inputChannel.getName().startsWith("top_level_input_queue_service") &&
+ inputChannel instanceof JmsInputChannel )
+ {
+ factory = ((JmsInputChannel)inputChannel).getConnectionFactory();
+ break;
+ }
+ }
+ return factory;
+ }
private void initializeTopLevelController( AnalysisEngineController cntlr, ApplicationContext ctx)
throws Exception
{
((FileSystemXmlApplicationContext) ctx).setDisplayName(cntlr.getComponentName());
cntlr.addControllerCallbackListener(this);
+
+ String inputQueueName = cntlr.getServiceEndpointName();
+ if ( inputQueueName != null )
+ {
+ if ( ctx.containsBean(inputQueueName) )
+ {
+ ActiveMQQueue queue = (ActiveMQQueue)ctx.getBean(inputQueueName);
+ if ( cntlr.getServiceInfo() != null )
+ {
+ cntlr.getServiceInfo().setInputQueueName(queue.getQueueName());
+ }
+ }
+ else
+ {
+ if ( cntlr.getServiceInfo() != null )
+ {
+ cntlr.getServiceInfo().setInputQueueName(inputQueueName);
+ }
+ }
+ }
+
+
+ // If this is a Cas Multiplier add a special temp queue for receiving Free CAS
+ // notifications.
+ if ( cntlr.isCasMultiplier() )
+ {
+
+ ActiveMQConnectionFactory cf = getTopLevelQueueConnectionFactory( ctx );
+ // Create a listener and a temp queue for Free CAS notifications.
+ UimaDefaultMessageListenerContainer connector = produceListenerConnector(cf);
+ System.out.println(">>>> Cas Multiplier Controller:"+cntlr.getComponentName()+" Activated Listener to Receive Free CAS Notifications - Temp Queue Name:"+connector.getEndpointName());
+ // Direct all messages to the InputChannel
+ connector.setMessageListener(((JmsInputChannel)cntlr.getInputChannel()));
+ ((JmsInputChannel)cntlr.getInputChannel()).setListenerContainer(connector);
+ // Save the temp queue reference in the Output Channel. The output channel will
+ // add this queue to every outgoing message containing a CAS generated by the
+ // Cas Multiplier.
+ ((JmsOutputChannel)cntlr.getOutputChannel()).setFreeCasQueue(connector.getListenerEndpoint());
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "initializeTopLevelController", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_activated_fcq__CONFIG", new Object[] { cntlr.getComponentName(), connector.getEndpointName() });
+ }
+
if (cntlr instanceof AggregateAnalysisEngineController) {
// Get a map of delegates for the top level aggregate
Map destinationMap = ((AggregateAnalysisEngineController) cntlr).getDestinations();
@@ -197,8 +283,8 @@
if (!springContextFile.startsWith("file:")) {
springContextFile = "file:" + springContextFile;
}
- ApplicationContext ctx = new FileSystemXmlApplicationContext(springContextFile);
- return initializeContainer(ctx);
+ context = new FileSystemXmlApplicationContext(springContextFile);
+ return initializeContainer(context);
} catch (ResourceInitializationException e) {
//e.printStackTrace();
throw e;
@@ -226,8 +312,8 @@
// synchronous ( one bean at a time), some beans run in a separate
// threads. The completion of container deployment doesnt
// necessarily mean that all beans have initialized completely.
- ApplicationContext ctx = new FileSystemXmlApplicationContext(springContextFiles);
- return initializeContainer(ctx);
+ context = new FileSystemXmlApplicationContext(springContextFiles);
+ return initializeContainer(context);
} catch (ResourceInitializationException e) {
e.printStackTrace();
throw e;
@@ -253,7 +339,7 @@
}
}
}
- public void notifyOnInitializationFailure(Exception e) {
+ public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e) {
// Initialization exception. Notify blocking thread and indicate a
// problem
@@ -265,14 +351,37 @@
}
- public void notifyOnInitializationSuccess() {
+ public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
serviceInitializationCompleted = true;
+
synchronized (serviceMonitor) {
serviceMonitor.notifyAll();
}
}
+ public void notifyOnInitializationFailure( Exception e)
+ {
+ notifyOnInitializationFailure(null, e);
+ }
+ public void notifyOnInitializationSuccess()
+ {
+ notifyOnInitializationSuccess(null);
+ }
public void notifyOnTermination(String message) {
}
+ public FileSystemXmlApplicationContext getSpringContext()
+ {
+ return context;
+ }
+
+ public boolean isInitialized()
+ {
+ return serviceInitializationCompleted;
+ }
+
+ public boolean initializationFailed()
+ {
+ return serviceInitializationException;
+ }
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Aug 22 11:52:42 2008
@@ -35,6 +35,7 @@
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
import org.apache.uima.aae.error.ErrorHandler;
import org.apache.uima.aae.error.Threshold;
import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
@@ -50,15 +51,21 @@
private static final Class CLASS_NAME = UimaDefaultMessageListenerContainer.class;
private String destinationName="";
private Endpoint endpoint;
+ private boolean freeCasQueueListener;
private AnalysisEngineController controller;
private int retryCount = 2;
+
public UimaDefaultMessageListenerContainer()
{
super();
- setRecoveryInterval(20000);
- //setAcceptMessagesWhileStopping(false);
+ setRecoveryInterval(60000);
+ setAcceptMessagesWhileStopping(false);
setExceptionListener(this);
-
+ }
+ public UimaDefaultMessageListenerContainer(boolean freeCasQueueListener)
+ {
+ this();
+ this.freeCasQueueListener = freeCasQueueListener;
}
public void setController( AnalysisEngineController aController)
{
@@ -84,25 +91,6 @@
"handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
new Object[] { endpointName, getBrokerUrl(), t });
-/*
- while ( retryCount > 0 )
- {
- retryCount--;
- String brokerURL = ((ActiveMQConnectionFactory)getConnectionFactory()).getBrokerURL();
- System.out.println(">>>>> Retrying Connection To Broker:"+brokerURL+" Endpoint:"+endpoint.getEndpoint()+" Sleeping For 1 Minute Before Retry. Retries Remaining:"+retryCount);
- // Retry the connection
- try
- {
- establishSharedConnection();
- synchronized(this)
- {
- wait(60000);
- }
- break;
- }
- catch( Exception e) {}
- }
-*/
boolean terminate = true;
if ( disableListener(t) )
{
@@ -311,5 +299,8 @@
{
endpoint = anEndpoint;
}
-
+ public boolean isFreeCasQueueListener()
+ {
+ return freeCasQueueListener;
+ }
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Fri Aug 22 11:52:42 2008
@@ -22,6 +22,7 @@
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -30,6 +31,8 @@
import org.apache.activemq.command.ActiveMQDestination;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+
/**
* Initializes JMS session and creates JMS MessageProducer to be used for
* sending messages to a given destination. It extends BaseMessageSender which
@@ -45,7 +48,8 @@
private Session session = null;
private MessageProducer producer = null;
private String destinationName = null;
-
+ private ConcurrentHashMap producerMap = new ConcurrentHashMap();
+
public ActiveMQMessageSender(Connection aConnection,
List pendingMessageList, String aDestinationName,
BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
@@ -53,15 +57,28 @@
connection = aConnection;
destinationName = aDestinationName;
}
-
+ public MessageProducer getMessageProducer(Destination destination) throws Exception {
+ if ( producerMap.containsKey(destination))
+ {
+ return (MessageProducer) producerMap.get(destination);
+ }
+ createSession();
+ MessageProducer mProducer = session.createProducer(destination);
+ mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ producerMap.put(destination, mProducer);
+ return mProducer;
+ }
+ private void createSession() throws Exception {
+ if ( session == null ) {
+ session = connection.createSession(false, 0);
+ }
+ }
/**
* Creates a jms session object used to instantiate message producer
*/
protected void initializeProducer() throws Exception {
- session = connection.createSession(false, 0);
- Queue producerDestination = session.createQueue(destinationName);
- producer = session.createProducer(producerDestination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ createSession();
+ producer = getMessageProducer(session.createQueue(destinationName));
}
/**
@@ -96,5 +113,6 @@
if (producer != null) {
producer.close();
}
+ producerMap.clear();
}
}
\ No newline at end of file
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Fri Aug 22 11:52:42 2008
@@ -54,6 +54,8 @@
import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext;
import org.apache.uima.adapter.jms.service.Dd2spring;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
import org.apache.uima.resource.Resource;
import org.apache.uima.resource.ResourceConfigurationException;
@@ -163,6 +165,7 @@
msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
msg.setJMSReplyTo(consumerDestination);
+
}
catch (Exception e)
{
@@ -308,7 +311,6 @@
// in pendingMessageList. Upon arrival, each message is removed from
// pendingMessageList and it is sent to a destination.
-//// Thread t = new Thread((BaseMessageSender)messageDispatcher); //.doStart();
t.start();
// Wait until the worker thread is fully initialized
synchronized( sender )
@@ -342,6 +344,7 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "initializeConsumer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_jms_consumer_INFO", new Object[] { aBrokerURI, consumerDestination.getQueueName() });
consumer = consumerSession.createConsumer(consumerDestination);
consumer.setMessageListener(this);
+ System.out.println(">>>> Client Activated Temp Reply Queue:"+consumerDestination.getQueueName());
}
/**
* Initialize the uima ee client. Takes initialization parameters from the
@@ -446,6 +449,16 @@
}
asynchManager.initialize(casPoolSize, "ApplicationCasPoolContext", performanceTuningSettings);
+
+ // Create a special CasPool of size 1 to be used for deserializing CASes from a Cas Multiplier
+ if ( super.resourceMetadata != null && super.resourceMetadata instanceof AnalysisEngineMetaData )
+ {
+ if ( ((AnalysisEngineMetaData) super.resourceMetadata).getOperationalProperties().getOutputsNewCASes() )
+ {
+ // Create a Shadow CAS Pool used to de-serialize CASes produced by a CAS Multiplier
+ asynchManager.initialize(1, SHADOW_CAS_POOL, performanceTuningSettings);
+ }
+ }
initialized = true;
remoteService = true;
// running = true;
@@ -715,33 +728,47 @@
msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
- msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.ReleaseCAS);
+ msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.ReleaseCAS);
+ msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+ msg.setJMSReplyTo(consumerDestination);
}
public void notifyOnInitializationFailure(Exception e) {
-
- // Initialization exception. Notify blocking thread and indicate a problem
- serviceInitializationException = true;
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_container_init_exception__WARNING", new Object[] {e});
- synchronized(serviceMonitor)
- {
- serviceMonitor.notifyAll();
- }
-
+ notifyOnInitializationFailure(null, e);
}
public void notifyOnInitializationSuccess() {
- serviceInitializationCompleted = true;
- synchronized(serviceMonitor)
- {
- serviceMonitor.notifyAll();
- }
+ notifyOnInitializationSuccess(null);
}
- public void notifyOnTermination(String message) {
+ public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e) {
+
+ // Initialization exception. Notify blocking thread and indicate a problem
+ serviceInitializationException = true;
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_container_init_exception__WARNING", new Object[] {e});
+ synchronized(serviceMonitor)
+ {
+ serviceMonitor.notifyAll();
+ }
+
+ }
+
+ public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
+ serviceInitializationCompleted = true;
+ synchronized(serviceMonitor)
+ {
+ serviceMonitor.notifyAll();
+ }
+ }
+
+ public void notifyOnTermination(String message) {
}
+ protected MessageProducer getMessageProducer( Destination destination ) throws Exception
+ {
+ return sender.getMessageProducer(destination);
+ }
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java Fri Aug 22 11:52:42 2008
@@ -1,3 +1,4 @@
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -22,179 +23,244 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.net.BindException;
-import java.net.ServerSocket;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import java.io.InvalidClassException;
import org.apache.uima.UIMAFramework;
-import org.apache.uima.aae.controller.ControllerCallbackListener;
+import org.apache.uima.aae.jmx.monitor.BasicUimaJmxMonitorListener;
+import org.apache.uima.aae.jmx.monitor.JmxMonitor;
+import org.apache.uima.aae.jmx.monitor.JmxMonitorListener;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Level;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.support.FileSystemXmlApplicationContext;
-public class UIMA_Service implements ControllerCallbackListener
+public class UIMA_Service implements ApplicationListener
{
private static final Class CLASS_NAME = UIMA_Service.class;
- private BrokerService service;
- private TransportConnector connector;
- private TransportConnector stompConnector;
- private TransportConnector httpConnector;
- private Object semaphore = new Object();
protected boolean serviceInitializationCompleted;
protected boolean serviceInitializationException;
protected Object serviceMonitor = new Object();
-
- public void startInternalBroker() throws Exception
+ private JmxMonitor monitor = null;
+ private Thread monitorThread = null;
+
+ /**
+ * Parse command args, run dd2spring on the deployment descriptors to generate Spring context files.
+ *
+ * @param args - command line arguments
+ * @return - an array of Spring context files generated from provided deployment descriptors
+ * @throws Exception
+ */
+ public String[] initialize(String[] args) throws Exception
{
+ UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO, "UIMA-AS version " + UIMAFramework.getVersionString());
+
+ String[] springConfigFileArray =
+ {};
+ String[] deployedDescriptors =
+ {};
+ String[] deploymentDescriptors =
+ {};
+ int nbrOfArgs = args.length;
- service = new BrokerService();
- String[] connectors = service.getNetworkConnectorURIs();
- String brokerURI;
- if (connectors != null)
+ deploymentDescriptors = getMultipleArg("-d", args);
+ if (deploymentDescriptors.length == 0)
{
- for (int i = 0; i < connectors.length; i++)
- {
- System.out.println("ActiveMQ Broker Started With Connector:" + connectors[i]);
- }
- brokerURI = service.getMasterConnectorURI();
+ // allow multiple args for one key
+ deploymentDescriptors = getMultipleArg2("-dd", args);
+ }
+ String saxonURL = getArg("-saxonURL", args);
+ String xslTransform = getArg("-xslt", args);
+ String uimaAsDebug = getArg("-uimaEeDebug", args);
+
+ if (nbrOfArgs < 1 || (args[0].startsWith("-") && (deploymentDescriptors.length == 0 || saxonURL.equals("") || xslTransform.equals(""))))
+ {
+ printUsageMessage();
+ return null;
+ }
+
+ if (deploymentDescriptors.length == 0)
+ {
+ // array of context files passed in
+ springConfigFileArray = args;
+ deployedDescriptors = args;
}
else
{
- String connectorList = "";
- service.setPersistent(false);
- service.setUseJmx(true);
- brokerURI = generateInternalURI("tcp", 18810, true, false);
- connector = service.addConnector(brokerURI);
- System.out.println("Adding TCP Connector:" + connector.getConnectUri());
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG", new Object[]
- { "Adding TCP Connector", connector.getConnectUri() });
+ // create a String array of spring context files
+ springConfigFileArray = new String[deploymentDescriptors.length];
+ deployedDescriptors = new String[deploymentDescriptors.length];
- connectorList = connector.getName();
- if (System.getProperty("StompSupport") != null)
- {
- String stompURI = generateInternalURI("stomp", 61613, false, false);
- stompConnector = service.addConnector(stompURI);
- System.out.println("Adding STOMP Connector:" + stompConnector.getConnectUri());
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG", new Object[]
- { "Adding STOMP Connector", stompConnector.getConnectUri() });
- connectorList += "," + stompConnector.getName();
- }
- if (System.getProperty("HTTP") != null)
+ Dd2spring aDd2Spring = new Dd2spring();
+ for (int dd = 0; dd < deploymentDescriptors.length; dd++)
{
- String stringPort = System.getProperty("HTTP");
- int p = Integer.parseInt(stringPort);
- String httpURI = generateInternalURI("http", p, false, true);
- httpConnector = service.addConnector(httpURI);
- System.out.println("Adding HTTP Connector:" + httpConnector.getConnectUri());
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG", new Object[]
- { "Adding HTTP Connector", httpConnector.getConnectUri() });
+ String deploymentDescriptor = deploymentDescriptors[dd];
+
+ File springConfigFile = aDd2Spring.convertDd2Spring(deploymentDescriptor, xslTransform, saxonURL, uimaAsDebug);
+
+ // if any are bad, fail
+ if (null == springConfigFile)
+ {
+ return null;
+ }
+ springConfigFileArray[dd] = springConfigFile.getAbsolutePath();
- connectorList += "," + httpConnector.getName();
+ // get the descriptor to register with the engine controller
+ String deployDescriptor = "";
+ File afile = null;
+ FileInputStream fis = null;
+ try
+ {
+ afile = new File(deploymentDescriptor);
+ fis = new FileInputStream(afile);
+ byte[] bytes = new byte[(int) afile.length()];
+ fis.read(bytes);
+ deployDescriptor = new String(bytes);
+ deployedDescriptors[dd] = deployDescriptor;
+ // Log Deployment Descriptor
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "main", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_deploy_desc__FINEST", new Object[]
+ { deployDescriptor });
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ if (fis != null)
+ {
+ try
+ {
+ fis.close();
+ }
+ catch (IOException e)
+ {
+ }
+ }
+ }
}
- service.start();
- System.setProperty("ActiveMQConnectors", connectorList);
- System.out.println("Broker Service Started - URL:" + service.getVmConnectorURI());
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_broker_started__CONFIG", new Object[]
- { service.getVmConnectorURI() });
}
+ return springConfigFileArray;
+
- // Allow the connector time to start
- synchronized (semaphore)
- {
- semaphore.wait(5000);
- }
- System.setProperty("jms.brokerUrl", brokerURI);
+
}
-
/**
- * Generates a unique port for the Network Connector that will be plugged
- * into the internal Broker. This connector externalizes the internal broker
- * so that remote delegates can reply back to the Aggregate. This method
- * tests port 18810 for availability and it fails increments the port by one
- * until a port is valid.
+ * Deploy Spring context files in a Spring Container.
*
- * @return - Broker URI with a unique port
+ * @param springContextFiles - array of Spring context files
+ *
+ * @throws Exception
*/
-
- private String generateInternalURI(String aProtocol, int aDefaultPort, boolean cacheURL, boolean oneTry) throws Exception
+ public void deploy( String[] springContextFiles ) throws Exception
{
- boolean success = false;
- int openPort = aDefaultPort;
- ServerSocket ssocket = null;
-
- while (!success)
+ SpringContainerDeployer springDeployer =
+ new SpringContainerDeployer();
+ // now try to deploy the array of spring context files
+ springDeployer.deploy(springContextFiles);
+ // Poll the deployer for the initialization status. Wait for either successful
+ // initialization or failure.
+ while( !springDeployer.isInitialized() && !springDeployer.initializationFailed())
{
- try
+ synchronized( springDeployer )
{
- ssocket = new ServerSocket(openPort);
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_port_available__CONFIG", new Object[]
- { openPort });
-
- String uri = aProtocol + "://" + ssocket.getInetAddress().getLocalHost().getCanonicalHostName() + ":" + openPort;
- success = true;
- if (cacheURL)
- {
- System.setProperty("BrokerURI", uri);
- }
- return uri;
+ springDeployer.wait(100);
}
- catch (BindException e)
+ }
+ // Check if the deployer failed
+ if ( springDeployer.initializationFailed() )
+ {
+ throw new ResourceInitializationException();
+ }
+ // Register this class to receive Spring container notifications. Specifically, looking
+ // for an even signaling the container termination. This is done so that we can stop
+ // the monitor thread
+ FileSystemXmlApplicationContext context = springDeployer.getSpringContext();
+ context.addApplicationListener(this);
+ }
+ /**
+ * Creates an instance of a {@link JmxMonitor}, initializes it with the JMX Server URI and
+ * checkpoint frequency, and finally starts the monitor.
+ *
+ * @param samplingFrequency - how often the JmxMonitor should checkpoint to fetch service metrics
+ *
+ * @throws Exception - error on monitor initialization or startup
+ */
+ public void startMonitor(long samplingFrequency) throws Exception
+ {
+ monitor = new JmxMonitor();
+
+ // Check if the monitor should run in the verbose mode. In this mode
+ // the monitor dumps JMX Server URI, and a list of UIMA-AS services
+ // found in that server. The default is to not show this info.
+ if ( System.getProperty("verbose") != null )
+ {
+ monitor.setVerbose();
+ }
+
+ // Use the URI provided in the first arg to connect to the JMX server.
+ // Also define sampling frequency. The monitor will collect the metrics
+ // at this interval.
+ String jmxServerPort = null;
+ // get the port of the JMX Server. This property can be set on the command line via -d
+ // OR it is set automatically by the code that creates an internal JMX Server. The latter
+ // is created in the {@link BaseAnalysisEngineController} constructor.
+ if ( ( jmxServerPort = System.getProperty("com.sun.management.jmxremote.port")) != null )
+ {
+ // parameter is set, compose the URI
+ String jmxServerURI = "service:jmx:rmi:///jndi/rmi://localhost:"+jmxServerPort+"/jmxrmi";
+ // Connect to the JMX Server, configure checkpoint frequency, create MBean proxies for
+ // UIMA-AS MBeans and service input queues
+ monitor.initialize(jmxServerURI, samplingFrequency);
+ // Create formatter listener
+ JmxMonitorListener listener = null;
+ String formatterListenerClass = null;
+ // Check if a custom monitor formatter listener class is provided. The user provides this
+ // formatter by adding a -Djmx.monitor.formatter=<class> parameter which specifies a class
+ // that implements {@link JmxMonitorListener} interface
+ if ( (formatterListenerClass = System.getProperty(JmxMonitor.FormatterListener)) != null )
{
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_port_not_available__CONFIG", new Object[]
- { openPort });
- if (oneTry)
+ Object object = null;
+ try
{
- System.out.println("Given port:" + openPort + " is not available for " + aProtocol);
- throw e;
+ // Instantiate the formatter listener class
+ Class formatterClass = Class.forName( formatterListenerClass );
+ object = formatterClass.newInstance();
}
- openPort++;
- }
- catch (Exception e)
- {
- e.printStackTrace();
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[]
- { JmsConstants.threadName(), e });
- if (oneTry)
+ catch( ClassNotFoundException e)
{
+ System.out.println("Class Not Found:"+formatterListenerClass+". Provide a Formatter Class Which Implements:org.apache.uima.aae.jmx.monitor.JmxMonitorListener");
throw e;
}
- }
- finally
- {
- try
+ if ( object instanceof JmxMonitorListener )
{
- if (ssocket != null)
- {
- ssocket.close();
- }
+ listener = (JmxMonitorListener)object;
}
- catch (IOException ioe)
+ else
{
+ throw new InvalidClassException("Invalid Monitor Formatter Class:"+formatterListenerClass+".The Monitor Requires a Formatter Which Implements:org.apache.uima.aae.jmx.monitor.JmxMonitorListener");
}
}
+ else
+ {
+ // The default formatter listener which logs to the UIMA log
+ listener = new BasicUimaJmxMonitorListener(monitor.getMaxServiceNameLength());
+ }
+ // Plug in the monitor listener
+ monitor.addJmxMonitorListener(listener);
+ // Create and start the monitor thread
+ monitorThread = new Thread(monitor);
+
+ // Start the monitor thread. It will run until the Spring container stops. When this happens
+ // the UIMA_Service receives notication via a {@code onApplicationEvent()} callback. There
+ // the monitor is stopped allowing the service to terminate.
+ monitorThread.start();
+ System.out.println(">>> Started JMX Monitor.\n\t>>> MBean Server Port:"+jmxServerPort+"\n\t>>> Monitor Checkpoint Frequency:"+samplingFrequency+"\n\t>>> Monitor Formatter Class:"+listener.getClass().getName());
}
- return null;
-
- }
-
- public ApplicationContext initialize(String[] springXmlConfigFiles) throws ResourceInitializationException
- {
- try
- {
- ApplicationContext ctx = new FileSystemXmlApplicationContext(springXmlConfigFiles);
- ((AbstractApplicationContext) ctx).registerShutdownHook();
- return ctx;
- }
- catch (Exception e)
- {
- throw new ResourceInitializationException(e);
- }
+
}
/**
@@ -287,46 +353,15 @@
System.out.println(" Arguments to the program are as follows : \n" + "-d path-to-UIMA-Deployment-Descriptor [-d path-to-UIMA-Deployment-Descriptor ...] \n" + "-saxon path-to-saxon.jar \n" + "-xslt path-to-dd2spring-xslt\n" + " or\n"
+ "path to Spring XML Configuration File which is the output of running dd2spring\n");
}
- protected void waitForServiceNotification() throws Exception {
-
- synchronized (serviceMonitor) {
- while (!serviceInitializationCompleted) {
- if (serviceInitializationException) {
- throw new ResourceInitializationException();
- }
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_awaiting_container_init__INFO", new Object[] {});
-
- serviceMonitor.wait();
-
- if (serviceInitializationException) {
- throw new ResourceInitializationException();
- }
-
- }
- }
- }
- public void notifyOnInitializationFailure(Exception e) {
-
- // Initialization exception. Notify blocking thread and indicate a
- // problem
- serviceInitializationException = true;
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_container_init_exception__WARNING", new Object[] {e});
- synchronized (serviceMonitor) {
- serviceMonitor.notifyAll();
- }
-
- }
-
- public void notifyOnInitializationSuccess() {
- serviceInitializationCompleted = true;
- synchronized (serviceMonitor) {
- serviceMonitor.notifyAll();
+ public void onApplicationEvent(ApplicationEvent event) {
+ if ( event instanceof ContextClosedEvent && monitor != null && monitor.isRunning() )
+ {
+ System.out.println("Stopping Monitor");
+ // Stop the monitor. The service has stopped
+ monitor.doStop();
}
}
- public void notifyOnTermination(String message) {
- }
-
/**
* The main routine for starting the deployment of a UIMA-AS instance. The
* args are either: 1 or more "paths" to Spring XML descriptors representing
@@ -346,100 +381,31 @@
*/
public static void main(String[] args)
{
- UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO, "UIMA-AS version " + UIMAFramework.getVersionString());
-
- String[] springConfigFileArray =
- {};
- String[] deployedDescriptors =
- {};
- String[] deploymentDescriptors =
- {};
- int nbrOfArgs = args.length;
-
- deploymentDescriptors = getMultipleArg("-d", args);
- if (deploymentDescriptors.length == 0)
- {
- // allow multiple args for one key
- deploymentDescriptors = getMultipleArg2("-dd", args);
- }
- String saxonURL = getArg("-saxonURL", args);
- String xslTransform = getArg("-xslt", args);
- String uimaAsDebug = getArg("-uimaEeDebug", args);
-
- if (nbrOfArgs < 1 || (args[0].startsWith("-") && (deploymentDescriptors.length == 0 || saxonURL.equals("") || xslTransform.equals(""))))
- {
- printUsageMessage();
- return;
- }
-
- if (deploymentDescriptors.length == 0)
- {
- // array of context files passed in
- springConfigFileArray = args;
- deployedDescriptors = args;
- }
- else
+ try
{
- // create a String array of spring context files
- springConfigFileArray = new String[deploymentDescriptors.length];
- deployedDescriptors = new String[deploymentDescriptors.length];
-
- Dd2spring aDd2Spring = new Dd2spring();
- for (int dd = 0; dd < deploymentDescriptors.length; dd++)
+ UIMA_Service service = new UIMA_Service();
+ // parse command args and run dd2spring to generate spring context
+ // files from deployment descriptors
+ String contextFiles[] = service.initialize(args);
+ // If no context files generated there is nothing to do
+ if ( contextFiles == null )
{
- String deploymentDescriptor = deploymentDescriptors[dd];
-
- File springConfigFile = aDd2Spring.convertDd2Spring(deploymentDescriptor, xslTransform, saxonURL, uimaAsDebug);
-
- // if any are bad, fail
- if (null == springConfigFile)
- {
- return;
- }
- springConfigFileArray[dd] = springConfigFile.getAbsolutePath();
-
- // get the descriptor to register with the engine controller
- String deployDescriptor = "";
- File afile = null;
- FileInputStream fis = null;
- try
- {
- afile = new File(deploymentDescriptor);
- fis = new FileInputStream(afile);
- byte[] bytes = new byte[(int) afile.length()];
- fis.read(bytes);
- deployDescriptor = new String(bytes);
- deployedDescriptors[dd] = deployDescriptor;
- // Log Deployment Descriptor
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "main", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_deploy_desc__FINEST", new Object[]
- { deployDescriptor });
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- finally
- {
- if (fis != null)
- {
- try
- {
- fis.close();
- }
- catch (IOException e)
- {
- }
- }
- }
+ return;
+ }
+ // Deploy components defined in Spring context files. This method blocks until
+ // the container is fully initialized and all UIMA-AS components are succefully
+ // deployed.
+ service.deploy( contextFiles );
+ // Check if we should start an optional JMX-based monitor that will provide service metrics
+ // The monitor is enabled by existence of -Djmx.monitor.frequency=<number> parameter. By default
+ // the monitor is not enabled.
+ String monitorCheckpointFrequency;
+ if ( ( monitorCheckpointFrequency = System.getProperty(JmxMonitor.CheckpointFrequency)) != null)
+ {
+ // Found monitor checkpoint frequency parameter, configure and start the monitor.
+ // If the monitor fails to initialize the service is not effected.
+ service.startMonitor(Long.parseLong(monitorCheckpointFrequency));
}
- }
-
- // now try to deploy the array of spring context files
- SpringContainerDeployer springDeployer =
- new SpringContainerDeployer();
- try
- {
- springDeployer.deploy(springConfigFileArray);
}
catch( Exception e)
{