You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/08/22 20:52:49 UTC

svn commit: r688173 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main: java/org/apache/uima/adapter/jms/client/ java/org/apache/uima/adapter/jms/message/ resources/

Author: eae
Date: Fri Aug 22 11:52:48 2008
New Revision: 688173

URL: http://svn.apache.org/viewvc?rev=688173&view=rev
Log:
UIMA-1147 commit Jerry's work (patches) merging the post1st branch into the trunk

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=688173&r1=688172&r2=688173&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Fri Aug 22 11:52:48 2008
@@ -21,6 +21,8 @@
 
 import java.util.List;
 
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 import javax.jms.InvalidDestinationException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -119,7 +121,6 @@
 	/**
 	 * Signals any object that waits for the worker thread to initialize
 	 */
-
 	private void signal() {
 		synchronized (this) {
 			this.notifyAll();
@@ -170,32 +171,37 @@
 
 		producer = getMessageProducer();
 		int counter=0;
-///		MessageProducer producer = getMessageProducer();
-
 		// Wait for messages from application threads. The uima ee client engine
-		// will call
-		// doStop() which sets the global flag 'done' to true.
+		// will call doStop() which sets the global flag 'done' to true.
+		PendingMessage pm = null;
 		while (!done) {
-			// First check if there are any pending messages in the shared
-			// 'queue'
-			if (pendingMessageList.size() == 0) {
-				// Block waiting for a message
-				synchronized (pendingMessageList) {
+			synchronized (pendingMessageList) {
+				// First check if there are any pending messages in the shared
+				// 	'queue'
+				while (pendingMessageList.size() == 0) {
+					// Block waiting for a message
 					try {
 						pendingMessageList.wait(0);
 					} catch (InterruptedException e) {
 					}
+					//	Check if the engine is terminating. When the client is stopping
+					//	it will signal 'pendingMessageList'. Check the state of the client
+					//	and break out from the wait loop if the client is stopping
+					if (done) {
+						break; // done in this loop
+					}
 				}
+				// Check if the uima as client is in stopped state. If it is, don't read
+				//	a message from the queue and just break out from the while loop. When
+				//	the client is stopped, the 'pendingMessageList' is signaled but there
+				//	is no message to read. The signal is done to force this thread to
+				//	break out of wait().
+				if (done) {
+					break; // done here
+				}
+				// Remove the oldest message from the shared 'queue'
+				pm = (PendingMessage) pendingMessageList.remove(0);
 			}
-			// The uima ee engine may have decided to stop, so first check to
-			// see if
-			// we should continue since the thread may have slept for a while
-			// (in wait() )
-			if (done) {
-				break; // done here
-			}
-			// Remove the oldest message from the shared 'queue'
-			PendingMessage pm = (PendingMessage) pendingMessageList.remove(0);
 
 			try {
 				//	Request JMS Message from the concrete implementation
@@ -230,8 +236,6 @@
 						}
 						if (  pm.getMessageType() == AsynchAEMessage.Process )
 						{
-							//					ClientRequest cacheEntry = (ClientRequest)
-							//						engine.getCache().get(pm.get(AsynchAEMessage.CasReference));
 							cacheEntry.setCASDepartureTime(System.nanoTime());
 						}
 					}
@@ -296,4 +300,11 @@
 		engine.onException(e, aDestination);
 
 	}
+	/**
+	 * Base implementation of getMessageProducer(Destination); always returns null.
+	 */
+	public MessageProducer getMessageProducer(Destination destination) throws Exception {
+		return null;
+	}
+
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=688173&r1=688172&r2=688173&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Fri Aug 22 11:52:48 2008
@@ -25,19 +25,29 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.uima.UIMAFramework;
+import org.apache.uima.UIMA_IllegalArgumentException;
 import org.apache.uima.aae.AsynchAECasManager;
+import org.apache.uima.aae.AsynchAECasManager_impl;
 import org.apache.uima.aae.UIDGenerator;
+import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.UimaSerializer;
 import org.apache.uima.aae.client.UimaASProcessStatusImpl;
 import org.apache.uima.aae.client.UimaASStatusCallbackListener;
@@ -50,6 +60,7 @@
 import org.apache.uima.aae.error.UimaEEServiceException;
 import org.apache.uima.aae.jmx.UimaASClientInfo;
 import org.apache.uima.aae.jmx.UimaASClientInfoMBean;
+import org.apache.uima.aae.jmx.UimaASClientInfo;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.message.PendingMessage;
@@ -57,19 +68,30 @@
 import org.apache.uima.cas.impl.XmiSerializationSharedData;
 import org.apache.uima.collection.CollectionReader;
 import org.apache.uima.collection.EntityProcessStatus;
+import org.apache.uima.resource.CasDefinition;
+import org.apache.uima.resource.Resource;
+import org.apache.uima.resource.ResourceConfigurationException;
 import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.resource.ResourceManager;
 import org.apache.uima.resource.ResourceProcessException;
 import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
 import org.apache.uima.util.Level;
 import org.apache.uima.util.ProcessTrace;
 import org.apache.uima.util.XMLInputSource;
 import org.apache.uima.util.impl.ProcessTrace_impl;
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.client.UimaASProcessStatusImpl;
+import org.apache.uima.aae.client.UimaASStatusCallbackListener;
+import org.apache.uima.adapter.jms.JmsConstants;
+import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.aae.controller.Endpoint;
 
 public abstract class BaseUIMAAsynchronousEngineCommon_impl 
 implements UimaAsynchronousEngine, MessageListener
 {
 	private static final Class CLASS_NAME = BaseUIMAAsynchronousEngineCommon_impl.class;
-
+	protected static final String SHADOW_CAS_POOL = "ShadowCasPool";
 	protected static final int MetadataTimeout = 1;
 
 	protected static final int CpCTimeout = 2;
@@ -201,7 +223,7 @@
 		collectionReader = aCollectionReader;
 	}
 		
-	public void collectionProcessingComplete() throws ResourceProcessException
+	public synchronized void collectionProcessingComplete() throws ResourceProcessException
 	{
 		try
 		{
@@ -506,7 +528,7 @@
 	 * Sends a given CAS for analysis to the UIMA EE Service.
 	 * 
 	 */
-	private String sendCAS(CAS aCAS, ClientRequest requestToCache) throws ResourceProcessException
+	private synchronized String sendCAS(CAS aCAS, ClientRequest requestToCache) throws ResourceProcessException
 	{
 		String casReferenceId = requestToCache.getCasReferenceId();
 		try
@@ -722,8 +744,31 @@
 		}
 		return exception;
 	}
+	private void handleProcessReplyFromSynchronousCall(ClientRequest cachedRequest, Message message) throws Exception
+	{
+		//	Save reply message in the cache
+		cachedRequest.setMessage(message);
+		//	Signal a thread that we received a reply
+		if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
+		{
+			ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
+			//	Unblock the sending thread so that it can complete processing
+			//	of the reply. The message has been stored in the cache and 
+			//	when the thread wakes up due to notification below, it will
+			//	retrieve the reply and process it.
+			synchronized( threadMonitor.getMonitor() )
+			{
+				threadMonitor.setWasSignaled();
+				cachedRequest.setReceivedProcessCasReply();
+				threadMonitor.getMonitor().notifyAll();
+			}
+		}
+	}
+
+	
 	/**
-	 * Handles response to Process CAS request. If the message originated in a service that is running in a separate jvm (remote), deserialize the CAS and notify the application of the completed analysis via application listener.
+	 * Handles response to Process CAS request. If the message originated in a service that is running in a separate jvm (remote), 
+	 * deserialize the CAS and notify the application of the completed analysis via application listener.
 	 * 
 	 * @param message -
 	 *            jms message containing serialized CAS
@@ -732,63 +777,122 @@
 	 */
 	protected void handleProcessReply(Message message, boolean doNotify, ProcessTrace pt) throws Exception
 	{
+		if ( !running )
+		{
+			return;
+		}
 		int payload = -1;
-		ClientRequest cachedRequest = null;
-		String casReferenceId = null;
+		String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
+		
+		
+		
+		//	Determine the type of payload in the message (XMI,Cas Reference,Exception,etc)
+		if (message.propertyExists(AsynchAEMessage.Payload))
+		{
+			payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+		}
+		if (AsynchAEMessage.Exception == payload)
+		{
+			handleException(message, true);
+			return;
+		}
+		//	Exception replies have already been handled above. If the Cas Reference id is
+		//	not in the message, the reply cannot be matched to a cached request, so return.
+		if ( casReferenceId == null )
+		{
+			return;
+		}
+		
+		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_process_reply_FINEST",
+				new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
 
-		try
+		//	Fetch entry from the client cache for a cas id returned from the service
+		//	The client cache maintains an entry for every outstanding CAS sent to the
+		//	service.
+		ClientRequest cachedRequest = (ClientRequest)clientCache.get(casReferenceId);
+		if ( cachedRequest != null )
 		{
-			howManyRecvd++;
-			
-			if ( !running )
+			//	Cancel the timer
+			try
 			{
-				return;
+				cancelTimer(casReferenceId);
 			}
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_process_reply_FINEST",
-					new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
+			catch( Exception e) {}
 
-			casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
-			if (casReferenceId != null && !clientCache.containsKey(casReferenceId))
+			//	If the CAS was sent from a synchronous API sendAndReceive(), wake up the thread that
+			//	sent the CAS and process the reply
+			if ( cachedRequest.isSynchronousInvocation() )
 			{
-				// Most likely expired message. Already handled as timeout. Discard the message and move on to the next
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_expired_msg_INFO",
-						new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
-				return;
+				handleProcessReplyFromSynchronousCall(cachedRequest, message);
 			}
-			if (message.propertyExists(AsynchAEMessage.Payload))
+			else
 			{
-				payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+				deserializeAndCompleteProcessingReply( casReferenceId, message, cachedRequest, pt, doNotify );
 			}
-			cachedRequest = (ClientRequest)clientCache.get(casReferenceId);
-
-			if (AsynchAEMessage.Exception == payload)
-			{
-				handleException(message, doNotify);
-				if ( !isShutdownException(message))
-				{
-					clientSideJmxStats.incrementProcessErrorCount();
-				}
-				return;
-			}
-			completeProcessingReply( casReferenceId, payload, doNotify,  message, cachedRequest, pt);
 		}
-		catch (Exception e)
+		else if ( message.propertyExists(AsynchAEMessage.InputCasReference) )
 		{
-			e.printStackTrace();
-			throw e;
+			handleProcessReplyFromCasMultiplier(message, casReferenceId, payload);
 		}
-		finally
+		else
 		{
-			removeFromCache(casReferenceId);
-			synchronized (cpcGate)
+			// Most likely expired message. Already handled as timeout. Discard the message and move on to the next
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_expired_msg_INFO",
+					new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
+		}
+	}
+
+	private void handleProcessReplyFromCasMultiplier( Message message, String casReferenceId, int payload /*, ClientRequest inputCasCachedRequest*/) throws Exception
+	{
+		//	Check if the message contains a CAS that was generated by a Cas Multiplier. If so, 
+		//  verify that the message also includes an input CAS id and that such input CAS id
+		//	exists in the client's cache.
+		//	Fetch the input CAS Reference Id from which the CAS being processed was generated
+		String inputCasReferenceId = 
+			message.getStringProperty(AsynchAEMessage.InputCasReference);
+		//	Fetch an entry from the client cache for a given input CAS id. This would be an id
+		//	of the CAS that the client sent out to the service.
+		ClientRequest inputCasCachedRequest =
+			(ClientRequest)clientCache.get(inputCasReferenceId);
+		if ( inputCasCachedRequest == null )
+		{
+			// Most likely expired message. Already handled as timeout. Discard the message and move on to the next
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleProcessReplyFromCasMultiplier", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_expired_msg_INFO",
+					new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
+			return;
+		}
+		if (inputCasCachedRequest.isSynchronousInvocation()) {
+			handleProcessReplyFromSynchronousCall(inputCasCachedRequest, message);
+		}
+		//	Fetch the destination for Free CAS notification 
+		Destination freeCASNotificationDestination = message.getJMSReplyTo();
+		if ( freeCASNotificationDestination != null ) 
+		{
+			TextMessage msg = createTextMessage();
+			setReleaseCASMessage(msg, casReferenceId);
+			//	Create Message Producer for the Destination 
+			MessageProducer msgProducer = 
+				getMessageProducer(freeCASNotificationDestination);
+			if ( msgProducer != null )
 			{
-				if (howManyRecvd == howManySent)
+				try
 				{
-					cpcGate.notifyAll();
+					//	Send FreeCAS message to a Cas Multiplier
+					msgProducer.send(msg);
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReplyFromCasMultiplier", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_release_cas_FINEST",
+							new Object[] { freeCASNotificationDestination, message.getStringProperty(AsynchAEMessage.CasReference) });
 				}
+				catch( Exception e) 
+				{
+					e.printStackTrace();
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "handleProcessReplyFromCasMultiplier", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_error_while_sending_msg__WARNING", new Object[] {  "Free Cas Temp Destination", e });
+				} 
 			}
 		}
+		CAS cas = deserializeCAS(((TextMessage) message).getText(), SHADOW_CAS_POOL );
+		completeProcessingReply(cas, casReferenceId, payload, true, message, inputCasCachedRequest, null);
 	}
+
 	private boolean isShutdownException( Message message ) throws Exception
 	{
 		Exception exception = retrieveExceptionFormMessage(message);
@@ -806,6 +910,10 @@
 	private void handleException( Message message, boolean doNotify )
 	throws Exception
 	{
+		if ( !isShutdownException(message))
+		{
+			clientSideJmxStats.incrementProcessErrorCount();
+		}
 		Exception exception = retrieveExceptionFormMessage(message);
 		receivedCpcReply = true; // change state as if the CPC reply came in. This is done to prevent a hang on CPC request 
 		synchronized(endOfCollectionMonitor)
@@ -839,32 +947,47 @@
 		}
 
 	}
-	private void completeProcessingReply( String casReferenceId, int payload, boolean doNotify, Message message, ClientRequest cachedRequest, ProcessTrace pt  )
+	private void completeProcessingReply( CAS cas, String casReferenceId, int payload, boolean doNotify, Message message, ClientRequest cachedRequest, ProcessTrace pt  )
 	throws Exception
 	{
 		if (AsynchAEMessage.XMIPayload == payload || AsynchAEMessage.CASRefID == payload)
 		{
-			//cancelTimer(casReferenceId);
 			if ( pt == null )
 			{
 				pt = new ProcessTrace_impl();
 			}
-			CAS cas=null;
-			try
+			//	Increment the number of replies
+			if ( casReferenceId.equals(cachedRequest.getCasReferenceId()) )
 			{
-				// If the analysis service is remote deserialize the CAS
-				if (remoteService)
+				synchronized(cpcGate)
 				{
-					long t1 = System.nanoTime();
-					cas = deserializeCAS(((TextMessage) message).getText(), cachedRequest);
-					cachedRequest.setDeserializationTime(System.nanoTime() - t1);
+					//	increment number of replies received
+					howManyRecvd++;
+					cpcGate.notifyAll();
 				}
+			}
+			//	Store the total latency for this CAS. The departure time is set right before the CAS
+			//	is sent to a service.
+			cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
+
+			try
+			{
 				//	Log stats and populate ProcessTrace object
 				logTimingInfo(message, pt, cachedRequest);
 				if ( doNotify )
 				{
+					UimaASProcessStatusImpl status;
+					String inputCasReferenceId = 
+						message.getStringProperty(AsynchAEMessage.InputCasReference);
+					if ( inputCasReferenceId != null && inputCasReferenceId.equals( cachedRequest.getCasReferenceId()))
+					{
+						status = new UimaASProcessStatusImpl(pt, casReferenceId, inputCasReferenceId);
+					}
+					else
+					{
+						status = new UimaASProcessStatusImpl(pt, casReferenceId);
+					}
 					//	Add CAS identifier to enable matching replies with requests
-					UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt, casReferenceId);
 					notifyListeners(cas, status, AsynchAEMessage.Process);
 				}
 			}
@@ -878,10 +1001,16 @@
 						cas.release();
 					}
 				}
-
+				removeFromCache(casReferenceId);
+				if (howManyRecvd == howManySent)
+				{
+					synchronized (cpcGate)
+					{
+						cpcGate.notifyAll();
+					}
+				}
 			}
 		}
