You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/08/22 20:52:43 UTC

svn commit: r688172 [1/2] - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src: main/java/org/apache/uima/adapter/jms/activemq/ main/java/org/apache/uima/adapter/jms/client/ main/java/org/apache/uima/adapter/jms/service/ test/java/org/apach...

Author: eae
Date: Fri Aug 22 11:52:42 2008
New Revision: 688172

URL: http://svn.apache.org/viewvc?rev=688172&view=rev
Log:
UIMA-1147 commit Jerry's work (patches) merging the post1st branch into the trunk

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/multiplier/SimpleCasGenerator.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/resources/descriptors/multiplier/SimpleCasGenerator.xml

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java Fri Aug 22 11:52:42 2008
@@ -23,11 +23,13 @@
 import java.net.BindException;
 import java.net.ServerSocket;
 
+import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.ManagementContext;
 //import org.apache.activemq.memory.UsageListener;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.adapter.jms.JmsConstants;
@@ -40,7 +42,7 @@
 {
 	private static final Class CLASS_NAME = BrokerDeployer.class;
     private static final int BASE_JMX_PORT = 1200;
-    private static final int MAX_PORT_THRESHOLD = 1400;
+    private static final int MAX_PORT_THRESHOLD = 200;
     
 	private static BrokerService service;
 	private Object semaphore = new Object();
@@ -104,16 +106,36 @@
 			
 			String connectorList = "";
 			service.setPersistent(false);
-			
 			int startPort = BASE_JMX_PORT;
+			if ( System.getProperties().containsKey("com.sun.management.jmxremote.port") )
+			{
+				startPort = Integer.parseInt(System.getProperty("com.sun.management.jmxremote.port"));
+				
+			}
 			while( startPort < MAX_PORT_THRESHOLD && !openPort(startPort))
 			{
 				startPort++;
 			}
-			if ( startPort < MAX_PORT_THRESHOLD )
+			if ( startPort < (startPort+MAX_PORT_THRESHOLD ) )
 			{
-				service.setUseJmx(true);
-				service.getManagementContext().setConnectorPort(startPort);
+				MBeanServer jmxServer = null;
+				//	Check if the MBeanServer is available. If it is, plug it into the
+				//	local Broker. We only need one MBeanServer in the JVM
+				if ( (jmxServer = ManagementContext.findTigerMBeanServer()) != null)
+				{
+					System.out.println(">>> Found TigerMBeanServer Running. Attaching Broker to Tiger.");
+					service.getManagementContext().setMBeanServer(jmxServer);
+					//	Specify JMX Port
+					service.getManagementContext().setConnectorPort(startPort);
+				}
+				else
+				{
+					service.getManagementContext().setConnectorPort(startPort);
+					service.setUseJmx(true);
+				}
+				
+				System.setProperty("com.sun.management.jmxremote.port", String.valueOf(startPort)); 
+
 				System.out.println("JMX Console connect URI:  service:jmx:rmi:///jndi/rmi://localhost:"+startPort+"/jmxrmi");
 				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
 	                    "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jmx_uri__CONFIG",

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Fri Aug 22 11:52:42 2008
@@ -27,6 +27,7 @@
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
@@ -34,7 +35,11 @@
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.advisory.ConsumerEvent;
+import org.apache.activemq.advisory.ConsumerListener;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.uima.UIMAFramework;
@@ -47,7 +52,7 @@
 import org.apache.uima.util.Level;
 import org.springframework.util.Assert;
 
-public class JmsEndpointConnection_impl
+public class JmsEndpointConnection_impl implements ConsumerListener
 {
 	private static final Class CLASS_NAME = JmsEndpointConnection_impl.class;
 
@@ -90,16 +95,15 @@
 		connectionMap = aConnectionMap;
 		serverUri = anEndpoint.getServerURI();
 		if ( anEndpoint.isReplyEndpoint() && 
-				 anEndpoint.getDestination() != null && 
-				 anEndpoint.getDestination() instanceof ActiveMQDestination )
-			{
-				endpoint = ((ActiveMQDestination)anEndpoint.getDestination()).getPhysicalName();
-			}
-			else
-			{
-				endpoint = anEndpoint.getEndpoint();
-			}
-		endpointName = endpoint;
+			 anEndpoint.getDestination() != null && 
+			 anEndpoint.getDestination() instanceof ActiveMQDestination )
+		{
+			endpoint = ((ActiveMQDestination)anEndpoint.getDestination()).getPhysicalName();
+		}
+		else
+		{
+			endpoint = anEndpoint.getEndpoint();
+		}
 		isReplyEndpoint = anEndpoint.isReplyEndpoint();
 		anEndpoint.remove();
 		delegateEndpoint = anEndpoint;
@@ -108,6 +112,7 @@
 	{
 		controller = aController;
 	}
+
 	public boolean isRetryEnabled()
 	{
 		return retryEnabled;
@@ -117,6 +122,7 @@
 	{
 		this.retryEnabled = retryEnabled;
 	}
+	
 
 	public boolean isOpen()
 	{
@@ -127,7 +133,7 @@
 		return ((ActiveMQSession) producerSession).isRunning();
 	}
 
-	public  void setInactivityTimeout(long aTimeout)
+	public void setInactivityTimeout(long aTimeout)
 	{
 		inactivityTimeout = aTimeout;
 	}
@@ -184,10 +190,6 @@
 				}
 			}
 			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-			// System.out.println("Creating New Connection To Endpoint::"+getEndpoint());
-			// ConsumerEventSource evSource = new ConsumerEventSource(conn, (ActiveMQDestination)destination);
-			// evSource.setConsumerListener(this);
-			// evSource.start();
 			conn.start();
 			if ( controller != null )
 			{
@@ -198,7 +200,6 @@
 			}
 			if ( controller != null && controller.getInputChannel() != null)
 				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connection_open_to_endpoint__FINE", new Object[] { controller.getComponentName(), getEndpoint(), serverUri });
-
 		}
 		catch ( Exception e)
 		{
@@ -208,10 +209,14 @@
 	}
 	public synchronized void open() throws AsynchAEException, ServiceShutdownException
 	{
+		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                "open", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open__FINE",
+                new Object[] { endpoint, serverUri });
 		if ( !connectionAborted )
 		{
 		    openChannel();
 		}
+
 	}
 
 	public synchronized void abort()
@@ -357,31 +362,28 @@
 	{
 		return connectionCreationTimestamp;
 	}
-	private synchronized void startTimer(long aConnectionCreationTimestamp, String componentName, String endpointName)
-	{
+    private synchronized void startTimer(long aConnectionCreationTimestamp)	
+    {
 		final long cachedConnectionCreationTimestamp = aConnectionCreationTimestamp;
-		final JmsEndpointConnection_impl __instance = this;
-		final String thisEndpoint = endpointName;  
 		Date timeToRun = new Date(System.currentTimeMillis() + inactivityTimeout);
 		if (timer != null)
 		{
 			timer.cancel();
 		}
-		if (componentName != null)
+		if (controller != null)
 		{
-			timer = new Timer("Controller:" + componentName + ":TimerThread-JmsEndpointConnection_impl:" + endpointName + ":" + System.nanoTime());
+			timer = new Timer("Controller:" + controller.getComponentName() + ":TimerThread-JmsEndpointConnection_impl:" + endpoint + ":" + System.nanoTime());
 		}
 		else
 		{
-			timer = new Timer("TimerThread-JmsEndpointConnection_impl:" + endpointName + ":" + System.nanoTime());
+			timer = new Timer("TimerThread-JmsEndpointConnection_impl:" + endpoint + ":" + System.nanoTime());
 		}
 		timer.schedule(new TimerTask() {
 
 			public void run()
 			{
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "startTimer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_inactivity_timer_expired_INFO", new Object[] { Thread.currentThread().getName(), inactivityTimeout, thisEndpoint });
-//					if (connectionCreationTimestamp <= cachedConnectionCreationTimestamp)
-					if (getConnectionCreationTimeout() <= cachedConnectionCreationTimestamp)
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "startTimer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_inactivity_timer_expired_INFO", new Object[] { Thread.currentThread().getName(), inactivityTimeout, endpoint });
+					if (connectionCreationTimestamp <= cachedConnectionCreationTimestamp)
 					{
 						try
 						{
@@ -395,8 +397,6 @@
 							}
 						}
 					}
-//					timer.cancel();
-//					timer.purge();
 					cancelTimer();
 			}
 		}, timeToRun);
@@ -416,11 +416,6 @@
 		{
 			timer = null;
 		}
-//		if (timer != null)
-//		{
-//			timer.cancel();
-//			timer = null;
-//		}
 	}
 
 	public synchronized boolean send(Message aMessage, boolean startTimer) //throws AsynchAEException
@@ -455,7 +450,7 @@
 					}
 				if (startTimer)
 				{
-					startTimer(connectionCreationTimestamp, controller.getComponentName(), endpointName);
+					startTimer(connectionCreationTimestamp);
 				}
 				lastException = null;
 				return true;
@@ -464,6 +459,11 @@
 			{
 				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_ableto_send_msg_INFO", new Object[] { controller.getComponentName(), destinationName, i+1, 10 });
 				lastException = e;
+				//	If the controller has been stopped no need to send messages
+				if ( controller.isStopped())
+				{
+					return true;
+				}
 			}
 			try
 			{
@@ -481,6 +481,13 @@
 		return false;
 	}
 
