You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2008/10/06 17:22:04 UTC

svn commit: r702174 - in /incubator/uima/sandbox/trunk/uima-as: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ uimaj-as-core/src/main/java/org/apache/uima/aae/ uimaj-...

Author: schor
Date: Mon Oct  6 08:22:03 2008
New Revision: 702174

URL: http://svn.apache.org/viewvc?rev=702174&view=rev
Log:
[UIMA-1140] Jerry's changes to use internal java queues instead of activemq ones.  Applied 3 patches dated 2008-10-6

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Mon Oct  6 08:22:03 2008
@@ -42,12 +42,14 @@
 import org.apache.activemq.advisory.ConsumerListener;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.pool.PooledConnectionFactory;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.error.InvalidMessageException;
 import org.apache.uima.aae.error.ServiceShutdownException;
+import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.util.Level;
 import org.springframework.util.Assert;
@@ -78,11 +80,11 @@
 
 	private Map connectionMap;
 
-	private boolean retryEnabled;
+	private volatile boolean retryEnabled;
 
 	private AnalysisEngineController controller = null;
 
-	private boolean connectionAborted = false;
+	private volatile boolean connectionAborted = false;
 
 	private long connectionCreationTimestamp = 0L;
 
@@ -162,12 +164,24 @@
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
 	                "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_activemq_open__FINE",
 	                new Object[] { endpoint, serverUri });
-
+//			PooledConnectionFactory factory = new PooledConnectionFactory(brokerUri);
+//			factory.setMaximumActive(20);
 			ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
+
+			factory.setDispatchAsync(true);
+      factory.setUseAsyncSend(true);
+  
+      factory.setCopyMessageOnSend(false);
+/*
+			factory.setDispatchAsync(true);
+			factory.setOptimizeAcknowledge(true);
+			factory.setUseAsyncSend(true);
+			factory.setUseCompression(false);
 			factory.setCopyMessageOnSend(false);
+	*/
 			conn = factory.createConnection();
 			connectionCreationTimestamp = System.nanoTime();
-			producerSession = conn.createSession(false, 0);
+			producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
 			if ( isReplyEndpoint && delegateEndpoint.getDestination() != null )
 			{
 				producer = producerSession.createProducer(null); 
@@ -307,10 +321,13 @@
 			try
 			{
 				retryCount--;
+/*				
 				if (!((ActiveMQSession) producerSession).isRunning())
 				{
 					open();
 				}
+*/
+				
 				if (aTextMessage == null)
 				{
 					return producerSession.createTextMessage();
@@ -418,7 +435,7 @@
 		}
 	}
 
-	public boolean send(Message aMessage, boolean startTimer) 
+	public boolean send(final Message aMessage, boolean startTimer) 
 	{
 		String destinationName = "";
 
@@ -440,14 +457,28 @@
 					if ( isReplyEndpoint && delegateEndpoint.getDestination() != null  )
 					{
 						destinationName = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName();
-						producer.send((Destination)delegateEndpoint.getDestination(), aMessage);
+						if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
+						{
+	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
+						}
+						synchronized(producer)
+						{
+              producer.send((Destination)delegateEndpoint.getDestination(), aMessage);
+						}
 					}
 					else
 					{
 						destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
-						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
-						producer.send(aMessage);
+            if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
+            {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
+            }
+            synchronized(producer)
+            {
+              producer.send(aMessage);
+            }
 					}
+					
 				if (startTimer)
 				{
 					startTimer(connectionCreationTimestamp);
@@ -457,13 +488,17 @@
 			}
 			catch ( Exception e)
 			{
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_ableto_send_msg_INFO", new Object[] { controller.getComponentName(), destinationName, i+1, 10 });
 				lastException = e;
 				//	If the controller has been stopped no need to send messages
 				if ( controller.isStopped())
 				{
 					return true;
 				}
+				else
+				{
+	        e.printStackTrace();
+	        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_ableto_send_msg_INFO", new Object[] { controller.getComponentName(), destinationName, i+1, 10 });
+				}
 			}
 			try
 			{

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Mon Oct  6 08:22:03 2008
@@ -87,8 +87,8 @@
 	
 	private ServiceInfo serviceInfo = null;
 	
-	private boolean stopped = false;
-	private boolean	channelRegistered = false;
+	private volatile boolean stopped = false;
+	private volatile boolean	channelRegistered = false;
 	
 	private List listenerContainerList = new ArrayList();
 	
@@ -531,29 +531,6 @@
 		{
 			//	Wrap JMS Message in MessageContext
 			messageContext = new JmsMessageContext( aMessage, endpointName );
-			if ( jmsSession == null )
-			{
-				jmsSession = aJmsSession;
-				sessionAckMode = jmsSession.getAcknowledgeMode();
-				if ( sessionAckMode == Session.AUTO_ACKNOWLEDGE)
-				{
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
-			                "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_ack_mode__CONFIG",
-			                new Object[] { endpointName, "AUTO_ACKNOWLEDGE" });
-				}
-				else if ( sessionAckMode == Session.CLIENT_ACKNOWLEDGE)
-				{
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
-			                "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_ack_mode__CONFIG",
-			                new Object[] { endpointName, "CLIENT_ACKNOWLEDGE" });
-				}
-				else 
-				{
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
-			                "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_ack_mode__CONFIG",
-			                new Object[] { endpointName, sessionAckMode });
-				}
-			}
 			if ( aMessage.getStringProperty(AsynchAEMessage.CasReference) == null )
 			{
 				casRefId = "CasReferenceId Not In Message";

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Mon Oct  6 08:22:03 2008
@@ -90,9 +90,21 @@
 	
 	private String serviceProtocolList ="";
 
-	private boolean aborting = false;
+	private volatile boolean aborting = false;
 	
 	private Destination freeCASTempQueue;
+
+  private String hostIP = null;
+
+  public JmsOutputChannel()
+  {
+    try
+    {
+      hostIP = InetAddress.getLocalHost().getHostAddress();
+    }
+    catch ( Exception e) {  /* silently deal with this error */ }
+
+  }
 	/**
 	 * Sets the ActiveMQ Broker URI 
 	 */
@@ -1066,7 +1078,11 @@
 				
 				aTextMessage.setLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats.getRawCasSerializationTime());
 				aTextMessage.setLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats.getRawCasDeserializationTime());
-				aTextMessage.setLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
+
+//				long t = getAnalysisEngineController().getServicePerformance().getRawAnalysisTime();
+//				aTextMessage.setLongProperty(AsynchAEMessage.TimeInProcessCAS, t);
+        aTextMessage.setLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
+				
 				long iT =getAnalysisEngineController().getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process); 
 				aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, iT );
 				String lookupKey = getAnalysisEngineController().getName();
@@ -1170,12 +1186,6 @@
 		if (anEndpoint.isRemote())
 		{
 			aMessage.setStringProperty(UIMAMessage.ServerURI, getServerURI());
-			String hostIP = null;
-			try
-			{
-				hostIP = InetAddress.getLocalHost().getHostAddress();
-			}
-			catch ( Exception e) {  /* silently deal with this error */ }
 			if ( hostIP != null )
 			{
 				aMessage.setStringProperty(AsynchAEMessage.ServerIP,hostIP);
@@ -1301,6 +1311,7 @@
 			{
 				return;
 			}
+			
 			//	Get the connection object for a given endpoint
 			JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
 			//	Create empty JMS Text Message
@@ -1386,12 +1397,10 @@
 			{
 				startConnectionTimer = false;
 			}
-			
 			// ----------------------------------------------------
 			//	Send Request Messsage to the Endpoint
 			// ----------------------------------------------------
 			endpointConnection.send(tm, startConnectionTimer);
-
 //			if ( getAnalysisEngineController().isTopLevelComponent() )
 //			{
 //				getAnalysisEngineController().getInProcessCache().dumpContents(getAnalysisEngineController().getComponentName());

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java Mon Oct  6 08:22:03 2008
@@ -203,10 +203,7 @@
 			int concurrentConsumerCountOnReplies = getConcurrentConsumerCount(ctx);
 			// Configure and initialize vm transport in the top level aggregate.
 			// The aggregate will initialize all delegates with the vm transport.
-			if ( System.getProperty("UseVmTransport") != null )
-			{
-	      ((AggregateAnalysisEngineController) cntlr).initializeVMTransport(concurrentConsumerCountOnReplies);
-			}
+      ((AggregateAnalysisEngineController) cntlr).initializeVMTransport(concurrentConsumerCountOnReplies);
 			// Complete initialization of the aggregate by sending
 			// getMeta requests to
 			// all remote delegates (if any). Collocated delegates

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java Mon Oct  6 08:22:03 2008
@@ -48,11 +48,11 @@
 			if ( destination == null )
 			{
 				destination = session.createTemporaryQueue();
+        System.out.println(">>> Created New Temp Queue:"+destination+" Listener Hash:"+listener.hashCode());
 				if ( listener != null )
 				{
 					listener.setDestination(destination);
 				}
-				System.out.println(">>> Created New Temp Queue:"+destination+" Listener Hash:"+listener.hashCode());
 			}
 		}
 		return destination;

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Mon Oct  6 08:22:03 2008
@@ -51,7 +51,7 @@
   private static final char FS = System.getProperty("file.separator").charAt(0);
 	protected String text = "IBM today elevated five employees to the title of IBM Fellow\n -- its most prestigious technical honor.\n The company also presented more than $2.8 million in cash awards to employees whose technical innovation have yielded exceptional value to the company and its customers.\nIBM conferred the accolades and awards at its 2003 Corporate Technical Recognition Event (CTRE) in Scottsdale, Ariz. CTRE is a 40-year tradition at IBM, established to recognize exceptional technical employees and reward them for extraordinary achievements and contributions to the company's technology leadership.\n Our technical employees are among the best and brightest innovators in the world.\n They share a passion for excellence that defines their work and permeates the products and services IBM delivers to its customers, said Nick Donofrio, senior vice president, technology and manufacturing for IBM.\n CTRE provides the means for us to honor those who have distinguished themselves as exceptional leaders among their peers.\nAmong the special honorees at the 2003 CTRE are five employees who earned the coveted distinction of IBM Fellow:- David Ferrucci aka Dave, Grady Booch, chief scientist of Rational Software, IBM Software Group.\n Recognized internationally for his innovative work on software architecture, modeling, and software engineering process. \nMr. Booch is one of the original authors of the Unified Modeling Language (UML), the industry-standard language of blueprints for software-intensive systems.- Dr. Donald Chamberlin, researcher, IBM Almaden Research Center. An expert in relational database languages, Dr. Chamberlin is co- inventor of SQL, the language that energized the relational database market. He has also";
 	protected String doubleByteText = null;
-	protected boolean unexpectedException = false;
+	protected volatile boolean unexpectedException = false;
 	private static final boolean SEND_CAS_ASYNCHRONOUSLY = true;
 
 	protected static final int CPC_LATCH = 1;
@@ -328,7 +328,9 @@
 	{
 		Thread t1 = null;
 		Thread t2 = null;
-
+		serviceShutdownException = false;
+		unexpectedException = false;
+		
 		if (appCtx == null)
 		{
 			appCtx = buildContext(aBrokerURI, aTopLevelServiceQueueName, 0);
@@ -373,6 +375,10 @@
 						// Send CPC
 						aUimaEeEngine.collectionProcessingComplete();
 					}
+					else
+					{
+            System.out.println(">>>>>>>>>>>>>>>> Not Sending CPC Due To Exception [serviceShutdownException="+serviceShutdownException+"] [isStopped="+isStopped+"] [unexpectedException="+unexpectedException+"]");
+					}
 				}
 
 				// If have skipped CPC trip the latch
@@ -566,10 +572,6 @@
 			}
 			else if (processCountLatch != null)
 			{
-			  
-			  
-			  
-			  
 				if ( !expectedException && !(serviceShutdownException && expectingServiceShutdownException) )
 				{
 				unexpectedException = true;
@@ -669,20 +671,40 @@
 	{
 		if (aStatus != null && aStatus.isException())
 		{
-			System.out.println(" Received CPC Reply Containing Exception");
+		  
+		  List list = aStatus.getExceptions();
+      boolean expectedException = false;
+      for( int i=0; i < list.size(); i++)
+      {
+        Exception e = (Exception)list.get(i);
+        if ( e instanceof ServiceShutdownException || 
+           (e.getCause() != null && e.getCause() instanceof ServiceShutdownException ))
+        {
+          serviceShutdownException = true;
+        }
+        else if ( ignoreException( e.getClass()))
+        {
+          expectedException = true;
+        }
+        if ( !expectedException && !expectingServiceShutdownException )
+        {
+          e.printStackTrace();
+        }
+      }
+      if ( !expectedException && !(serviceShutdownException && expectingServiceShutdownException) )
+      {
+        System.out.println(" Received CPC Reply Containing Exception");
+        System.out.println(" ... when expecting normal CPC reply!");
+        unexpectedException = true;
+      }
 			if (exceptionCountLatch != null)
 			{
 				exceptionCountLatch.countDown();
 			}
-			else
-			{
-				if (cpcLatch != null)
-				{
-					System.out.println(" ... when expecting normal CPC reply!");
-					unexpectedException = true;
-					cpcLatch.countDown();
-				}
-			}
+      if (cpcLatch != null)
+      {
+        cpcLatch.countDown();
+      }
 		}
 		else
 		{

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java Mon Oct  6 08:22:03 2008
@@ -39,6 +39,7 @@
 import org.apache.uima.cas.TypeSystem;
 import org.apache.uima.cas.impl.AllowPreexistingFS;
 import org.apache.uima.cas.impl.OutOfTypeSystemData;
+import org.apache.uima.cas.impl.Serialization;
 import org.apache.uima.cas.impl.XCASDeserializer;
 import org.apache.uima.cas.impl.XCASSerializer;
 import org.apache.uima.cas.impl.XmiCasDeserializer;
@@ -167,17 +168,6 @@
 			writer.close();
 		}
 	}
-/*
-	public static  String serializeCasToXmi(CAS cas, XmiSerializationSharedData serSharedData) throws IOException, SAXException
-	{
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		XmiCasSerializer.serialize(cas, null, baos, false, serSharedData);
-		
-		baos.close();
-		String xmiStr = new String(baos.toByteArray(), "UTF-8"); 
-		return xmiStr;
-	}
-*/
 	
 	/** Utility method for deserializing a CAS from an XMI String */
 	public static void deserializeCasFromXmi(String anXmlStr, CAS aCAS, XmiSerializationSharedData aSharedData, 
@@ -187,31 +177,11 @@
 		
 		Reader reader = new StringReader(anXmlStr);
 		XMLReader xmlReader = XMLReaderFactory.createXMLReader();
-	    XmiCasDeserializer deser = new XmiCasDeserializer(aCAS.getTypeSystem());
+    XmiCasDeserializer deser = new XmiCasDeserializer(aCAS.getTypeSystem());
 	    ContentHandler handler = deser.getXmiCasHandler(aCAS, aLenient, aSharedData, aMergePoint);
 	    xmlReader.setContentHandler(handler);
 	    xmlReader.parse(new InputSource(reader));
 
-	    
-	    
-	    
-	    
-	    /*		
-		byte[] bytes = xmlStr.getBytes("UTF-8"); // this assumes the encoding
-													// is UTF-8, which is the
-													// default output encoding
-													// of the XmiCasSerializer
-		ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-		try
-		{
-			XmiCasDeserializer.deserialize(bais, cas, lenient, sharedData, mergePoint);
-		}
-		finally
-		{
-			bais.close();
-		}
-*/
-		
 	}
 
 	public static void deserializeCasFromXmi(String anXmlStr, CAS aCAS, XmiSerializationSharedData aSharedData, 
@@ -225,8 +195,7 @@
 	    ContentHandler handler = deser.getXmiCasHandler(aCAS, aLenient, aSharedData, aMergePoint, allow);
 	    xmlReader.setContentHandler(handler); 
 	    xmlReader.parse(new InputSource(reader));
+	}
 
-
-}
 	
 }	

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Mon Oct  6 08:22:03 2008
@@ -20,7 +20,6 @@
 package org.apache.uima.aae.controller;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -298,7 +297,6 @@
 		FlowContainer flow = lookupFlow(aCasReferenceId);
 			if ( flow != null )
 			{
-				//flow.aborted();
 				synchronized( flowMap )
 				{
 					flowMap.remove(aCasReferenceId);
@@ -401,10 +399,10 @@
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "processCollectionCompleteReplyFromDelegate", 
 					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_recvd_cpc_reply__FINE", new Object[] { key });
 			
-			
-			if (sendReply && allDelegatesCompletedCollection() && getClientEndpoint() != null)
+			Endpoint cEndpoint = null;
+			if (sendReply && allDelegatesCompletedCollection() && (( cEndpoint = getClientEndpoint()) != null) )
 			{
-				sendCpcReply();
+				sendCpcReply(cEndpoint);
 			}
 		}
 		catch ( Exception e)
@@ -412,7 +410,7 @@
 			throw new AsynchAEException(e);
 		}
 	}
-	private void sendCpcReply() throws Exception
+	private void sendCpcReply(Endpoint aClientEndpoint) throws Exception
 	{
 		Iterator destIterator = destinationMap.keySet().iterator();
 		while(destIterator.hasNext())
@@ -431,20 +429,22 @@
 		logStats(getComponentName(),servicePerformance);
 		
 		endProcess(AsynchAEMessage.Process);
-
-    if ( !getClientEndpoint().isRemote() && System.getProperty("UseVmTransport") != null)
+		if ( aClientEndpoint == null )
+		{
+		  aClientEndpoint = getClientEndpoint();
+		}
+    if ( !aClientEndpoint.isRemote())
     {
         UimaMessage message = 
-          getTransport(getClientEndpoint().getEndpoint()).produceMessage(AsynchAEMessage.CollectionProcessComplete,AsynchAEMessage.Response,getName());
+          getTransport(aClientEndpoint.getEndpoint()).produceMessage(AsynchAEMessage.CollectionProcessComplete,AsynchAEMessage.Response,getName());
         //  Send reply back to the client. Use internal (non-jms) transport
         getTransport(getName()).getUimaMessageDispatcher().dispatch(message);
     }
     else
     {
-      getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, getClientEndpoint());
+      getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, aClientEndpoint);
     }
 		
-		clientEndpoint = null;
 		clearStats();
 	}
 	/**
@@ -533,7 +533,7 @@
 		{
 			try
 			{
-				sendCpcReply();
+				sendCpcReply(null);
 			}
 			catch(Exception e)
 			{
@@ -550,7 +550,7 @@
 				if (endpoint != null )
 				{
 				  
-			    if ( !endpoint.isRemote() && System.getProperty("UseVmTransport") != null)
+			    if ( !endpoint.isRemote())
 			    {
 			      try
 			      {
@@ -1033,28 +1033,18 @@
 	      delegateEndpoints[i].initialize();
 	      delegateEndpoints[i].setController(this);
 	      
-	      // Dispatch GetMeta to the collocated component
-	      if ( System.getProperty("UseVmTransport") != null)
-	      {
-	        delegateEndpoints[i].setWaitingForResponse(true);
-	        try
-	        {
-	          UimaMessage message = 
-	            getTransport(delegateEndpoints[i].getEndpoint()).produceMessage(AsynchAEMessage.GetMeta,AsynchAEMessage.Request,getName());
-
-	          getTransport(delegateEndpoints[i].getEndpoint()).getUimaMessageDispatcher().dispatch(message);
-	        }
-	        catch( Exception e)
-	        {
-	          throw new AsynchAEException(e);
-	        }
-	      }
-	      else
-	      {
-	        ((AggregateAnalysisEngineController) this).dispatchMetadataRequest(delegateEndpoints[i]);
-	      }
-	      
-			}
+        delegateEndpoints[i].setWaitingForResponse(true);
+        try
+        {
+          UimaMessage message = 
+            getTransport(delegateEndpoints[i].getEndpoint()).produceMessage(AsynchAEMessage.GetMeta,AsynchAEMessage.Request,getName());
+          getTransport(delegateEndpoints[i].getEndpoint()).getUimaMessageDispatcher().dispatch(message);
+        }
+        catch( Exception e)
+        {
+          throw new AsynchAEException(e);
+        }
+      }
 		}
 	}
 
@@ -1289,13 +1279,6 @@
 		else
 		{
 		  endpoint = getReplyEndpoint( cacheEntry );
-/*
-		  endpoint = getMessageOrigin(cacheEntry.getCasReferenceId());
-			if ( endpoint == null && cacheEntry.getInputCasReferenceId() != null)
-			{
-			  endpoint = getMessageOrigin(cacheEntry.getInputCasReferenceId());
-			}
-*/	
 			dropFlow(cacheEntry.getCasReferenceId(), false);
 		}
 		if ( endpoint != null )
@@ -1330,21 +1313,20 @@
 					      cmOutstandingCASes.put(cacheEntry.getCasReferenceId(),cacheEntry.getCasReferenceId());
 	            }
 						}
-				    if ( !endpoint.isRemote() && System.getProperty("UseVmTransport") != null)
+				    if ( !endpoint.isRemote() )
 				    {
 				      sendVMMessage(AsynchAEMessage.Request, endpoint, cacheEntry);
 				    }
 				    else
 				    {
 	            // Send response to a given endpoint
-	            //getOutputChannel().sendReply(cacheEntry.getCas(), cacheEntry.getInputCasReferenceId(), aCasReferenceId, endpoint, cacheEntry.getCasSequence());
 	            getOutputChannel().sendReply(cacheEntry, endpoint);
 
 				    }
 					}
 					else
 					{
-            if ( !endpoint.isRemote() && System.getProperty("UseVmTransport") != null)
+            if ( !endpoint.isRemote())
             {
               sendVMMessage(AsynchAEMessage.Response, endpoint, cacheEntry);
             }
@@ -1485,7 +1467,7 @@
 
 	private void dispatch(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException
 	{
-    if ( !anEndpoint.isRemote() && System.getProperty("UseVmTransport") != null)
+    if ( !anEndpoint.isRemote() )
     {
       try
       {
@@ -1606,11 +1588,6 @@
 			}
 		}
 
-//		if (destinationToKeyMap.containsKey(anEndpointName))
-//		{
-//			return (String) destinationToKeyMap.get(anEndpointName);
-//		}
-
 		return key;
 	}
 
@@ -1664,6 +1641,7 @@
     }
     public void mergeTypeSystem(String aTypeSystem, String fromDestination, String fromServer) throws AsynchAEException
 	{
+      
 		try
 		{
 			Set keys = destinationMap.keySet();
@@ -2044,13 +2022,19 @@
 		if ( serviceInfo == null )
 		{
 			serviceInfo = new AggregateServiceInfo(isCasMultiplier());
-			if ( getInputChannel() != null )
+			// if this is a top level service and the input channel not yet initialized
+			// block in getInputChannel() on the latch
+			if ( isTopLevelComponent() && getInputChannel() != null )
 			{
-				serviceInfo.setInputQueueName(getInputChannel().getName());
-				serviceInfo.setBrokerURL(getInputChannel().getServerUri());
-				serviceInfo.setDeploymentDescriptor("");
-
+	      serviceInfo.setInputQueueName(getInputChannel().getName());
+	      serviceInfo.setBrokerURL(getInputChannel().getServerUri());
 			}
+			else
+			{
+	      serviceInfo.setInputQueueName(getName());
+	      serviceInfo.setBrokerURL("vm://localhost");
+			}
+			serviceInfo.setDeploymentDescriptor("");
 			serviceInfo.setState("Running");
 		}
 		return serviceInfo;
@@ -2154,7 +2138,7 @@
 	 * Force all collocated delegates to perform any post-initialization steps.
 	 */
 	public void onInitialize()
-    {
+  {
 		//	For each collocated delegate
 		for( int i=0; i < childControllerList.size(); i++ )
 		{
@@ -2163,6 +2147,6 @@
 			//	notify the delegate 
 			delegateController.onInitialize();
 		}
-    }
+  }
 	
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Mon Oct  6 08:22:03 2008
@@ -146,21 +146,21 @@
 	
 	private JmxManagement jmxManagement = null;
 	
-	protected boolean stopped = false;
+	protected volatile boolean stopped = false;
 	
 	protected String delegateKey = null;
 	
 	protected List unregisteredDelegateList = new ArrayList();
 	
-	protected boolean allDelegatesAreRemote = false;
+	protected volatile boolean allDelegatesAreRemote = false;
 	
 	protected List controllerListeners = new ArrayList();
 	
-	protected boolean serviceInitialized = false;
+	protected volatile boolean serviceInitialized = false;
 	
 	protected ConcurrentHashMap perCasStatistics = new ConcurrentHashMap();
 
-	private boolean casMultiplier = false;
+	private volatile boolean casMultiplier = false;
 	
 	protected Object syncObject = new Object();
 	
@@ -171,7 +171,7 @@
 	
 	private Object waitmux = new Object();
 	
-	private boolean waitingForCAS = false;
+	private volatile boolean waitingForCAS = false;
 	
 	private long startTime = System.nanoTime();
 	
@@ -292,7 +292,7 @@
 			}
 		}
 		paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, jmxManagement.getJmxDomain()); 
-		if ( isTopLevelComponent() && System.getProperty("UseVmTransport") != null )
+		if ( isTopLevelComponent())
 		{
 		  System.out.println("Top Level Service:"+getComponentName()+ " Configured to Use Java VM Transport For Internal Messaging");
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
@@ -358,7 +358,7 @@
 			jmxManagement.registerMBean(inProcessCache, on);
 		}
 		initializeServiceStats();
-		
+
 	}
   public UimaTransport getTransport(String aKey) throws Exception
   {
@@ -391,11 +391,6 @@
 	  */
 	 public void initializeVMTransport(int parentControllerReplyConsumerCount) throws Exception
 	 {
-	   //  Check if internal messaging is enabled by checking system properties.
-	   if ( System.getProperty("UseVmTransport") == null)
-	   {
-	     return;
-	   }
 	   //  If this controller is an Aggregate Controller, force delegates to initialize
 	   //  their internal transports.
 	   if ( this instanceof AggregateAnalysisEngineController )
@@ -415,12 +410,27 @@
 	   if ( parentController != null )
      {
        UimaAsContext uimaAsContext = new UimaAsContext();
-       InputChannel ic = getInputChannel(endpointName);
-       int serviceRequestConsumerCount = ic.getConcurrentConsumerCount();
+       if ( !registeredWithJMXServer )
+       {
+         registeredWithJMXServer = true;
+         registerServiceWithJMX(jmxContext, false);
+       }
+       // Determine how many consumer threads to create. First, use the parent Aggregate Controller
+       // to look up this delegate's key. Then fetch the delegate endpoint, which holds the
+       // concurrentRequestConsumers and concurrentReplyConsumers settings.
+       String key = ((AggregateAnalysisEngineController)parentController).lookUpDelegateKey(getName());
+       int concurrentRequestConsumers = 1;
+       int concurrentReplyConsumers = 1;
+       if ( key != null )
+       {
+         Endpoint e = ((AggregateAnalysisEngineController)parentController).lookUpEndpoint(key, false);
+         concurrentRequestConsumers = e.getConcurrentRequestConsumers();
+         concurrentReplyConsumers = e.getConcurrentReplyConsumers();
+       }
        
-       System.out.println("Controller:"+getComponentName()+" Starting Request Listener With "+serviceRequestConsumerCount+" Concurrent Consumers. Reply Listener Configured With "+parentControllerReplyConsumerCount+" Concurrent Consumers");
+       System.out.println("Controller:"+getComponentName()+" Starting Request Listener With "+concurrentRequestConsumers+" Concurrent Consumers. Reply Listener Configured With "+concurrentReplyConsumers+" Concurrent Consumers");
        
-       uimaAsContext.setConcurrentConsumerCount(serviceRequestConsumerCount);
+       uimaAsContext.setConcurrentConsumerCount(concurrentRequestConsumers);
        uimaAsContext.put("EndpointName", endpointName);
 
        UimaTransport vmTransport = getTransport(uimaAsContext);
@@ -431,7 +441,9 @@
        // Creates parent controller dispatcher for this delegate. The dispatcher is wired
        // with this delegate's listener.
        UimaAsContext uimaAsContext2 = new UimaAsContext();
-       uimaAsContext2.setConcurrentConsumerCount(parentControllerReplyConsumerCount);
+       // Use concurrentReplyConsumers as the reply consumer count (stays 1 when no delegate key was found)
+       uimaAsContext2.setConcurrentConsumerCount(concurrentReplyConsumers);
+//       uimaAsContext2.setConcurrentConsumerCount(parentControllerReplyConsumerCount);
        uimaAsContext2.put("EndpointName", endpointName);
        UimaTransport parentVmTransport = parentController.getTransport(uimaAsContext2, endpointName);
        parentVmTransport.produceUimaMessageDispatcher(this, vmTransport);
@@ -614,8 +626,18 @@
 		
 		registerWithAgent(servicePerformance, name );
 		servicePerformance.setIdleTime(System.nanoTime());
-		
-		ServiceInfo serviceInfo = getInputChannel().getServiceInfo();
+    ServiceInfo serviceInfo = null;
+		if ( remote )
+		{
+	    serviceInfo = getInputChannel().getServiceInfo();
+		}
+		else
+		{
+      serviceInfo = new ServiceInfo();
+      serviceInfo.setBrokerURL(getBrokerURL());
+      serviceInfo.setInputQueueName(getName());
+      serviceInfo.setState("Active");
+		}
 		ServiceInfo pServiceInfo = null;
 
 		if ( this instanceof PrimitiveAnalysisEngineController )
@@ -1475,7 +1497,13 @@
 				}
 			}
 		}
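+		/*
+		 * NOTE(review): the code below generates a ShutdownException, which causes problems
+		 * because the shutdown of services happens ad hoc and not in an orderly fashion.
+		 * This whole logic needs to be revisited. Only this comment is delimited; the code
+		 * that follows is still active - confirm whether it was meant to be disabled.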
 		//	Send an exception to the client if this is a top level service
+		 */ 
 		if (getOutputChannel() != null && isTopLevelComponent() )
 		{
 			Endpoint clientEndpoint = null;
@@ -1494,16 +1522,15 @@
 				}
 			}
 		}
-		//	Stop output channel
-		getOutputChannel().stop();
 		
-		adminContext = null;
 		if ( !isTopLevelComponent() )
 		{
 			adminContext = null;
 		}
 		else
 		{
+	    //  Stop output channel
+	    getOutputChannel().stop();
 			try
 			{
 				//	Remove all MBeans registered by this service
@@ -2262,7 +2289,7 @@
 	      if ( metadata != null )
 	      {
 	        
-	        if ( !anEndpoint.isRemote() && System.getProperty("UseVmTransport") != null)
+	        if ( !anEndpoint.isRemote())
 	        {
 	          ByteArrayOutputStream bos = new ByteArrayOutputStream();
 	          try

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java Mon Oct  6 08:22:03 2008
@@ -140,4 +140,12 @@
 	public void setEndpointServer( String anEndpointServer );
 	
 	public String getEndpointServer();
+	
+  public void setConcurrentRequestConsumers(int aConsumerCount);
+  
+  public int getConcurrentRequestConsumers();
+
+  public void setConcurrentReplyConsumers(int aConsumerCount);
+  
+  public int getConcurrentReplyConsumers();
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java Mon Oct  6 08:22:03 2008
@@ -105,6 +105,10 @@
 	
 	private long idleTime=0;
 	
+	private int concurrentRequestConsumers = 1;
+	
+	private int concurrentReplyConsumers = 1;
+	
 	//	This is supplied by the remote client. It needs to be
 	//	echoed back to the client. 
 	private String endpointServer = null;
@@ -562,5 +566,22 @@
 	public String getEndpointServer() {
 		return endpointServer;
 	}
-
+	
+  public void setConcurrentRequestConsumers(int aConsumerCount) {
+    concurrentRequestConsumers = aConsumerCount;
+  }
+  
+  public int getConcurrentRequestConsumers() {
+    return concurrentRequestConsumers;
+  }
+
+  public void setConcurrentReplyConsumers(int aConsumerCount)  {
+    concurrentReplyConsumers = aConsumerCount;
+    
+  }
+  
+  public int getConcurrentReplyConsumers() {
+    return concurrentReplyConsumers;
+  }
+  
 }
\ No newline at end of file

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Mon Oct  6 08:22:03 2008
@@ -237,7 +237,7 @@
 			getServicePerformance().incrementAnalysisTime(super.getCpuTime()-start);
 
 			
-	    if ( !anEndpoint.isRemote() && System.getProperty("UseVmTransport") != null)
+	    if ( !anEndpoint.isRemote())
 	    {
 	        UimaMessage message = 
 	          getTransport(anEndpoint.getEndpoint()).produceMessage(AsynchAEMessage.CollectionProcessComplete,AsynchAEMessage.Response,getName());
@@ -409,8 +409,8 @@
 					}
 	        //  Increment number of CASes processed by this service
 	        sequence++;
-				}
-		      if ( !anEndpoint.isRemote() && System.getProperty("UseVmTransport") != null)
+			  }
+		      if ( !anEndpoint.isRemote())
 		      {
 		          UimaMessage message = 
 		            getTransport(anEndpoint.getEndpoint()).produceMessage(AsynchAEMessage.Process,AsynchAEMessage.Request,getName());
@@ -444,7 +444,7 @@
 			
 			// Store total time spent processing this input CAS
 			getCasStatistics(aCasReferenceId).incrementAnalysisTime(totalProcessTime);
-      if ( !anEndpoint.isRemote() && System.getProperty("UseVmTransport") != null)
+      if ( !anEndpoint.isRemote())
       {
           inputCASReturned = true;
           UimaMessage message = 
@@ -630,7 +630,7 @@
 		{
 			serviceInfo = new PrimitiveServiceInfo();
 		}
-		if ( getInputChannel() != null )
+		if ( isTopLevelComponent() && getInputChannel() != null )
 		{
 			serviceInfo.setInputQueueName(getInputChannel().getServiceInfo().getInputQueueName());
 			serviceInfo.setBrokerURL(getInputChannel().getServiceInfo().getBrokerURL());

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Mon Oct  6 08:22:03 2008
@@ -109,7 +109,7 @@
 		{
 			try
 			{
-			   if ( !anEndpoint.isRemote() && System.getProperty("UseVmTransport") != null )
+			   if ( !anEndpoint.isRemote())
 			   {
 			      anEndpoint.setReplyEndpoint(true);
 			      UimaTransport vmTransport = aController.getTransport(aController.getName()) ;//anEndpoint.getEndpoint());

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java Mon Oct  6 08:22:03 2008
@@ -313,7 +313,9 @@
 				{
 					if ( delegateServicePerformance != null )
 					{
-						delegateServicePerformance.incrementAnalysisTime(timeInProcessCAS);
+           delegateServicePerformance.incrementAnalysisTime(timeInProcessCAS);
+            //  The remote delegate returns the actual analysis time not the delta
+//					  delegateServicePerformance.setAnalysisTime(timeInProcessCAS);
 					}
 				}
 				else 
@@ -328,7 +330,8 @@
 					// Update processing time for this CAS
 					if ( inputCasStats != null )
 					{
-						inputCasStats.incrementAnalysisTime(timeInProcessCAS);
+//            inputCasStats.setAnalysisTime(timeInProcessCAS);
+            inputCasStats.incrementAnalysisTime(timeInProcessCAS);
 					}
 				}
 			}

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java?rev=702174&r1=702173&r2=702174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java Mon Oct  6 08:22:03 2008
@@ -138,6 +138,17 @@
 		}
 	}
 
+	/**
+	 * Sets the analysis time. This method is meant to be called when a reply is received from a
+	 * remote delegate. Each reply message containing a CAS includes the current actual analysis time.
+	 * This is not a delta; it is the running analysis time.
+	 * 
+	 * @param anAnalysisTime the running analysis time to store
+	 */
+	public synchronized void setAnalysisTime( long anAnalysisTime )
+	{
+	  analysisTime = anAnalysisTime;
+	}
 	public void incrementAnalysisTime( long anAnalysisTime )
 	{
 		synchronized(sem)
@@ -172,7 +183,17 @@
 	
 	public long getRawAnalysisTime()
 	{
-		return analysisTime;
+    if ( controller != null )
+    {
+      return controller.getAnalysisTime();
+    }
+    else
+    {
+      synchronized( sem )
+      {
+        return analysisTime;
+      }
+    }
 	}
 
 	public long getNumberOfCASesProcessed()