-
 	}
 	private void logTimingInfo(Message message, ProcessTrace pt, ClientRequest cachedRequest ) throws Exception
 	{
@@ -909,7 +1038,7 @@
 			long timeToSerializeCAS = message.getLongProperty(AsynchAEMessage.TimeToSerializeCAS);
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
 					new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Time To Serialize Cas", (float) timeToSerializeCAS / (float) 1000000 });
-			pt.addEvent("UimaEE", "process", "Time To Serialize Cas", (int)timeToSerializeCAS/1000000, "");
+			pt.addEvent("UimaEE", "process", "Time To Serialize Cas", (int)(timeToSerializeCAS/1000000), "");
 			//	Add the client serialization overhead to the value returned from a service
 			timeToSerializeCAS += cachedRequest.getSerializationTime();
 			clientSideJmxStats.incrementTotalSerializationTime(timeToSerializeCAS);
@@ -919,7 +1048,7 @@
 			long timeToDeserializeCAS = message.getLongProperty(AsynchAEMessage.TimeToDeserializeCAS);
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
 					new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Time To Deserialize Cas", (float) timeToDeserializeCAS / (float) 1000000 });
-			pt.addEvent("UimaEE", "process", "Time To Deserialize Cas", (int)timeToDeserializeCAS/1000000, "");
+			pt.addEvent("UimaEE", "process", "Time To Deserialize Cas", (int)(timeToDeserializeCAS/1000000), "");
 			//	Add the client deserialization overhead to the value returned from a service
 			timeToDeserializeCAS += cachedRequest.getDeserializationTime();
 			clientSideJmxStats.incrementTotalDeserializationTime(timeToDeserializeCAS);
@@ -929,14 +1058,14 @@
 			long timeWaitingForCAS = message.getLongProperty(AsynchAEMessage.TimeWaitingForCAS);
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
 					new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Time to Wait for CAS", (float) timeWaitingForCAS / (float) 1000000 });
-			pt.addEvent("UimaEE", "process", "Time to Wait for CAS", (int)timeWaitingForCAS/1000000, "");
+			pt.addEvent("UimaEE", "process", "Time to Wait for CAS", (int)(timeWaitingForCAS/1000000), "");
 		}
 		if (message.propertyExists(AsynchAEMessage.TimeInService))
 		{
 			long ttimeInService = message.getLongProperty(AsynchAEMessage.TimeInService);
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
 					new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Time In Service", (float) ttimeInService / (float) 1000000 });
-			pt.addEvent("UimaEE", "process", "Time In Service", (int)ttimeInService/1000000, "");
+			pt.addEvent("UimaEE", "process", "Time In Service", (int)(ttimeInService/1000000), "");
 
 		}
 		if (message.propertyExists(AsynchAEMessage.TotalTimeSpentInAnalytic))