+	public void onConsumerEvent(ConsumerEvent arg0)
+	{
+		if (controller != null)
+		{
+			controller.handleDelegateLifeCycleEvent(getEndpoint(), arg0.getConsumerCount());
+		}
+	}
 
 	protected synchronized void finalize() throws Throwable
 	{

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Fri Aug 22 11:52:42 2008
@@ -24,10 +24,15 @@
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
+import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
 
+import mx4j.tools.adaptor.http.GetAttributeCommandProcessor;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.InputChannel;
@@ -40,6 +45,7 @@
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UIMAMessage;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.message.JmsMessageContext;
 import org.apache.uima.util.Level;
@@ -86,6 +92,8 @@
 	
 	private List listenerContainerList = new ArrayList();
 	
+	private Object mux = new Object();
+	
 	public AnalysisEngineController getController()
 	{
 		return controller;
@@ -174,6 +182,32 @@
 		}
 		return false;
 	}
+	private boolean isRemoteRequest( Message aMessage ) throws Exception
+	{
+		
+		//	Dont do checkpoints if a message was sent from a Cas Multiplier
+		if ( aMessage.propertyExists(AsynchAEMessage.CasSequence))
+		{
+			return false;
+		}
+
+		Map properties = ((ActiveMQMessage)aMessage).getProperties();
+		if ( properties.containsKey(AsynchAEMessage.MessageType) &&
+			 properties.containsKey(AsynchAEMessage.Command) &&
+			 properties.containsKey(UIMAMessage.ServerURI))
+		{
+			int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+			int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+			boolean isRemote = aMessage.getStringProperty(UIMAMessage.ServerURI).startsWith("vm") == false;
+			if ( isRemote && msgType == AsynchAEMessage.Request &&
+					(command == AsynchAEMessage.Process ||
+					 command == AsynchAEMessage.CollectionProcessComplete) )
+			{
+				return true;
+			}
+		}
+		return false;
+	}
 	
 	/**
 	 * Validate command contained in the header of the JMS Message
@@ -265,7 +299,11 @@
 	{
 		int command = aMessage.getIntProperty(AsynchAEMessage.Command);
 		int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
-
+		if ( isStopped() || getController() == null || getController().getInProcessCache() == null )
+		{
+			//	Shutting down 
+			return true;
+		}
 		if ( command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response )
 		{
 			String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
@@ -398,32 +436,33 @@
 		return false;
 	}
 	
-	private synchronized long computeIdleTime()
+	private boolean isCheckpointWorthy( Message aMessage ) throws Exception
 	{
-		try
+		synchronized( mux )
 		{
-			boolean isAggregate = getController() instanceof AggregateAnalysisEngineController;
-			if ( isAggregate || !((PrimitiveAnalysisEngineController)getController()).isMultiplier() )
+			//	Dont do checkpoints if a message was sent from a Cas Multiplier
+			if ( aMessage.propertyExists(AsynchAEMessage.CasSequence))
 			{
-				long lastReplyTime = getController().getReplyTime();
-				if ( lastReplyTime > 0 )
+				return false;
+			}
+			Map properties = ((ActiveMQMessage)aMessage).getProperties();
+			if ( properties.containsKey(AsynchAEMessage.MessageType) &&
+				 properties.containsKey(AsynchAEMessage.Command) &&
+				 properties.containsKey(UIMAMessage.ServerURI))
+			{
+				int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+				int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+				if ( msgType == AsynchAEMessage.Request &&
+						(command == AsynchAEMessage.Process ||
+						 command == AsynchAEMessage.CollectionProcessComplete) )
 				{
-					long t = System.nanoTime();
-					long delta = t-(long)lastReplyTime;
-					getController().saveIdleTime(delta, "", true);
-					return delta;
+					return true;
 				}
 			}
+			return false;
+			
 		}
-		catch( Exception e)
-		{
-			e.printStackTrace();
-		}
-		return 0;
 	}
-
-	
-	
 	/**
 	 * Receives Messages from the JMS Provider. It checks the message header
 	 * to determine the type of message received. Based on the type,
@@ -436,38 +475,49 @@
 	 */
 	public void onMessage(Message aMessage, Session aJmsSession )
 	{
+		if ( isStopped() )
+		{
+			return;
+		}
+		
+		
 		try
 		{
 			//	wait until message handlers are plugged in
 			msgHandlerLatch.await();
 		}
 		catch( InterruptedException e) {}
-		
 		try
 		{
 			//	wait until the controller is plugged in
 			controllerLatch.await();
 		}
 		catch( InterruptedException e) {}
+		long idleTime = 0;
 
+		boolean doCheckpoint = false;
+		
+		String eN = endpointName;
+		if ( getController() != null )
+		{
+			eN = getController().getComponentName();
+			if (eN == null )
+			{
+				eN = "";
+			}
+		}
 		
+
 		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                 "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_recvd_msg__FINE",
-                new Object[] { endpointName });
+                new Object[] { eN });
 		JmsMessageContext messageContext = null;
-		long idleTime = 0;
+
+		int requestType = 0;
 		try
 		{
-			if ( isProcessRequest(aMessage) )
-			{
-				//	Compute the time between waiting for this request and update
-				//	this service performance stats.
-				idleTime = computeIdleTime();
-			}
-
 			//	Wrap JMS Message in MessageContext
 			messageContext = new JmsMessageContext( aMessage, endpointName );
-			messageContext.getEndpoint().setIdleTime(idleTime);
 			if ( jmsSession == null )
 			{
 				jmsSession = aJmsSession;
@@ -502,9 +552,6 @@
 			}
 			if ( validMessage(aMessage) )
 			{
-
-				
-				
 				String command = decodeIntToString(AsynchAEMessage.Command, aMessage.getIntProperty(AsynchAEMessage.Command) );
 				String messageType =  decodeIntToString(AsynchAEMessage.MessageType, aMessage.getIntProperty(AsynchAEMessage.MessageType) );
 				if ( ackMessageNow(aMessage))
@@ -541,8 +588,34 @@
 			                    new Object[] { controller.getComponentName(), msgFrom, messageType, command, casRefId });
 					}
 				}
+				else
+				{
+				}
 				//	Delegate processing of the message contained in the MessageContext to the
 				//	chain of handlers
+				try
+				{
+					if ( isRemoteRequest( aMessage ))
+					{
+						//	Compute the idle time waiting for this request 
+						idleTime = getController().getIdleTime();
+						
+						//	This idle time is reported to the client thus save it in the endpoint
+						//	object. This value will be fetched and added to the outgoing reply.
+						messageContext.getEndpoint().setIdleTime(idleTime);
+					}
+				}
+				catch( Exception e ) {}
+
+				//	Determine if this message is a request and either GetMeta, CPC, or Process
+				doCheckpoint = isCheckpointWorthy( aMessage );
+				requestType = aMessage.getIntProperty(AsynchAEMessage.Command);
+				//	Checkpoint
+				if ( doCheckpoint )
+				{
+					getController().beginProcess(requestType);
+				}
+
 				
 				if ( handler != null )
 				{
@@ -563,6 +636,14 @@
 
 			controller.getErrorHandlerChain().handle(t, HandlerBase.populateErrorContext( messageContext ), controller);			
 		}
+		finally
+		{
+			//	Call the end checkpoint for non-aggregates. For primitives the CAS has been fully processed if we are here
+			if ( doCheckpoint && getController() instanceof PrimitiveAnalysisEngineController )
+			{
+				getController().endProcess(requestType);
+			}
+		}
 	}
 	
 	public int getSessionAckMode()
@@ -575,10 +656,11 @@
 	}
 	public synchronized void setListenerContainer(UimaDefaultMessageListenerContainer messageListener)
 	{
-			this.messageListener = messageListener;
-			System.setProperty("BrokerURI", messageListener.getBrokerUrl());
-			brokerURL = messageListener.getBrokerUrl();
-			listenerContainerList.add(messageListener);
+		this.messageListener = messageListener;
+		System.setProperty("BrokerURI", messageListener.getBrokerUrl());
+		brokerURL = messageListener.getBrokerUrl();
+		listenerContainerList.add(messageListener);
+		this.messageListener = messageListener;
 		if ( getController() != null )
 		{
 			try
@@ -588,6 +670,17 @@
 			} catch( Exception e) {}
 		}
 	}