@@ -944,7 +1073,7 @@
 			long totaltimeInService = message.getLongProperty(AsynchAEMessage.TotalTimeSpentInAnalytic);
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
 					new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Total Time In Service", (float) totaltimeInService / (float) 1000000 });
-			pt.addEvent("UimaEE", "process", "Total Time In Service", (int)totaltimeInService/1000000, "");
+			pt.addEvent("UimaEE", "process", "Total Time In Service", (int)(totaltimeInService/1000000), "");
 		}
 		if (message.propertyExists(AsynchAEMessage.TimeInProcessCAS))
 		{
@@ -960,7 +1089,7 @@
 			long totalIdletime = message.getLongProperty(AsynchAEMessage.IdleTime);
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
 					new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Idle Time Waiting For CAS", (float) totalIdletime / (float) 1000000 });
-			pt.addEvent("UimaEE", "process", "Idle Time Waiting For CAS", (int)totalIdletime/1000000, "");
+			pt.addEvent("UimaEE", "process", "Idle Time Waiting For CAS", (int)(totalIdletime/1000000), "");
 			clientSideJmxStats.incrementTotalIdleTime(totalIdletime);
 		}
 		
@@ -978,35 +1107,23 @@
 			clientCache.remove(aCasReferenceId);
 		}
 	}
+	private CAS deserialize(String aSerializedCAS, CAS aCAS ) throws Exception
+	{
+		XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+		UimaSerializer.deserializeCasFromXmi(aSerializedCAS, aCAS, deserSharedData, true, -1);
+		return aCAS;
+	}
+
 	protected CAS deserializeCAS(String aSerializedCAS, ClientRequest cachedRequest) throws Exception
 	{
 		CAS cas = cachedRequest.getCAS();
-		XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
-		UimaSerializer.deserializeCasFromXmi(aSerializedCAS, cas, deserSharedData, true, -1);
-		return cas;
+		return deserialize(aSerializedCAS, cas);
 	}
-	
-	protected CAS deserializeCAS(String aSerializedCAS) throws Exception
+	protected CAS deserializeCAS(String aSerializedCAS, String aCasPoolName) throws Exception
 	{
-		CAS cas;
-
-		synchronized (sendAndReceiveCasMonitor)
-		{
-			if (sendAndReceiveCAS != null)
-			{
-				cas = sendAndReceiveCAS;
-			}
-			else
-			{
-				cas = getCAS();
-			}
-		}
-
-		XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
-		UimaSerializer.deserializeCasFromXmi(aSerializedCAS, cas, deserSharedData, true, -1);
-		return cas;
+		CAS cas = asynchManager.getNewCas(aCasPoolName);
+		return deserialize(aSerializedCAS, cas);
 	}