+	public ActiveMQConnectionFactory getConnectionFactory()
+	{
+		if (messageListener == null )
+		{
+			return null;
+		}
+		else
+		{
+			return (ActiveMQConnectionFactory)messageListener.getConnectionFactory();
+		}
+	}
 	public void ackMessage( MessageContext aMessageContext )
 	{
 		if ( aMessageContext != null && sessionAckMode == Session.CLIENT_ACKNOWLEDGE )
@@ -680,22 +773,5 @@
 	{
 		return stopped;
 	}
-/*	
-	private void spinThreadForListenerShutdown(final UimaDefaultMessageListenerContainer listenerContainer)
-	{
-		new Thread("Shutdown Thread For Listener:"+listenerContainer.getEndpointName()) {
-			public void run()
-			{
-				try
-				{
-					listenerContainer.stop();
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
-			                "spinThreadForListenerShutdown.run()", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stop_listener__INFO",
-			                new Object[] {  listenerContainer.getEndpointName() });
-				}
-				catch( Exception e) { e.printStackTrace();}
-			}
-		}.start();
-	}
-*/	
+
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Fri Aug 22 11:52:42 2008
@@ -22,7 +22,9 @@
 import java.io.ByteArrayOutputStream;
 import java.net.ConnectException;
 import java.net.InetAddress;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.StringTokenizer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -50,6 +52,7 @@
 import org.apache.uima.aae.error.ErrorContext;
 import org.apache.uima.aae.error.ServiceShutdownException;
 import org.apache.uima.aae.error.UimaEEServiceException;
+import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.jmx.ServicePerformance;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.UIMAMessage;
@@ -88,6 +91,7 @@
 
 	private boolean aborting = false;
 	
+	private Destination freeCASTempQueue;
 	/**
 	 * Sets the ActiveMQ Broker URI 
 	 */
@@ -95,6 +99,10 @@
 	{
 		serverURI = aServerURI;
 	}
+	protected void setFreeCasQueue( Destination destination)
+	{
+		freeCASTempQueue = destination;
+	}
 	public String getServerURI()
 	{
 		return System.getProperty("BrokerURI");
@@ -165,7 +173,7 @@
 	public String serializeCAS(boolean isReply, CAS aCAS, String aCasReferenceId, String aSerializerKey) throws Exception
 	{
 		
-		long start = System.nanoTime();
+		long start = getAnalysisEngineController().getCpuTime();
 		
 		String serializedCas = null;
 		
@@ -187,10 +195,10 @@
 			else
 			{
 				serSharedData = cacheEntry.getDeserSharedData();//new XmiSerializationSharedData();
-        if (serSharedData == null) {
-          serSharedData = new XmiSerializationSharedData();
-          cacheEntry.setXmiSerializationData(serSharedData);
-        }
+				if (serSharedData == null) {
+					serSharedData = new XmiSerializationSharedData();
+					cacheEntry.setXmiSerializationData(serSharedData);
+				}
 			    serializedCas = UimaSerializer.serializeCasToXmi(aCAS, serSharedData);
 			    int maxOutgoingXmiId = serSharedData.getMaxXmiId();				
 				//	Save High Water Mark in case a merge is needed
@@ -229,9 +237,8 @@
 		LongNumericStatistic statistic;
 		if ( (statistic = getAnalysisEngineController().getMonitor().getLongNumericStatistic("",Monitor.TotalSerializeTime)) != null )
 		{
-			statistic.increment(System.nanoTime() - start);
+			statistic.increment(getAnalysisEngineController().getCpuTime() - start);
 		}
-
 		
 		return serializedCas;
 	}
@@ -386,9 +393,10 @@
 			endpointConnection.send(tm, true);
 			if ( aCommand == AsynchAEMessage.ReleaseCAS )
 			{
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "collectionProcessComplete", 
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendRequest", 
 						JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_release_cas_req__FINE", new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint(),aCasReferenceId });
 			}
+				
 		}
 		catch( JMSException e)
 		{
@@ -423,18 +431,20 @@
 		{
 			JmsEndpointConnection_impl endpointConnection = 
 				getEndpointConnection(anEndpoint);
-
+			
 			TextMessage tm = endpointConnection.produceTextMessage(null);
 			tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None); 
 			tm.setText("");    // Need this to prevent the Broker from throwing an exception when sending a message to C++ service
 			
 			populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
-/*
-			if ( anEndpoint.getServerURI().toLowerCase().startsWith("http") )
+			
+			//	For remotes add a special property to the message. This property
+			//	will be echoed back by the service. This property enables matching
+			//	the reply with the right endpoint object managed by the aggregate.
+			if ( anEndpoint.isRemote() )
 			{
-				tm.setBooleanProperty(AsynchAEMessage.RemoveEndpoint, true);
+				tm.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getServerURI());
 			}
-*/
 			boolean startTimer = false;
 			//	Start timer for endpoints that are remote and are managed by a different broker
 			//	than this service. If an endpoint contains a destination object, the outgoing
@@ -462,6 +472,34 @@
 	                    "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_releasecas_request__endpoint__FINEST",
 	                    new Object[] {getAnalysisEngineController().getName(), endpointConnection.getEndpoint()});
 			}
+			else if ( aCommand == AsynchAEMessage.GetMeta )
+			{
+				if ( anEndpoint.getDestination() != null )
+				{
+					String replyQueueName = ((ActiveMQDestination)anEndpoint.getDestination()).getPhysicalName().replaceAll(":","_");
+					if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController )
+					{
+						String delegateKey =
+							((AggregateAnalysisEngineController)getAnalysisEngineController()).lookUpDelegateKey(anEndpoint.getEndpoint());
+						ServiceInfo serviceInfo =((AggregateAnalysisEngineController)getAnalysisEngineController()).getDelegateServiceInfo(delegateKey);
+						if (serviceInfo != null )
+						{
+							serviceInfo.setReplyQueueName(replyQueueName);
+							serviceInfo.setServiceKey(delegateKey);
+							System.out.println("Service:"+delegateKey+" Directed to Reply To:"+replyQueueName);
+						}
+					}
+				}
+				else if ( !anEndpoint.isRemote())
+				{
+					ServiceInfo serviceInfo =((AggregateAnalysisEngineController)getAnalysisEngineController()).getServiceInfo();
+					if (serviceInfo != null )
+					{
+						serviceInfo.setReplyQueueName(controllerInputEndpoint);
+						System.out.println("Aggregate Service Reply Queue:"+getAnalysisEngineController().getComponentName()+" Reply To:"+controllerInputEndpoint);
+					}
+				}
+			}
 			else 
 			{
 	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
@@ -494,26 +532,7 @@
 		{
 			if (anEndpoint.isRemote())
 			{
-				long t1 = System.nanoTime();
 				String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled());
-/*				
-				if ( analysisEngineController instanceof AggregateAnalysisEngineController )
-				{
-					String delegateKey
-					 	= ((AggregateAnalysisEngineController)analysisEngineController).
-					 		lookUpDelegateKey(anEndpoint.getEndpoint());
-					if ( delegateKey != null)
-					{
-						long timeToSerialize = System.nanoTime() - t1;
-//						((AggregateAnalysisEngineController)analysisEngineController).
-//							incrementCasSerializationTime(delegateKey, timeToSerialize);
-						
-						analysisEngineController.
-							getServicePerformance().
-								incrementCasSerializationTime(timeToSerialize);
-					}
-				}
-*/
 				if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
 				{
 		            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
@@ -589,13 +608,8 @@
 				}
 			}
 		}
-//		catch ( AsynchAEException e)
-//		{
-//			throw e;
-//		}
 		catch ( Exception e)
 		{
-//			throw new AsynchAEException(e);
 			// Handle the error
 			ErrorContext errorContext = new ErrorContext();
 			errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
@@ -614,9 +628,6 @@
 			{
 				//	Serializes CAS and releases it back to CAS Pool
 				String serializedCAS = getSerializedCas(true, aNewCasReferenceId, anEndpoint, anEndpoint.isRetryEnabled());
-
-				
-				
 				sendCasToRemoteEndpoint(false, serializedCAS, anInputCasReferenceId, aNewCasReferenceId, anEndpoint, false, sequence);
 			}
 			else
@@ -643,6 +654,41 @@
 		
 	}
 	
+	public void sendReply( CacheEntry entry, Endpoint anEndpoint ) throws AsynchAEException
+	{
+		try
+		{
+			anEndpoint.setReplyEndpoint(true);
+			if ( anEndpoint.isRemote() )
+			{
+				//	Serializes CAS and releases it back to CAS Pool
+				String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled());
+				sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false);
+			}
+			else
+			{
+	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+	                    "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_reply_with_sequence__FINE",
+	                    new Object[] { anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getCasSequence() });
+				sendCasToCollocatedDelegate(false, entry.getInputCasReferenceId(), entry.getCasReferenceId(), anEndpoint, false, entry.getCasSequence());
+			}
+		}
+		catch( ServiceShutdownException e)
+		{
+			e.printStackTrace();
+		}
+		catch (AsynchAEException e)
+		{
+			throw e;
+		}
+		
+		catch (Exception e)
+		{
+			throw new AsynchAEException(e);
+		}
+		
+	}
+
 	public void sendReply( int aCommand, Endpoint anEndpoint ) throws AsynchAEException
 	{
 		anEndpoint.setReplyEndpoint(true);
@@ -670,7 +716,7 @@
 		{
 			//	Unable to establish connection to the endpoint. Logit and continue
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                    "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
+                    "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
                     new Object[] { e});
 			
 		}
@@ -798,7 +844,7 @@
 		{
 			//	Unable to establish connection to the endpoint. Logit and continue
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                    "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+                    "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
                     new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
 		}
 		catch( ServiceShutdownException e)
@@ -808,7 +854,7 @@
 		catch (AsynchAEException e)
 		{
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                    "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+                    "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
                     new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
 		}
 		catch (Exception e)
@@ -916,7 +962,7 @@
 			else
 			{
 				CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
-				long t1 = System.nanoTime();
+				long t1 = getAnalysisEngineController().getCpuTime();
 				//	Serialize CAS for remote Delegates
 				String serializer = anEndpoint.getSerializer();
 				if ( serializer == null || serializer.trim().length() == 0)
@@ -924,7 +970,10 @@
 					serializer = "xmi";
 				}
 				serializedCAS = serializeCAS(isReply, cas, aCasReferenceId, serializer);