-
 	/**
 	 * Listener method receiving JMS Messages from the response queue.
 	 * 
@@ -1021,6 +1138,8 @@
 			{
 			  return;
 			}
+			
+			
 			int command = message.getIntProperty(AsynchAEMessage.Command);
 			if (AsynchAEMessage.CollectionProcessComplete == command)
 			{
@@ -1035,75 +1154,7 @@
 			else if (AsynchAEMessage.Process == command)
 			{
 				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINEST", new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
-				
-				String casReferenceId = 
-					message.getStringProperty(AsynchAEMessage.CasReference);
-				
-				if ( casReferenceId == null )
-				{
-					int payload;
-					if (message.propertyExists(AsynchAEMessage.Payload))
-					{
-						payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
-						if (AsynchAEMessage.Exception == payload)
-						{
-							if ( !isShutdownException(message))
-							{
-								clientSideJmxStats.incrementProcessErrorCount();
-							}
-							handleException(message, true);
-						}
-					}
-	
-					return;
-				}
-				
-				try
-				{
-					cancelTimer(casReferenceId);
-				}
-				catch( Exception e) {}
-
-				ClientRequest cachedRequest =
-					(ClientRequest)clientCache.get(casReferenceId);
-
-				if ( cachedRequest == null )
-				{
-					// Most likely expired message. Already handled as timeout. Discard the message and move on to the next
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_expired_msg_INFO",
-							new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
-					return;
-				}
-				//	Store the total latency for this CAS. The departure time is set right before the CAS
-				//	is sent to a service.
-				cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
-				if ( cachedRequest.isSynchronousInvocation() )
-				{
-					//	Save reply message in the cache
-					cachedRequest.setMessage(message);
-					//	Signal a thread that we received a reply
-					if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
-					{
-						ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
-						if ( threadMonitor != null && threadMonitor.getMonitor() != null )
-						{
-							//	Unblock the sending thread so that it can complete processing
-							//	of the reply. The message has been stored in the cache and 
-							//	when the thread wakes up due to notification below, it will
-							//	retrieve the reply and process it.
-							synchronized( threadMonitor.getMonitor() )
-							{
-								threadMonitor.setWasSignaled();
-								threadMonitor.getMonitor().notifyAll();
-							}
-						}
-					}
-				}
-				else
-				{
-					//	Asynchronous invocation - use notification
-					handleProcessReply(message, true, null);
-				}
+				handleProcessReply(message, true, null);
 			}
 		}
 		catch (Exception e)
@@ -1111,6 +1162,7 @@
 			e.printStackTrace();
 		}
 	}
+
 	/**
 	 * Gets the ProcessingResourceMetadata for the asynchronous AnalysisEngine.
 	 */