-				long timeToSerializeCas = System.nanoTime()-t1;
+				long timeToSerializeCas = getAnalysisEngineController().getCpuTime()-t1;
+				
+				getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
+				
 				entry.incrementTimeToSerializeCAS(timeToSerializeCas);
 				casStats.incrementCasSerializationTime(timeToSerializeCas);
 				getAnalysisEngineController().getServicePerformance().
@@ -949,37 +998,6 @@
 		CAS cas = null;
 		try
 		{
-			
-			
-/*			
-			String serializedCAS = null;
-			//	Using Cas reference Id retrieve CAS from the shared Cash
-			cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
-
-			if ( cas == null )
-			{
-				serializedCAS = getAnalysisEngineController().getInProcessCache().getSerializedCAS( aCasReferenceId );
-			}
-			else
-			{
-				CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
-				long t1 = System.nanoTime();
-				//	Serialize CAS for remote Delegates
-				String serializer = anEndpoint.getSerializer();
-				if ( serializer == null || serializer.trim().length() == 0)
-				{
-					serializer = "xmi";
-				}
-				serializedCAS = serializeCAS(isReply, cas, aCasReferenceId, serializer);
-				entry.incrementTimeToSerializeCAS(System.nanoTime()-t1);
-				if ( cacheSerializedCas )
-				{
-					getAnalysisEngineController().getInProcessCache().saveSerializedCAS(aCasReferenceId, serializedCAS);
-				}
-			}
-			return serializedCAS;
-*/			
-
 			return getSerializedCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas);
 		}
 		catch( Exception e)
@@ -1010,7 +1028,6 @@
 	{
 		if ( anEndpoint.isFinal() )
 		{
-			//aTextMessage.setLongProperty(AsynchAEMessage.TotalTimeSpentInAnalytic, (System.nanoTime()-anEndpoint.getEntryTime()));
 			aTextMessage.setLongProperty("SENT-TIME", System.nanoTime());
 		}
 
@@ -1028,14 +1045,13 @@
 					getAnalysisEngineController().getCasStatistics(aCasReferenceId);
 				
 				aTextMessage.setLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats.getRawCasSerializationTime());
-//					aTextMessage.setLongProperty(AsynchAEMessage.TimeWaitingForCAS, entry.getTimeWaitingForCAS());
 				aTextMessage.setLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats.getRawCasDeserializationTime());
 				aTextMessage.setLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
-				aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, anEndpoint.getIdleTime() );
-				
-				String lookupKey = getAnalysisEngineController().getName();//getInProcessCache().getMessageAccessorByReference(aCasReferenceId).getEndpointName();
+				long iT =getAnalysisEngineController().getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process); 
+				aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, iT );
+				String lookupKey = getAnalysisEngineController().getName();
 				long arrivalTime = getAnalysisEngineController().getTime( aCasReferenceId, lookupKey); //serviceInputEndpoint);
-				long timeInService = System.nanoTime()-arrivalTime;
+				long timeInService = getAnalysisEngineController().getCpuTime()-arrivalTime;
 				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
 	                    "populateStats", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timein_service__FINEST",
 	                    new Object[] { serviceInputEndpoint, (double)timeInService/(double)1000000 });
@@ -1109,7 +1125,7 @@
 	                    "populateHeaderWithRequestContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
 	                    new Object[] {getAnalysisEngineController().getComponentName(), anEndpoint.getServerURI(), anEndpoint.getEndpoint()});
 			}
-			else
+			else // collocated
 			{
 				aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
 			}
@@ -1142,6 +1158,10 @@
 			{
 				aMessage.setStringProperty(AsynchAEMessage.ServerIP,hostIP);
 			}
+			if ( anEndpoint.getEndpointServer() != null )
+			{
+				aMessage.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getEndpointServer());
+			}
 		}
 		else
 		{
@@ -1291,17 +1311,21 @@
 				//	produced by the CAS Multiplier. The client will treat this CAS
 				//	differently from the input CAS. 
 				tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-/*
-				//	Add the top ancestor of this CAS. 
-				addTopCASParentReferenceId(tm, anInputCasReferenceId);
-*/
 				tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
 				//	Add a sequence number assigned to this CAS by the controller
 				tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
+				isRequest = true;
+				//	Add the name of the FreeCas Queue
+//				if ( secondaryInputEndpoint != null )
+//				{
+//					tm.setStringProperty(AsynchAEMessage.MessageFrom, secondaryInputEndpoint);
+//				}
 				
-				if ( secondaryInputEndpoint != null )
+				if ( freeCASTempQueue != null )
 				{
-					tm.setStringProperty(AsynchAEMessage.MessageFrom, secondaryInputEndpoint);
+					//	Attach a temp queue to the outgoing message. This a queue where
+					//  Free CAS notifications need to be sent from the client
+					tm.setJMSReplyTo(freeCASTempQueue);
 				}
 				if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
 				{
@@ -1309,7 +1333,7 @@
 					if ( cacheEntry != null )
 					{
 						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-			                    "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
+			                    "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
 			                    new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId, cacheEntry.getInputCasReferenceId() });
 					}
 				}
@@ -1344,11 +1368,140 @@
 			// ----------------------------------------------------
 			endpointConnection.send(tm, startConnectionTimer);
 
-			if ( getAnalysisEngineController().isTopLevelComponent() )
+//			if ( getAnalysisEngineController().isTopLevelComponent() )
+//			{
+//				getAnalysisEngineController().getInProcessCache().dumpContents(getAnalysisEngineController().getComponentName());
+//			}
+			if ( !isRequest )
 			{
-				getAnalysisEngineController().getInProcessCache().dumpContents();
+				addIdleTime(tm);
+			}
+		}
+		catch( JMSException e)
+		{
+			//	Unable to establish connection to the endpoint. Logit and continue
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                    "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+                    new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
+			
+		}
+
+		catch( ServiceShutdownException e)
+		{
+			throw e;
+		}
+		catch( AsynchAEException e)
+		{
+			throw e;
+		}
+		catch( Exception e)
+		{
+			throw new AsynchAEException(e);
+		}
+		
+	}
+	
+	private void sendCasToRemoteEndpoint( boolean isRequest, String aSerializedCAS, CacheEntry entry,  Endpoint anEndpoint, boolean startTimer ) 
+	throws AsynchAEException, ServiceShutdownException
+	{
+		
+		try
+		{
+			if ( aborting )
+			{
+				return;
+			}
+			//	Get the connection object for a given endpoint
+			JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+			//	Create empty JMS Text Message
+			TextMessage tm = endpointConnection.produceTextMessage("");
+			
+			//	Save Serialized CAS in case we need to re-send it for analysis
+			if ( anEndpoint.isRetryEnabled() && getAnalysisEngineController().getInProcessCache().getSerializedCAS(entry.getCasReferenceId()) == null)
+			{
+				getAnalysisEngineController().getInProcessCache().saveSerializedCAS(entry.getCasReferenceId(), aSerializedCAS);
+			}
+
+			tm.setText(aSerializedCAS);
+			tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload); 
+			//	Add Cas Reference Id to the outgoing JMS Header
+			tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
+			//	Add common properties to the JMS Header
+			if ( isRequest == true )
+			{
+				populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process); 
+			}
+			else
+			{
+				populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
+			}
+			//	The following is true when the analytic is a CAS Multiplier
+			if ( entry.isSubordinate() && !isRequest )
+			{
+				//	Override MessageType set in the populateHeaderWithContext above.
+				//	Make the reply message look like a request. This message will contain a new CAS 
+				//	produced by the CAS Multiplier. The client will treat this CAS
+				//	differently from the input CAS. 
+				tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+
+				isRequest = true;
+				//	Save the id of the parent CAS
+				tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry.getCasReferenceId()));
+				//	Add a sequence number assigned to this CAS by the controller
+				tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
+				//	If this is a Cas Multiplier, add a reference to a special queue where
+				//	the client sends Free Cas Notifications
+				if ( freeCASTempQueue != null )
+				{
+					//	Attach a temp queue to the outgoing message. This is a queue where
+					//  Free CAS notifications need to be sent from the client
+					tm.setJMSReplyTo(freeCASTempQueue);
+				}
+				if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
+				{
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+		                    "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
+		                    new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
+				}
+			}
+
+			//	Add stats
+			populateStats(tm, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
+			if ( startTimer)
+			{
+				//	Start a timer for this request. The amount of time to wait
+				//	for response is provided in configuration for the endpoint
+				anEndpoint.startProcessRequestTimer(entry.getCasReferenceId());
+			}
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                    "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE",
+                    new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
+
+			//	By default start a timer associated with a connection to the endpoint. Once a connection is established with an
+			//	endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval
+			//	the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when
+			//	managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied
+			//	by anEndpoint.getDestination != null, we dont start the timer.
+			boolean startConnectionTimer = true;
+			
+			if ( anEndpoint.getDestination() != null || !isRequest )
+			{
+				startConnectionTimer = false;
+			}
+			
+			// ----------------------------------------------------
+			//	Send Request Messsage to the Endpoint
+			// ----------------------------------------------------
+			endpointConnection.send(tm, startConnectionTimer);
+
+//			if ( getAnalysisEngineController().isTopLevelComponent() )
+//			{
+//				getAnalysisEngineController().getInProcessCache().dumpContents(getAnalysisEngineController().getComponentName());
+//			}
+			if ( !isRequest )
+			{
+				addIdleTime(tm);
 			}
-			addIdleTime(tm);
 		}
 		catch( JMSException e)
 		{
@@ -1373,6 +1526,24 @@
 		}
 		
 	}
+	
+	private String getTopParentCasReferenceId( String casReferenceId ) throws Exception
+	{
+		if ( !getAnalysisEngineController().getInProcessCache().entryExists(casReferenceId) )
+		{
+			return null;
+		}
+		
+		CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(casReferenceId); 
+		if ( entry.isSubordinate() )
+		{
+			//	Recurse until the top CAS reference Id is found
+			return getTopParentCasReferenceId(entry.getInputCasReferenceId());
+		}
+		//	Return the top ancestor CAS id
+		return entry.getCasReferenceId();
+	}
+	
 	private boolean isProcessReply( Message aMessage )
 	{
 		try
@@ -1389,14 +1560,14 @@
 	private void addIdleTime( Message aMessage )
 	{
 
-
+/*
 		if ( isProcessReply(aMessage ) && 
 			 ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController || 
-			   !((PrimitiveAnalysisEngineController)getAnalysisEngineController()).isMultiplier() ) )
-			{
-				long t = System.nanoTime();
-				getAnalysisEngineController().saveReplyTime(t, "");
-			}
+			   !getAnalysisEngineController().isCasMultiplier() ) )
+
+*/
+		long t = System.nanoTime();
+		getAnalysisEngineController().saveReplyTime(t, "");
 	}
 	private void sendCasToCollocatedDelegate(boolean isRequest, String anInputCasReferenceId, String aNewCasReferenceId, Endpoint anEndpoint, boolean startTimer, long sequence) 
 	throws AsynchAEException, ServiceShutdownException
@@ -1449,6 +1620,8 @@
 				//	produced by the CAS Multiplier. The client will treat this CAS
 				//	differently from the input CAS. 
 				tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+				isRequest = true;
+
 				if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
 				{
 					CacheEntry cacheEntry = getCacheEntry(aNewCasReferenceId);
@@ -1485,7 +1658,10 @@
 			//	Send Request Messsage to Delegate
 			// ----------------------------------------------------
 			endpointConnection.send(tm, startTimer);
-			addIdleTime(tm);
+			if ( !isRequest )
+			{
+				addIdleTime(tm);
+			}
 			
 		}
 		catch( JMSException e)

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java Fri Aug 22 11:52:42 2008
@@ -24,6 +24,8 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UIDGenerator;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
@@ -37,6 +39,8 @@
 import org.apache.uima.util.Level;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.support.FileSystemXmlApplicationContext;
+import org.springframework.jms.support.destination.DestinationResolver;
+
 
 public class SpringContainerDeployer implements ControllerCallbackListener {
 	private static final Class CLASS_NAME = SpringContainerDeployer.class;
@@ -45,19 +49,101 @@
 	private boolean serviceInitializationException;
 	private Object serviceMonitor = new Object();
 	private ConcurrentHashMap springContainerRegistry=null;
-	
+	private FileSystemXmlApplicationContext context = null;
 	public SpringContainerDeployer(){
 	}
 
 	public SpringContainerDeployer( ConcurrentHashMap aSpringContainerRegistry )	{
 		springContainerRegistry = aSpringContainerRegistry;
 	}
+	private UimaDefaultMessageListenerContainer produceListenerConnector(ActiveMQConnectionFactory cf)
+	{
+		DestinationResolver resolver = new TempDestinationResolver();
+		UimaDefaultMessageListenerContainer connector = 
+			new UimaDefaultMessageListenerContainer(true);
+		
+		connector.setConnectionFactory(cf);
+		connector.setConcurrentConsumers(1);
+		connector.setDestinationResolver(resolver);
+		connector.initialize();
+		connector.start();
 	
+		while( connector.getListenerEndpoint() == null )
+		{
+			synchronized(connector)
+			{
+				try
+				{
+					connector.wait(50);
+				}
+				catch( InterruptedException e) {}
+			}
+		}
+		return connector;
+	}
+	private ActiveMQConnectionFactory getTopLevelQueueConnectionFactory( ApplicationContext ctx  )
+	{
+		ActiveMQConnectionFactory factory = null;
+		String[] inputChannelBeanIds = ctx.getBeanNamesForType(org.apache.uima.adapter.jms.activemq.JmsInputChannel.class);
+		String beanId = null;
+		for( int i=0; i < inputChannelBeanIds.length; i++)
+		{
+			JmsInputChannel inputChannel = (JmsInputChannel)ctx.getBean(inputChannelBeanIds[i]);
+			if ( inputChannel.getName().startsWith("top_level_input_queue_service") &&
+				 inputChannel instanceof JmsInputChannel )
+			{
+				factory = ((JmsInputChannel)inputChannel).getConnectionFactory();
+				break;
+			}
+		} 
+		return factory;
+	}
 	private void initializeTopLevelController( AnalysisEngineController cntlr, ApplicationContext ctx) 
 	throws Exception
 	{
 		((FileSystemXmlApplicationContext) ctx).setDisplayName(cntlr.getComponentName());
 		cntlr.addControllerCallbackListener(this);
+		
+		String inputQueueName = cntlr.getServiceEndpointName();
+		if ( inputQueueName != null )
+		{
+			if ( ctx.containsBean(inputQueueName) )
+			{
+				ActiveMQQueue queue = (ActiveMQQueue)ctx.getBean(inputQueueName);
+				if ( cntlr.getServiceInfo() != null )
+				{
+					cntlr.getServiceInfo().setInputQueueName(queue.getQueueName());
+				}
+			}
+			else
+			{
+				if ( cntlr.getServiceInfo() != null )
+				{
+					cntlr.getServiceInfo().setInputQueueName(inputQueueName);
+				}
+			}
+		}
+		
+		
+		//	If this is a Cas Multiplier add a special temp queue for receiving Free CAS
+		//	notifications.
+		if ( cntlr.isCasMultiplier() )
+		{
+			
+			ActiveMQConnectionFactory cf = getTopLevelQueueConnectionFactory( ctx );
+			//	Create a listener and a temp queue for Free CAS notifications. 
+			UimaDefaultMessageListenerContainer connector =	produceListenerConnector(cf);
+			System.out.println(">>>> Cas Multiplier Controller:"+cntlr.getComponentName()+" Activated Listener to Receive Free CAS Notifications - Temp Queue Name:"+connector.getEndpointName());
+			//	Direct all messages to the InputChannel 
+			connector.setMessageListener(((JmsInputChannel)cntlr.getInputChannel()));
+			((JmsInputChannel)cntlr.getInputChannel()).setListenerContainer(connector);
+			//	Save the temp queue reference in the Output Channel. The output channel will
+			//	add this queue to every outgoing message containing a CAS generated by the
+			//	Cas Multiplier.
+			((JmsOutputChannel)cntlr.getOutputChannel()).setFreeCasQueue(connector.getListenerEndpoint());
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "initializeTopLevelController", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_activated_fcq__CONFIG", new Object[] { cntlr.getComponentName(), connector.getEndpointName() });
+		}
+
 		if (cntlr instanceof AggregateAnalysisEngineController) {
 			// Get a map of delegates for the top level aggregate
 			Map destinationMap = ((AggregateAnalysisEngineController) cntlr).getDestinations();
@@ -197,8 +283,8 @@
 		    if (!springContextFile.startsWith("file:")) {
 		      springContextFile = "file:" + springContextFile;
 		    }
-			ApplicationContext ctx = new FileSystemXmlApplicationContext(springContextFile);
-			return initializeContainer(ctx);
+			context = new FileSystemXmlApplicationContext(springContextFile);
+			return initializeContainer(context);
 		} catch (ResourceInitializationException e) {
 			//e.printStackTrace();
 			throw e;
@@ -226,8 +312,8 @@
 			// synchronous ( one bean at a time), some beans run in a separate
 			// threads. The completion of container deployment doesnt
 			// necessarily mean that all beans have initialized completely.
-			ApplicationContext ctx = new FileSystemXmlApplicationContext(springContextFiles);
-			return initializeContainer(ctx);
+			context = new FileSystemXmlApplicationContext(springContextFiles);
+			return initializeContainer(context);
 		} catch (ResourceInitializationException e) {
 			e.printStackTrace();
 			throw e;
@@ -253,7 +339,7 @@
 			}
 		}
 	}
-	public void notifyOnInitializationFailure(Exception e) {
+	public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e) {
 
 		// Initialization exception. Notify blocking thread and indicate a
 		// problem
@@ -265,14 +351,37 @@
 
 	}
 
-	public void notifyOnInitializationSuccess() {
+	public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
 		serviceInitializationCompleted = true;
+		
 		synchronized (serviceMonitor) {
 			serviceMonitor.notifyAll();
 		}
 	}
+	public void notifyOnInitializationFailure( Exception e)
+	{
+		notifyOnInitializationFailure(null, e);
+	}
+	public void notifyOnInitializationSuccess()
+	{
+		notifyOnInitializationSuccess(null);
+	}
 
 	public void notifyOnTermination(String message) {
 	}
 
+	public FileSystemXmlApplicationContext getSpringContext()
+	{
+		return context;
+	}
+	
+	public boolean isInitialized()
+	{
+		return serviceInitializationCompleted;
+	}
+	
+	public boolean initializationFailed()
+	{
+		return serviceInitializationException;
+	}
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Aug 22 11:52:42 2008
@@ -35,6 +35,7 @@
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
 import org.apache.uima.aae.error.ErrorHandler;
 import org.apache.uima.aae.error.Threshold;
 import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
@@ -50,15 +51,21 @@
 	private static final Class CLASS_NAME = UimaDefaultMessageListenerContainer.class;
 	private String destinationName="";
 	private Endpoint endpoint;
+	private boolean freeCasQueueListener;
 	private AnalysisEngineController controller;
 	private int retryCount = 2;
+	
 	public UimaDefaultMessageListenerContainer()
 	{
 		super();
-		setRecoveryInterval(20000);
-		//setAcceptMessagesWhileStopping(false);
+		setRecoveryInterval(60000);
+		setAcceptMessagesWhileStopping(false);
 		setExceptionListener(this);
-		
+	}
+	public UimaDefaultMessageListenerContainer(boolean freeCasQueueListener)
+	{
+		this();
+		this.freeCasQueueListener = freeCasQueueListener;
 	}
 	public void setController( AnalysisEngineController aController)
 	{
@@ -84,25 +91,6 @@
 	                "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
 	                new Object[] {  endpointName, getBrokerUrl(), t });
 
-/*
-				while ( retryCount > 0 )
-			{
-				retryCount--;
-				String brokerURL = ((ActiveMQConnectionFactory)getConnectionFactory()).getBrokerURL();
-				System.out.println(">>>>> Retrying Connection To Broker:"+brokerURL+" Endpoint:"+endpoint.getEndpoint()+" Sleeping For 1 Minute Before Retry. Retries Remaining:"+retryCount);
-				//	Retry the connection
-				try
-				{
-					establishSharedConnection();
-					synchronized(this)
-					{
-						wait(60000);
-					}
-					break;
-				}
-				catch( Exception e) {}
-			}
-*/
 			boolean terminate = true;
 			if (  disableListener(t) )
 			{
@@ -311,5 +299,8 @@
 	{
 		endpoint = anEndpoint;
 	}
-
+	public boolean isFreeCasQueueListener()
+	{
+		return freeCasQueueListener;
+	}
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Fri Aug 22 11:52:42 2008
@@ -22,6 +22,7 @@
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -30,6 +31,8 @@
 
 import org.apache.activemq.command.ActiveMQDestination;
 
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+
 /**
  * Initializes JMS session and creates JMS MessageProducer to be used for
  * sending messages to a given destination. It extends BaseMessageSender which
@@ -45,7 +48,8 @@
 	private Session session = null;
 	private MessageProducer producer = null;
 	private String destinationName = null;
-
+	private ConcurrentHashMap producerMap = new ConcurrentHashMap();
+	
 	public ActiveMQMessageSender(Connection aConnection,
 			List pendingMessageList, String aDestinationName,
 			BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
@@ -53,15 +57,28 @@
 		connection = aConnection;
 		destinationName = aDestinationName;
 	}
-
+	public MessageProducer getMessageProducer(Destination destination) throws Exception {
+		if ( producerMap.containsKey(destination))
+		{
+			return (MessageProducer) producerMap.get(destination);
+		}
+		createSession();
+		MessageProducer mProducer = session.createProducer(destination);
+		mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+		producerMap.put(destination, mProducer);
+		return mProducer;
+	}
+	private void createSession() throws Exception {
+		if ( session == null )	{
+			session = connection.createSession(false, 0);
+		}
+	}
 	/**
 	 * Creates a jms session object used to instantiate message producer
 	 */
 	protected void initializeProducer() throws Exception {
-		session = connection.createSession(false, 0);
-		Queue producerDestination = session.createQueue(destinationName);
-		producer = session.createProducer(producerDestination);
-		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+		createSession();
+		producer = getMessageProducer(session.createQueue(destinationName));
 	}
 
 	/**
@@ -96,5 +113,6 @@
 		if (producer != null) {
 			producer.close();
 		}
+		producerMap.clear();
 	}
 }
\ No newline at end of file

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Fri Aug 22 11:52:42 2008
@@ -54,6 +54,8 @@
 import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
 import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext;
 import org.apache.uima.adapter.jms.service.Dd2spring;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.resource.Resource;
 import org.apache.uima.resource.ResourceConfigurationException;
@@ -163,6 +165,7 @@
 			msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
 			msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
 			msg.setJMSReplyTo(consumerDestination);
+			
 		}
 		catch (Exception e)
 		{
@@ -308,7 +311,6 @@
 		//	in pendingMessageList. Upon arrival, each message is removed from 
 		//	pendingMessageList and it is sent to a destination.
 		
-////		Thread t = new Thread((BaseMessageSender)messageDispatcher); //.doStart();
 		t.start();
 		//	Wait until the worker thread is fully initialized
 			synchronized( sender )
@@ -342,6 +344,7 @@
 		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "initializeConsumer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_jms_consumer_INFO", new Object[] { aBrokerURI, consumerDestination.getQueueName() });
 		consumer = consumerSession.createConsumer(consumerDestination);
 		consumer.setMessageListener(this);
+		System.out.println(">>>> Client Activated Temp Reply Queue:"+consumerDestination.getQueueName());
 	}
 	/**
 	 * Initialize the uima ee client. Takes initialization parameters from the
@@ -446,6 +449,16 @@
 				}
 
 				asynchManager.initialize(casPoolSize, "ApplicationCasPoolContext", performanceTuningSettings);
+
+				//	Create a special CasPool of size 1 to be used for deserializing CASes from a Cas Multiplier
+				if ( super.resourceMetadata != null && super.resourceMetadata instanceof AnalysisEngineMetaData )
+				{
+					if ( ((AnalysisEngineMetaData) super.resourceMetadata).getOperationalProperties().getOutputsNewCASes() )
+					{
+						//	Create a Shadow CAS Pool used to de-serialize CASes produced by a CAS Multiplier
+						asynchManager.initialize(1, SHADOW_CAS_POOL, performanceTuningSettings);
+					}
+				}
 				initialized = true;
 				remoteService = true;
 				// running = true;
@@ -715,33 +728,47 @@
 		msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None); 
 		msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
 		msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); 
-		msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.ReleaseCAS); 
+		msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.ReleaseCAS);
+		msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+		msg.setJMSReplyTo(consumerDestination);
 	}
 
   public void notifyOnInitializationFailure(Exception e) {
-
-    //  Initialization exception. Notify blocking thread and indicate a problem
-    serviceInitializationException = true;
-		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_container_init_exception__WARNING", new Object[] {e});
-    synchronized(serviceMonitor)
-    {
-      serviceMonitor.notifyAll();
-    }
-    
+	  notifyOnInitializationFailure(null, e);
   }
 
   public void notifyOnInitializationSuccess() {
-    serviceInitializationCompleted =  true;
-    synchronized(serviceMonitor)
-    {
-      serviceMonitor.notifyAll();
-    }
+	  notifyOnInitializationSuccess(null);
   }
 
-  public void notifyOnTermination(String message) {
+  public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e) {
+
+	    //  Initialization exception. Notify blocking thread and indicate a problem
+	    serviceInitializationException = true;
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_container_init_exception__WARNING", new Object[] {e});
+	    synchronized(serviceMonitor)
+	    {
+	      serviceMonitor.notifyAll();
+	    }
+	    
+	  }
+
+	  public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
+	    serviceInitializationCompleted =  true;
+	    synchronized(serviceMonitor)
+	    {
+	      serviceMonitor.notifyAll();
+	    }
+	  }
+
+	  public void notifyOnTermination(String message) {
     
   }
 
+	protected MessageProducer getMessageProducer( Destination destination ) throws Exception
+	{
+		return sender.getMessageProducer(destination);
+	}
 
 
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java?rev=688172&r1=688171&r2=688172&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java Fri Aug 22 11:52:42 2008
@@ -1,3 +1,4 @@
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -22,179 +23,244 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.net.BindException;
-import java.net.ServerSocket;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import java.io.InvalidClassException;
 import org.apache.uima.UIMAFramework;
-import org.apache.uima.aae.controller.ControllerCallbackListener;
+import org.apache.uima.aae.jmx.monitor.BasicUimaJmxMonitorListener;
+import org.apache.uima.aae.jmx.monitor.JmxMonitor;
+import org.apache.uima.aae.jmx.monitor.JmxMonitorListener;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.util.Level;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextClosedEvent;
 import org.springframework.context.support.FileSystemXmlApplicationContext;
 
-public class UIMA_Service implements ControllerCallbackListener
+public class UIMA_Service implements  ApplicationListener
 {
 	private static final Class CLASS_NAME = UIMA_Service.class;
 
-	private BrokerService service;
-	private TransportConnector connector;
-	private TransportConnector stompConnector;
-	private TransportConnector httpConnector;
-	private Object semaphore = new Object();
 	protected boolean serviceInitializationCompleted;
 	protected boolean serviceInitializationException;
 	protected Object serviceMonitor = new Object();
-
-	public void startInternalBroker() throws Exception
+	private JmxMonitor monitor = null;
+	private Thread monitorThread = null;
+	
+	/**
+	 * Parse command args, run dd2spring on the deployment descriptors to generate Spring context files.
+	 * 
+	 * @param args - command line arguments
+	 * @return - an array of Spring context files generated from provided deployment descriptors
+	 * @throws Exception
+	 */
+	public String[] initialize(String[] args) throws Exception
 	{
+		UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO, "UIMA-AS version " + UIMAFramework.getVersionString());
+
+		String[] springConfigFileArray =
+		{};
+		String[] deployedDescriptors =
+		{};
+		String[] deploymentDescriptors =
+		{};
+		int nbrOfArgs = args.length;
 
-		service = new BrokerService();
-		String[] connectors = service.getNetworkConnectorURIs();
-		String brokerURI;
-		if (connectors != null)
+		deploymentDescriptors = getMultipleArg("-d", args);
+		if (deploymentDescriptors.length == 0)
 		{
-			for (int i = 0; i < connectors.length; i++)
-			{
-				System.out.println("ActiveMQ Broker Started With Connector:" + connectors[i]);
-			}
-			brokerURI = service.getMasterConnectorURI();
+		  // allow multiple args for one key
+			deploymentDescriptors = getMultipleArg2("-dd", args);
+		}
+		String saxonURL = getArg("-saxonURL", args);
+		String xslTransform = getArg("-xslt", args);
+		String uimaAsDebug = getArg("-uimaEeDebug", args);
+		
+		if (nbrOfArgs < 1 || (args[0].startsWith("-") && (deploymentDescriptors.length == 0 || saxonURL.equals("") || xslTransform.equals(""))))
+		{
+			printUsageMessage();
+			return null;
+		}
+
+		if (deploymentDescriptors.length == 0)
+		{
+			// array of context files passed in
+			springConfigFileArray = args;
+			deployedDescriptors = args;
 		}
 		else
 		{
-			String connectorList = "";
-			service.setPersistent(false);
-			service.setUseJmx(true);
-			brokerURI = generateInternalURI("tcp", 18810, true, false);
-			connector = service.addConnector(brokerURI);
-			System.out.println("Adding TCP Connector:" + connector.getConnectUri());
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG", new Object[]
-			{ "Adding TCP Connector", connector.getConnectUri() });
+			// create a String array of spring context files
+			springConfigFileArray = new String[deploymentDescriptors.length];
+			deployedDescriptors = new String[deploymentDescriptors.length];
 
-			connectorList = connector.getName();
-			if (System.getProperty("StompSupport") != null)
-			{
-				String stompURI = generateInternalURI("stomp", 61613, false, false);
-				stompConnector = service.addConnector(stompURI);
-				System.out.println("Adding STOMP Connector:" + stompConnector.getConnectUri());
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG", new Object[]
-				{ "Adding STOMP Connector", stompConnector.getConnectUri() });
-				connectorList += "," + stompConnector.getName();
-			}
-			if (System.getProperty("HTTP") != null)
+			Dd2spring aDd2Spring = new Dd2spring();
+			for (int dd = 0; dd < deploymentDescriptors.length; dd++)
 			{
-				String stringPort = System.getProperty("HTTP");
-				int p = Integer.parseInt(stringPort);
-				String httpURI = generateInternalURI("http", p, false, true);
-				httpConnector = service.addConnector(httpURI);
-				System.out.println("Adding HTTP Connector:" + httpConnector.getConnectUri());
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_adding_connector__CONFIG", new Object[]
-				{ "Adding HTTP Connector", httpConnector.getConnectUri() });
+				String deploymentDescriptor = deploymentDescriptors[dd];
+
+				File springConfigFile = aDd2Spring.convertDd2Spring(deploymentDescriptor, xslTransform, saxonURL, uimaAsDebug);
+
+				// if any are bad, fail
+				if (null == springConfigFile)
+				{
+					return null;
+				}
+				springConfigFileArray[dd] = springConfigFile.getAbsolutePath();
 
-				connectorList += "," + httpConnector.getName();
+				// get the descriptor to register with the engine controller
+				String deployDescriptor = "";
+				File afile = null;
+				FileInputStream fis = null;
+				try
+				{
+					afile = new File(deploymentDescriptor);
+					fis = new FileInputStream(afile);
+					byte[] bytes = new byte[(int) afile.length()];
+					fis.read(bytes);
+					deployDescriptor = new String(bytes);
+					deployedDescriptors[dd] = deployDescriptor;
+					// Log Deployment Descriptor
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "main", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_deploy_desc__FINEST", new Object[]
+					{ deployDescriptor });
+				}
+				catch (IOException e)
+				{
+					e.printStackTrace();
+				}
+				finally
+				{
+					if (fis != null)
+					{
+						try
+						{
+							fis.close();
+						}
+						catch (IOException e)
+						{
+						}
+					}
+				}
 			}