@@ -1122,12 +1174,13 @@
 	 * This is a synchronous method which sends a message to a destination and blocks waiting for
 	 * reply. 
 	 */
-	public void sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException
+	public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException
 	{
 		if ( !running )
 		{
 			throw new ResourceProcessException( new Exception("Uima EE Client Not In Running State"));
 		}
+		String casReferenceId = null;
 			// keep handle to CAS, we'll deserialize into this same CAS later
 			sendAndReceiveCAS = aCAS;
 			
@@ -1146,24 +1199,24 @@
 			ClientRequest cachedRequest = produceNewClientRequestObject();
 			cachedRequest.setSynchronousInvocation();
 			// send CAS. This call does not block. Instead we will block the sending thread below.
-			sendCAS(aCAS, cachedRequest);
+			casReferenceId = sendCAS(aCAS, cachedRequest);
 			if ( threadMonitor != null && threadMonitor.getMonitor() != null)
 			{
-				//	Block here
-				synchronized (threadMonitor.getMonitor())
+			//	Block here waiting for reply
+			synchronized (threadMonitor.getMonitor())
+			{
+				//	Block sending thread until a reply is received
+				while (!threadMonitor.wasSignaled && running)
 				{
-					//	Block sending thread until a reply is received
-					while (!threadMonitor.wasSignaled && running)
+					try
+					{
+						threadMonitor.getMonitor().wait();
+					}
+					catch (InterruptedException e)
 					{
-						try
-						{
-							threadMonitor.getMonitor().wait();
-						}
-						catch (InterruptedException e)
-						{
-						}
 					}
 				}
+ }
 			}
 			try
 			{
@@ -1173,7 +1226,7 @@
 				}
 				//	Process reply in the sending thread
 				Message message = cachedRequest.getMessage();
-				handleProcessReply(message, false, pt);
+				deserializeAndCompleteProcessingReply( casReferenceId, message, cachedRequest, pt, false );
 			}
 			catch( ResourceProcessException rpe )
 			{
@@ -1187,10 +1240,27 @@
 			{
 				threadMonitor.reset();
 			}
+			return casReferenceId;
+	}
+	private void deserializeAndCompleteProcessingReply( String casReferenceId, Message message, ClientRequest cachedRequest, ProcessTrace pt, boolean doNotify ) throws Exception
+	{ // Finishes handling of a process reply: replies carrying a CasSequence property are delegated, any other reply is deserialized and completed here.
+		int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue(); // payload kind is read from the message's Payload int property
+		if ( message.propertyExists(AsynchAEMessage.CasSequence) ) // presence of CasSequence routes the reply to the CAS multiplier handler
+		{
+			handleProcessReplyFromCasMultiplier( message, casReferenceId, payload);//, cachedRequest);
+		}
+		else
+		{
+			long t1 = System.nanoTime(); // start of the deserialization timing window
+			CAS cas = deserializeCAS(((TextMessage) message).getText(), cachedRequest); // message is cast to TextMessage; its text is handed to deserializeCAS
+			cachedRequest.setDeserializationTime(System.nanoTime() - t1); // elapsed nanoseconds spent in deserializeCAS, recorded on the request
+			completeProcessingReply( cas, casReferenceId, payload, doNotify,  message, cachedRequest, pt);
+		}
 	}