-			service.start();
-			System.setProperty("ActiveMQConnectors", connectorList);
-			System.out.println("Broker Service Started - URL:" + service.getVmConnectorURI());
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_broker_started__CONFIG", new Object[]
-			{ service.getVmConnectorURI() });
 		}
+		return springConfigFileArray;
+		
 
-		// Allow the connector time to start
-		synchronized (semaphore)
-		{
-			semaphore.wait(5000);
-		}
-		System.setProperty("jms.brokerUrl", brokerURI);
+		
 	}
-
 	/**
-	 * Generates a unique port for the Network Connector that will be plugged
-	 * into the internal Broker. This connector externalizes the internal broker
-	 * so that remote delegates can reply back to the Aggregate. This method
-	 * tests port 18810 for availability and it fails increments the port by one
-	 * until a port is valid.
+	 * Deploy Spring context files in a Spring Container.
 	 * 
-	 * @return - Broker URI with a unique port
+	 * @param springContextFiles - array of Spring context files
+	 * 
+	 * @throws Exception
 	 */
-
-	private String generateInternalURI(String aProtocol, int aDefaultPort, boolean cacheURL, boolean oneTry) throws Exception
+	public void deploy( String[] springContextFiles ) throws Exception
 	{
-		boolean success = false;
-		int openPort = aDefaultPort;
-		ServerSocket ssocket = null;
-
-		while (!success)
+		SpringContainerDeployer springDeployer =
+			new SpringContainerDeployer();
+		// now try to deploy the array of spring context files
+		springDeployer.deploy(springContextFiles);
+		//	Poll the deployer for the initialization status. Wait for either successful
+		//	initialization or failure.
+		while( !springDeployer.isInitialized() && !springDeployer.initializationFailed())
 		{
-			try
+			synchronized( springDeployer )
 			{
-				ssocket = new ServerSocket(openPort);
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_port_available__CONFIG", new Object[]
-				{ openPort });
-
-				String uri = aProtocol + "://" + ssocket.getInetAddress().getLocalHost().getCanonicalHostName() + ":" + openPort;
-				success = true;
-				if (cacheURL)
-				{
-					System.setProperty("BrokerURI", uri);
-				}
-				return uri;
+				springDeployer.wait(100);
 			}
-			catch (BindException e)
+		}
+		//	Check if the deployer failed
+		if ( springDeployer.initializationFailed() )
+		{
+			throw new ResourceInitializationException();
+		}
+		//	Register this class to receive Spring container notifications. Specifically, looking
+		//	for an even signaling the container termination. This is done so that we can stop
+		//	the monitor thread
+		FileSystemXmlApplicationContext context = springDeployer.getSpringContext();
+		context.addApplicationListener(this);
+	}
+	/**
+	 * Creates an instance of a {@link JmxMonitor}, initializes it with the JMX Server URI and
+	 * checkpoint frequency, and finally starts the monitor.
+	 * 
+	 * @param samplingFrequency - how often the JmxMonitor should checkpoint to fetch service metrics
+	 * 
+	 * @throws Exception - error on monitor initialization or startup
+	 */
+	public void startMonitor(long samplingFrequency) throws Exception
+	{
+		monitor = new JmxMonitor();
+
+		//	Check if the monitor should run in the verbose mode. In this mode
+		//	the monitor dumps JMX Server URI, and a list of UIMA-AS services
+		//	found in that server. The default is to not show this info.
+		if ( System.getProperty("verbose") != null )
+		{
+			monitor.setVerbose();
+		}
+
+		//	Use the URI provided in the first arg to connect to the JMX server.
+		//	Also define sampling frequency. The monitor will collect the metrics
+		//	at this interval.
+		String jmxServerPort = null;
+		//	get the port of the JMX Server. This property can be set on the command line via -d
+		//	OR it is set automatically by the code that creates an internal JMX Server. The latter
+		//	is created in the {@link BaseAnalysisEngineController} constructor.
+		if ( ( jmxServerPort = System.getProperty("com.sun.management.jmxremote.port")) != null )
+		{
+			//	parameter is set, compose the URI
+			String jmxServerURI = "service:jmx:rmi:///jndi/rmi://localhost:"+jmxServerPort+"/jmxrmi";
+			//	Connect to the JMX Server, configure checkpoint frequency, create MBean proxies for 
+			//	UIMA-AS MBeans and service input queues
+			monitor.initialize(jmxServerURI, samplingFrequency);
+			// Create formatter listener
+			JmxMonitorListener listener = null;
+			String formatterListenerClass = null;
+			//	Check if a custom monitor formatter listener class is provided. The user provides this
+			//	formatter by adding a -Djmx.monitor.formatter=<class> parameter which specifies a class
+			//	that implements {@link JmxMonitorListener} interface
+			if ( (formatterListenerClass = System.getProperty(JmxMonitor.FormatterListener)) != null )
 			{
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_port_not_available__CONFIG", new Object[]
-				{ openPort });
-				if (oneTry)
+				Object object = null;
+				try
 				{
-					System.out.println("Given port:" + openPort + " is not available for " + aProtocol);
-					throw e;
+					//	Instantiate the formatter listener class
+					Class formatterClass = Class.forName( formatterListenerClass );
+					object = formatterClass.newInstance();
 				}
-				openPort++;
-			}
-			catch (Exception e)
-			{
-				e.printStackTrace();
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[]
-				{ JmsConstants.threadName(), e });
-				if (oneTry)
+				catch( ClassNotFoundException e)
 				{
+					System.out.println("Class Not Found:"+formatterListenerClass+". Provide a Formatter Class Which Implements:org.apache.uima.aae.jmx.monitor.JmxMonitorListener");
 					throw e;
 				}
-			}
-			finally
-			{
-				try
+				if ( object instanceof JmxMonitorListener )
 				{
-					if (ssocket != null)
-					{
-						ssocket.close();
-					}
+					listener = (JmxMonitorListener)object; 
 				}
-				catch (IOException ioe)
+				else
 				{
+					throw new InvalidClassException("Invalid Monitor Formatter Class:"+formatterListenerClass+".The Monitor Requires a Formatter Which Implements:org.apache.uima.aae.jmx.monitor.JmxMonitorListener");
 				}
 			}
+			else
+			{
+				//	The default formatter listener which logs to the UIMA log
+				listener = new BasicUimaJmxMonitorListener(monitor.getMaxServiceNameLength());
+			}
+			//	Plug in the monitor listener
+			monitor.addJmxMonitorListener(listener);
+			//	Create and start the monitor thread
+			monitorThread = new Thread(monitor);
+			
+			//	Start the monitor thread. It will run until the Spring container stops. When this happens
+			//	the UIMA_Service receives notication via a {@code onApplicationEvent()} callback. There
+			//	the monitor is stopped allowing the service to terminate.
+			monitorThread.start();
+			System.out.println(">>> Started JMX Monitor.\n\t>>> MBean Server Port:"+jmxServerPort+"\n\t>>> Monitor Checkpoint Frequency:"+samplingFrequency+"\n\t>>> Monitor Formatter Class:"+listener.getClass().getName());
 		}
-		return null;
-
-	}
-
-	public ApplicationContext initialize(String[] springXmlConfigFiles) throws ResourceInitializationException
-	{
-		try
-		{
-			ApplicationContext ctx = new FileSystemXmlApplicationContext(springXmlConfigFiles);
-			((AbstractApplicationContext) ctx).registerShutdownHook();
-			return ctx;
-		}
-		catch (Exception e)
-		{
-			throw new ResourceInitializationException(e);
-		}
+		
 	}
 
 	/**
@@ -287,46 +353,15 @@
 		System.out.println(" Arguments to the program are as follows : \n" + "-d path-to-UIMA-Deployment-Descriptor [-d path-to-UIMA-Deployment-Descriptor ...] \n" + "-saxon path-to-saxon.jar \n" + "-xslt path-to-dd2spring-xslt\n" + "   or\n"
 				+ "path to Spring XML Configuration File which is the output of running dd2spring\n");
 	}
-	protected void waitForServiceNotification() throws Exception {
-
-			synchronized (serviceMonitor) {
-				while (!serviceInitializationCompleted) {
-					if (serviceInitializationException) {
-						throw new ResourceInitializationException();
-					}
-						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_awaiting_container_init__INFO", new Object[] {});
-
-						serviceMonitor.wait();
-
-						if (serviceInitializationException) {
-							throw new ResourceInitializationException();
-						}
-
-				}
-			}
-	}
-	public void notifyOnInitializationFailure(Exception e) {
-
-		// Initialization exception. Notify blocking thread and indicate a
-		// problem
-		serviceInitializationException = true;
-		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_container_init_exception__WARNING", new Object[] {e});
-		synchronized (serviceMonitor) {
-			serviceMonitor.notifyAll();
-		}
-
-	}
-
-	public void notifyOnInitializationSuccess() {
-		serviceInitializationCompleted = true;
-		synchronized (serviceMonitor) {
-			serviceMonitor.notifyAll();
+	public void onApplicationEvent(ApplicationEvent event) {
+		if ( event instanceof ContextClosedEvent && monitor != null && monitor.isRunning() )
+		{
+			System.out.println("Stopping Monitor");
+			//	Stop the monitor. The service has stopped
+			monitor.doStop();
 		}
 	}
 
-	public void notifyOnTermination(String message) {
-	}
-
 	/**
 	 * The main routine for starting the deployment of a UIMA-AS instance. The
 	 * args are either: 1 or more "paths" to Spring XML descriptors representing
@@ -346,100 +381,31 @@
 	 */
 	public static void main(String[] args)
 	{
-		UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO, "UIMA-AS version " + UIMAFramework.getVersionString());
-
-		String[] springConfigFileArray =
-		{};
-		String[] deployedDescriptors =
-		{};
-		String[] deploymentDescriptors =
-		{};
-		int nbrOfArgs = args.length;
-
-		deploymentDescriptors = getMultipleArg("-d", args);
-		if (deploymentDescriptors.length == 0)
-		{
-		  // allow multiple args for one key
-			deploymentDescriptors = getMultipleArg2("-dd", args);
-		}
-		String saxonURL = getArg("-saxonURL", args);
-		String xslTransform = getArg("-xslt", args);
-		String uimaAsDebug = getArg("-uimaEeDebug", args);
-		
-		if (nbrOfArgs < 1 || (args[0].startsWith("-") && (deploymentDescriptors.length == 0 || saxonURL.equals("") || xslTransform.equals(""))))
-		{
-			printUsageMessage();
-			return;
-		}
-
-		if (deploymentDescriptors.length == 0)
-		{
-			// array of context files passed in
-			springConfigFileArray = args;
-			deployedDescriptors = args;
-		}
-		else
+		try
 		{
-			// create a String array of spring context files
-			springConfigFileArray = new String[deploymentDescriptors.length];
-			deployedDescriptors = new String[deploymentDescriptors.length];
-
-			Dd2spring aDd2Spring = new Dd2spring();
-			for (int dd = 0; dd < deploymentDescriptors.length; dd++)
+			UIMA_Service service = new UIMA_Service();
+			//	parse command args and run dd2spring to generate spring context
+			//	files from deployment descriptors
+			String contextFiles[] = service.initialize(args);
+			//	If no context files generated there is nothing to do
+			if ( contextFiles == null )
 			{
-				String deploymentDescriptor = deploymentDescriptors[dd];
-
-				File springConfigFile = aDd2Spring.convertDd2Spring(deploymentDescriptor, xslTransform, saxonURL, uimaAsDebug);
-
-				// if any are bad, fail
-				if (null == springConfigFile)
-				{
-					return;
-				}
-				springConfigFileArray[dd] = springConfigFile.getAbsolutePath();
-
-				// get the descriptor to register with the engine controller
-				String deployDescriptor = "";
-				File afile = null;
-				FileInputStream fis = null;
-				try
-				{
-					afile = new File(deploymentDescriptor);
-					fis = new FileInputStream(afile);
-					byte[] bytes = new byte[(int) afile.length()];
-					fis.read(bytes);
-					deployDescriptor = new String(bytes);
-					deployedDescriptors[dd] = deployDescriptor;
-					// Log Deployment Descriptor
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "main", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_deploy_desc__FINEST", new Object[]
-					{ deployDescriptor });
-				}
-				catch (IOException e)
-				{
-					e.printStackTrace();
-				}
-				finally
-				{
-					if (fis != null)
-					{
-						try
-						{
-							fis.close();
-						}
-						catch (IOException e)
-						{
-						}
-					}
-				}
+				return;
+			}
+			//	Deploy components defined in Spring context files. This method blocks until
+			//	the container is fully initialized and all UIMA-AS components are succefully
+			//	deployed.
+			service.deploy( contextFiles );
+			//	Check if we should start an optional JMX-based monitor that will provide service metrics
+			//	The monitor is enabled by existence of -Djmx.monitor.frequency=<number> parameter. By default
+			//	the monitor is not enabled.
+			String monitorCheckpointFrequency;
+			if ( ( monitorCheckpointFrequency = System.getProperty(JmxMonitor.CheckpointFrequency)) != null)
+			{
+				//	Found monitor checkpoint frequency parameter, configure and start the monitor.
+				//	If the monitor fails to initialize the service is not effected. 
+				service.startMonitor(Long.parseLong(monitorCheckpointFrequency));
 			}
-		}
-
-		// now try to deploy the array of spring context files
-		SpringContainerDeployer springDeployer =
-			new SpringContainerDeployer();
-		try
-		{
-			springDeployer.deploy(springConfigFileArray);
 		}
 		catch( Exception e)
 		{