-	public void sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException
+
+	public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException
 	{
-		sendAndReceiveCAS( aCAS, null );
+		return sendAndReceiveCAS( aCAS, null );
 	}
 
 	protected void notifyOnTimout(CAS aCAS, String anEndpoint, int aTimeoutKind, String casReferenceId)
@@ -1242,18 +1312,19 @@
 
 		  if ( cachedRequest.isSynchronousInvocation() )
 		  {
-			  //  Signal a thread that we received a reply, if in the map
-			  if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
-			  {
-				  ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
-				  //  	Unblock the sending thread so that it can complete processing with an error
-				  synchronized( threadMonitor.getMonitor() )
-				  {
-					  threadMonitor.setWasSignaled();
-					  threadMonitor.getMonitor().notifyAll();
-				  }
-			  }
-		  }
+        //  Signal a thread that we received a reply, if in the map
+        if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
+        {
+          ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
+          //  Unblock the sending thread so that it can complete processing with an error
+          synchronized( threadMonitor.getMonitor() )
+          {
+            threadMonitor.setWasSignaled();
+            cachedRequest.setReceivedProcessCasReply(); // should not be needed
+            threadMonitor.getMonitor().notifyAll();
+          }
+        }
+      }
 		  else 
 		  {
 			  // notify the application listener with the error
@@ -1275,7 +1346,13 @@
 		  break;
 		}  // case
  	}
-
+	/**
+	 * Stub producer lookup: ignores {@code destination} and always returns null. NOTE(review): the original "@override" note suggests a subclass is expected to supply the real implementation - confirm.
+	 */
+	protected MessageProducer getMessageProducer( Destination destination ) throws Exception
+	{
+		return null;
+	}
 	public class ClientRequest
 	{
 		private Timer timer = null;
@@ -1304,7 +1381,7 @@
 
 		private String endpoint;
 
-//		private boolean receivedProcessCasReply = false;
+		private boolean receivedProcessCasReply = false;
 		
 		private long threadId=-1;
 		
@@ -1407,6 +1484,10 @@
 		{
 			return threadId;
 		}
+		public void setReceivedProcessCasReply()
+		{
+			receivedProcessCasReply = true; // raises the receivedProcessCasReply flag, which is declared with an initial value of false
+		}
 		public void setMetadataTimeout( int aTimeout )
 		{
 			metadataTimeout = aTimeout;
@@ -1494,9 +1575,6 @@
 						}
 					}
 				
-          //  TODO: This needs to be done elsewhere
-          //removeEntry(casReferenceId);
-          
 					int timeOutKind;
 					if (isMetaRequest())
 					{
@@ -1515,6 +1593,7 @@
 					{
 						timeOutKind = CpCTimeout;
 						receivedCpcReply = true;// not really, but simulate receiving the CPC reply so that we unblock the monitor
+						
 						synchronized( cpcGate )
 						{
 							cpcGate.notifyAll();
@@ -1637,4 +1716,12 @@
 		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "onException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_error_while_sending_msg__WARNING", new Object[] {  aDestination, aFailure });
 		stop();
 	}
+	/**
+	 * No-op stub: the body is empty, so {@code msg} is left untouched. NOTE(review): the original "@override" note suggests a subclass fills in the release-CAS message for {@code aCasReferenceId} - confirm.
+	 */
+	protected void setReleaseCASMessage(TextMessage msg, String aCasReferenceId)
+	throws Exception 
+	{
+	}
+
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java?rev=688173&r1=688172&r2=688173&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java Fri Aug 22 11:52:48 2008
@@ -19,6 +19,7 @@
 
 package org.apache.uima.adapter.jms.client;
 
+import javax.jms.Destination;
 import javax.jms.MessageProducer;
 import javax.jms.TextMessage;
 
@@ -33,4 +34,7 @@
 	public TextMessage createTextMessage() throws Exception;
 
 	public void doStop();
+	
+	public MessageProducer getMessageProducer(Destination destination) throws Exception;
+
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java?rev=688173&r1=688172&r2=688173&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java Fri Aug 22 11:52:48 2008
@@ -51,6 +51,42 @@
 		
 	}
 
+	public JmsMessageContext(Message aMessage, String anEndpointName) throws AsynchAEException
+	{ // Builds a context around aMessage: records the endpoint name and message, then copies reply-routing details from the message onto the endpoint.
+		this();
+		endpointName = anEndpointName;
+		message = aMessage;
+		try
+		{
+			String msgFrom = (String)aMessage.getStringProperty(AsynchAEMessage.MessageFrom); // sender name; a missing sender is silently tolerated
+			if ( msgFrom != null )
+			{
+				endpoint.setEndpoint( msgFrom);
+			}
+			if ( aMessage.getJMSReplyTo() != null ) // remember the JMS reply destination when the message carries one
+			{
+				endpoint.setDestination(aMessage.getJMSReplyTo());
+			}
+			if ( aMessage.propertyExists(UIMAMessage.ServerURI) )
+			{
+				String selectedServerURI = chooseServerURI(aMessage.getStringProperty(UIMAMessage.ServerURI)); // the property value is passed through chooseServerURI before being stored
+				endpoint.setServerURI(selectedServerURI);
+				endpoint.setRemote(endpoint.getServerURI().startsWith("vm")==false); // endpoint is flagged remote unless its server URI starts with "vm"
+			}
+			//	Check if the client attached a special property that needs to be echoed back.
+			//	This enables the client to match the reply with the endpoint.
+			if ( aMessage.propertyExists(AsynchAEMessage.EndpointServer))
+			{
+				endpoint.setRemote(true); // an EndpointServer property always marks the endpoint remote, overriding the check above
+				endpoint.setEndpointServer(aMessage.getStringProperty(AsynchAEMessage.EndpointServer));
+			}
+		
+		}
+		catch( Exception e) // any failure while reading the message is rethrown wrapped in AsynchAEException
+		{
+			throw new AsynchAEException(e);
+		}
+	}
 	public String getEndpointName()
 	{
 		return endpointName;
@@ -85,60 +121,6 @@
 		
 		return serverURI;
 	}
-	public JmsMessageContext(Message aMessage, String anEndpointName) throws AsynchAEException
-	{
-		this();
-		endpointName = anEndpointName;
-		message = aMessage;
-		try
-		{
-			String msgFrom = (String)aMessage.getStringProperty(AsynchAEMessage.MessageFrom); 
-			if ( msgFrom != null )
-			{
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-	                    "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_recvd_message_from__FINE",
-	                    new Object[] { msgFrom, aMessage.getStringProperty(UIMAMessage.ServerURI) });
-				endpoint.setEndpoint( msgFrom);
-			}
-			else
-			{
-				//	Undefined sender of the message. This may be ok.
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-	                    "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_reply_queue_not_defined__WARNING");
-			}
-			if ( aMessage.getJMSReplyTo() != null )
-			{
-				endpoint.setDestination(aMessage.getJMSReplyTo());
-			}
-			if ( aMessage.getStringProperty(UIMAMessage.ServerURI) != null )
-			{
-				
-				String selectedServerURI = chooseServerURI(aMessage.getStringProperty(UIMAMessage.ServerURI));
-				
-				endpoint.setServerURI(selectedServerURI);
-				endpoint.setRemote(endpoint.getServerURI().startsWith("vm")==false);
-			}
-			else if ( aMessage.getIntProperty(AsynchAEMessage.MessageType) != AsynchAEMessage.Response)
-			{
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-	                    "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_reply_queue_server_not_defined__WARNING");
-			}
-//			if ( aMessage.getBooleanProperty(AsynchAEMessage.RemoveEndpoint))
-//			{
-//endpoint.setRemove(true);
-//System.out.println("Remove Endpoint is set:"+endpoint.remove());
-//			}
-//			else
-//			{
-//				System.out.println("Remove Endpoint is not set");				
-//				
-//			}
-		}
-		catch( Exception e)
-		{
-			throw new AsynchAEException(e);
-		}
-	}
 	public boolean propertyExists(String aKey) throws AsynchAEException
 	{
 		try

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=688173&r1=688172&r2=688173&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Fri Aug 22 11:52:48 2008
@@ -147,3 +147,5 @@
 UIMAJMS_awaiting_container_init__INFO = Uima EE Client Blocking - Awaiting Top Level Controller Initialization Notification
 UIMAJMS_container_init_exception__WARNING = Top Level Controller Initialization Exception. Cause: {0} 
 UIMAJMS_terminate_service_dueto_bad_broker__WARNING = Controller: {0} Unable To Establish Connection To Broker: {1} - Stopping Service
+UIMAJMS_client_sending_release_cas_FINEST = Client Sent Free Cas Notification to Destination: {0} For Cas: {1}
+UIMAJMS_activated_fcq__CONFIG = >>>> Cas Multiplier Controller: {0} Activated Listener To Receive Free CAS Notifications From Clients - Temp Queue Name: {1}