You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/08/22 20:52:49 UTC
svn commit: r688173 - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main:
java/org/apache/uima/adapter/jms/client/
java/org/apache/uima/adapter/jms/message/ resources/
Author: eae
Date: Fri Aug 22 11:52:48 2008
New Revision: 688173
URL: http://svn.apache.org/viewvc?rev=688173&view=rev
Log:
UIMA-1147 commit Jerry's work (patches) merging the post1st branch into the trunk
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=688173&r1=688172&r2=688173&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Fri Aug 22 11:52:48 2008
@@ -21,6 +21,8 @@
import java.util.List;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.Message;
import javax.jms.MessageProducer;
@@ -119,7 +121,6 @@
/**
* Signals any object that waits for the worker thread to initialize
*/
-
private void signal() {
synchronized (this) {
this.notifyAll();
@@ -170,32 +171,37 @@
producer = getMessageProducer();
int counter=0;
-/// MessageProducer producer = getMessageProducer();
-
// Wait for messages from application threads. The uima ee client engine
- // will call
- // doStop() which sets the global flag 'done' to true.
+ // will call doStop() which sets the global flag 'done' to true.
+ PendingMessage pm = null;
while (!done) {
- // First check if there are any pending messages in the shared
- // 'queue'
- if (pendingMessageList.size() == 0) {
- // Block waiting for a message
- synchronized (pendingMessageList) {
+ synchronized (pendingMessageList) {
+ // First check if there are any pending messages in the shared
+ // 'queue'
+ while (pendingMessageList.size() == 0) {
+ // Block waiting for a message
try {
pendingMessageList.wait(0);
} catch (InterruptedException e) {
}
+ // Check if the engine is terminating. When the client is stopping
+ // it will signal 'pendingMessageList'. Check the state of the client
+ // and break out from the wait loop if the client is stopping
+ if (done) {
+ break; // done in this loop
+ }
}
+ // Check if the uima as client is in stopped state. If it is, don't read
+ // a message from the queue and just break out from the while loop. When
+ // the client is stopped, the 'pendingMessageList' is signaled but there
+ // is no message to read. The signal is done to force this thread to
+ // break out of wait().
+ if (done) {
+ break; // done here
+ }
+ // Remove the oldest message from the shared 'queue'
+ pm = (PendingMessage) pendingMessageList.remove(0);
}
- // The uima ee engine may have decided to stop, so first check to
- // see if
- // we should continue since the thread may have slept for a while
- // (in wait() )
- if (done) {
- break; // done here
- }
- // Remove the oldest message from the shared 'queue'
- PendingMessage pm = (PendingMessage) pendingMessageList.remove(0);
try {
// Request JMS Message from the concrete implementation
@@ -230,8 +236,6 @@
}
if ( pm.getMessageType() == AsynchAEMessage.Process )
{
- // ClientRequest cacheEntry = (ClientRequest)
- // engine.getCache().get(pm.get(AsynchAEMessage.CasReference));
cacheEntry.setCASDepartureTime(System.nanoTime());
}
}
@@ -296,4 +300,11 @@
engine.onException(e, aDestination);
}
+ /**
+ * @override
+ */
+ public MessageProducer getMessageProducer(Destination destination) throws Exception {
+ return null;
+ }
+
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=688173&r1=688172&r2=688173&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Fri Aug 22 11:52:48 2008
@@ -25,19 +25,29 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.uima.UIMAFramework;
+import org.apache.uima.UIMA_IllegalArgumentException;
import org.apache.uima.aae.AsynchAECasManager;
+import org.apache.uima.aae.AsynchAECasManager_impl;
import org.apache.uima.aae.UIDGenerator;
+import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaSerializer;
import org.apache.uima.aae.client.UimaASProcessStatusImpl;
import org.apache.uima.aae.client.UimaASStatusCallbackListener;
@@ -50,6 +60,7 @@
import org.apache.uima.aae.error.UimaEEServiceException;
import org.apache.uima.aae.jmx.UimaASClientInfo;
import org.apache.uima.aae.jmx.UimaASClientInfoMBean;
+import org.apache.uima.aae.jmx.UimaASClientInfo;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.message.PendingMessage;
@@ -57,19 +68,30 @@
import org.apache.uima.cas.impl.XmiSerializationSharedData;
import org.apache.uima.collection.CollectionReader;
import org.apache.uima.collection.EntityProcessStatus;
+import org.apache.uima.resource.CasDefinition;
+import org.apache.uima.resource.Resource;
+import org.apache.uima.resource.ResourceConfigurationException;
import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.resource.ResourceManager;
import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.util.Level;
import org.apache.uima.util.ProcessTrace;
import org.apache.uima.util.XMLInputSource;
import org.apache.uima.util.impl.ProcessTrace_impl;
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.client.UimaASProcessStatusImpl;
+import org.apache.uima.aae.client.UimaASStatusCallbackListener;
+import org.apache.uima.adapter.jms.JmsConstants;
+import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.aae.controller.Endpoint;
public abstract class BaseUIMAAsynchronousEngineCommon_impl
implements UimaAsynchronousEngine, MessageListener
{
private static final Class CLASS_NAME = BaseUIMAAsynchronousEngineCommon_impl.class;
-
+ protected static final String SHADOW_CAS_POOL = "ShadowCasPool";
protected static final int MetadataTimeout = 1;
protected static final int CpCTimeout = 2;
@@ -201,7 +223,7 @@
collectionReader = aCollectionReader;
}
- public void collectionProcessingComplete() throws ResourceProcessException
+ public synchronized void collectionProcessingComplete() throws ResourceProcessException
{
try
{
@@ -506,7 +528,7 @@
* Sends a given CAS for analysis to the UIMA EE Service.
*
*/
- private String sendCAS(CAS aCAS, ClientRequest requestToCache) throws ResourceProcessException
+ private synchronized String sendCAS(CAS aCAS, ClientRequest requestToCache) throws ResourceProcessException
{
String casReferenceId = requestToCache.getCasReferenceId();
try
@@ -722,8 +744,31 @@
}
return exception;
}
+ private void handleProcessReplyFromSynchronousCall(ClientRequest cachedRequest, Message message) throws Exception
+ {
+ // Save reply message in the cache
+ cachedRequest.setMessage(message);
+ // Signal a thread that we received a reply
+ if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
+ {
+ ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
+ // Unblock the sending thread so that it can complete processing
+ // of the reply. The message has been stored in the cache and
+ // when the thread wakes up due to notification below, it will
+ // retrieve the reply and process it.
+ synchronized( threadMonitor.getMonitor() )
+ {
+ threadMonitor.setWasSignaled();
+ cachedRequest.setReceivedProcessCasReply();
+ threadMonitor.getMonitor().notifyAll();
+ }
+ }
+ }
+
+
/**
- * Handles response to Process CAS request. If the message originated in a service that is running in a separate jvm (remote), deserialize the CAS and notify the application of the completed analysis via application listener.
+ * Handles response to Process CAS request. If the message originated in a service that is running in a separate jvm (remote),
+ * deserialize the CAS and notify the application of the completed analysis via application listener.
*
* @param message -
* jms message containing serialized CAS
@@ -732,63 +777,122 @@
*/
protected void handleProcessReply(Message message, boolean doNotify, ProcessTrace pt) throws Exception
{
+ if ( !running )
+ {
+ return;
+ }
int payload = -1;
- ClientRequest cachedRequest = null;
- String casReferenceId = null;
+ String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
+
+
+
+ // Determine the type of payload in the message (XMI,Cas Reference,Exception,etc)
+ if (message.propertyExists(AsynchAEMessage.Payload))
+ {
+ payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+ }
+ if (AsynchAEMessage.Exception == payload)
+ {
+ handleException(message, true);
+ return;
+ }
+ // If the Cas Reference id not in the message check if the message contains an
+ // exception and if so, handle the exception and return.
+ if ( casReferenceId == null )
+ {
+ return;
+ }
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_process_reply_FINEST",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
- try
+ // Fetch entry from the client cache for a cas id returned from the service
+ // The client cache maintains an entry for every outstanding CAS sent to the
+ // service.
+ ClientRequest cachedRequest = (ClientRequest)clientCache.get(casReferenceId);
+ if ( cachedRequest != null )
{
- howManyRecvd++;
-
- if ( !running )
+ // Cancel the timer
+ try
{
- return;
+ cancelTimer(casReferenceId);
}
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_process_reply_FINEST",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
+ catch( Exception e) {}
- casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
- if (casReferenceId != null && !clientCache.containsKey(casReferenceId))
+ // If the CAS was sent from a synchronous API sendAndReceive(), wake up the thread that
+ // sent the CAS and process the reply
+ if ( cachedRequest.isSynchronousInvocation() )
{
- // Most likely expired message. Already handled as timeout. Discard the message and move on to the next
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_expired_msg_INFO",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
- return;
+ handleProcessReplyFromSynchronousCall(cachedRequest, message);
}
- if (message.propertyExists(AsynchAEMessage.Payload))
+ else
{
- payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+ deserializeAndCompleteProcessingReply( casReferenceId, message, cachedRequest, pt, doNotify );
}
- cachedRequest = (ClientRequest)clientCache.get(casReferenceId);
-
- if (AsynchAEMessage.Exception == payload)
- {
- handleException(message, doNotify);
- if ( !isShutdownException(message))
- {
- clientSideJmxStats.incrementProcessErrorCount();
- }
- return;
- }
- completeProcessingReply( casReferenceId, payload, doNotify, message, cachedRequest, pt);
}
- catch (Exception e)
+ else if ( message.propertyExists(AsynchAEMessage.InputCasReference) )
{
- e.printStackTrace();
- throw e;
+ handleProcessReplyFromCasMultiplier(message, casReferenceId, payload);
}
- finally
+ else
{
- removeFromCache(casReferenceId);
- synchronized (cpcGate)
+ // Most likely expired message. Already handled as timeout. Discard the message and move on to the next
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_expired_msg_INFO",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
+ }
+ }
+
+ private void handleProcessReplyFromCasMultiplier( Message message, String casReferenceId, int payload /*, ClientRequest inputCasCachedRequest*/) throws Exception
+ {
+ // Check if the message contains a CAS that was generated by a Cas Multiplier. If so,
+ // verify that the message also includes an input CAS id and that such input CAS id
+ // exists in the client's cache.
+ // Fetch the input CAS Reference Id from which the CAS being processed was generated from
+ String inputCasReferenceId =
+ message.getStringProperty(AsynchAEMessage.InputCasReference);
+ // Fetch an entry from the client cache for a given input CAS id. This would be an id
+ // of the CAS that the client sent out to the service.
+ ClientRequest inputCasCachedRequest =
+ (ClientRequest)clientCache.get(inputCasReferenceId);
+ if ( inputCasCachedRequest == null )
+ {
+ // Most likely expired message. Already handled as timeout. Discard the message and move on to the next
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleProcessReplyFromCasMultiplier", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_expired_msg_INFO",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
+ return;
+ }
+ if (inputCasCachedRequest.isSynchronousInvocation()) {
+ handleProcessReplyFromSynchronousCall(inputCasCachedRequest, message);
+ }
+ // Fetch the destination for Free CAS notification
+ Destination freeCASNotificationDestination = message.getJMSReplyTo();
+ if ( freeCASNotificationDestination != null )
+ {
+ TextMessage msg = createTextMessage();
+ setReleaseCASMessage(msg, casReferenceId);
+ // Create Message Producer for the Destination
+ MessageProducer msgProducer =
+ getMessageProducer(freeCASNotificationDestination);
+ if ( msgProducer != null )
{
- if (howManyRecvd == howManySent)
+ try
{
- cpcGate.notifyAll();
+ // Send FreeCAS message to a Cas Multiplier
+ msgProducer.send(msg);
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReplyFromCasMultiplier", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_release_cas_FINEST",
+ new Object[] { freeCASNotificationDestination, message.getStringProperty(AsynchAEMessage.CasReference) });
}
+ catch( Exception e)
+ {
+ e.printStackTrace();
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "handleProcessReplyFromCasMultiplier", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_error_while_sending_msg__WARNING", new Object[] { "Free Cas Temp Destination", e });
+ }
}
}
+ CAS cas = deserializeCAS(((TextMessage) message).getText(), SHADOW_CAS_POOL );
+ completeProcessingReply(cas, casReferenceId, payload, true, message, inputCasCachedRequest, null);
}
+
private boolean isShutdownException( Message message ) throws Exception
{
Exception exception = retrieveExceptionFormMessage(message);
@@ -806,6 +910,10 @@
private void handleException( Message message, boolean doNotify )
throws Exception
{
+ if ( !isShutdownException(message))
+ {
+ clientSideJmxStats.incrementProcessErrorCount();
+ }
Exception exception = retrieveExceptionFormMessage(message);
receivedCpcReply = true; // change state as if the CPC reply came in. This is done to prevent a hang on CPC request
synchronized(endOfCollectionMonitor)
@@ -839,32 +947,47 @@
}
}
- private void completeProcessingReply( String casReferenceId, int payload, boolean doNotify, Message message, ClientRequest cachedRequest, ProcessTrace pt )
+ private void completeProcessingReply( CAS cas, String casReferenceId, int payload, boolean doNotify, Message message, ClientRequest cachedRequest, ProcessTrace pt )
throws Exception
{
if (AsynchAEMessage.XMIPayload == payload || AsynchAEMessage.CASRefID == payload)
{
- //cancelTimer(casReferenceId);
if ( pt == null )
{
pt = new ProcessTrace_impl();
}
- CAS cas=null;
- try
+ // Incremente number of replies
+ if ( casReferenceId.equals(cachedRequest.getCasReferenceId()) )
{
- // If the analysis service is remote deserialize the CAS
- if (remoteService)
+ synchronized(cpcGate)
{
- long t1 = System.nanoTime();
- cas = deserializeCAS(((TextMessage) message).getText(), cachedRequest);
- cachedRequest.setDeserializationTime(System.nanoTime() - t1);
+ // increment number of replies received
+ howManyRecvd++;
+ cpcGate.notifyAll();
}
+ }
+ // Store the total latency for this CAS. The departure time is set right before the CAS
+ // is sent to a service.
+ cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
+
+ try
+ {
// Log stats and populate ProcessTrace object
logTimingInfo(message, pt, cachedRequest);
if ( doNotify )
{
+ UimaASProcessStatusImpl status;
+ String inputCasReferenceId =
+ message.getStringProperty(AsynchAEMessage.InputCasReference);
+ if ( inputCasReferenceId != null && inputCasReferenceId.equals( cachedRequest.getCasReferenceId()))
+ {
+ status = new UimaASProcessStatusImpl(pt, casReferenceId, inputCasReferenceId);
+ }
+ else
+ {
+ status = new UimaASProcessStatusImpl(pt, casReferenceId);
+ }
// Add CAS identifier to enable matching replies with requests
- UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt, casReferenceId);
notifyListeners(cas, status, AsynchAEMessage.Process);
}
}
@@ -878,10 +1001,16 @@
cas.release();
}
}
-
+ removeFromCache(casReferenceId);
+ if (howManyRecvd == howManySent)
+ {
+ synchronized (cpcGate)
+ {
+ cpcGate.notifyAll();
+ }
+ }
}
}
-
}
private void logTimingInfo(Message message, ProcessTrace pt, ClientRequest cachedRequest ) throws Exception
{
@@ -909,7 +1038,7 @@
long timeToSerializeCAS = message.getLongProperty(AsynchAEMessage.TimeToSerializeCAS);
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Time To Serialize Cas", (float) timeToSerializeCAS / (float) 1000000 });
- pt.addEvent("UimaEE", "process", "Time To Serialize Cas", (int)timeToSerializeCAS/1000000, "");
+ pt.addEvent("UimaEE", "process", "Time To Serialize Cas", (int)(timeToSerializeCAS/1000000), "");
// Add the client serialization overhead to the value returned from a service
timeToSerializeCAS += cachedRequest.getSerializationTime();
clientSideJmxStats.incrementTotalSerializationTime(timeToSerializeCAS);
@@ -919,7 +1048,7 @@
long timeToDeserializeCAS = message.getLongProperty(AsynchAEMessage.TimeToDeserializeCAS);
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Time To Deserialize Cas", (float) timeToDeserializeCAS / (float) 1000000 });
- pt.addEvent("UimaEE", "process", "Time To Deserialize Cas", (int)timeToDeserializeCAS/1000000, "");
+ pt.addEvent("UimaEE", "process", "Time To Deserialize Cas", (int)(timeToDeserializeCAS/1000000), "");
// Add the client deserialization overhead to the value returned from a service
timeToDeserializeCAS += cachedRequest.getDeserializationTime();
clientSideJmxStats.incrementTotalDeserializationTime(timeToDeserializeCAS);
@@ -929,14 +1058,14 @@
long timeWaitingForCAS = message.getLongProperty(AsynchAEMessage.TimeWaitingForCAS);
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Time to Wait for CAS", (float) timeWaitingForCAS / (float) 1000000 });
- pt.addEvent("UimaEE", "process", "Time to Wait for CAS", (int)timeWaitingForCAS/1000000, "");
+ pt.addEvent("UimaEE", "process", "Time to Wait for CAS", (int)(timeWaitingForCAS/1000000), "");
}
if (message.propertyExists(AsynchAEMessage.TimeInService))
{
long ttimeInService = message.getLongProperty(AsynchAEMessage.TimeInService);
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Time In Service", (float) ttimeInService / (float) 1000000 });
- pt.addEvent("UimaEE", "process", "Time In Service", (int)ttimeInService/1000000, "");
+ pt.addEvent("UimaEE", "process", "Time In Service", (int)(ttimeInService/1000000), "");
}
if (message.propertyExists(AsynchAEMessage.TotalTimeSpentInAnalytic))
@@ -944,7 +1073,7 @@
long totaltimeInService = message.getLongProperty(AsynchAEMessage.TotalTimeSpentInAnalytic);
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Total Time In Service", (float) totaltimeInService / (float) 1000000 });
- pt.addEvent("UimaEE", "process", "Total Time In Service", (int)totaltimeInService/1000000, "");
+ pt.addEvent("UimaEE", "process", "Total Time In Service", (int)(totaltimeInService/1000000), "");
}
if (message.propertyExists(AsynchAEMessage.TimeInProcessCAS))
{
@@ -960,7 +1089,7 @@
long totalIdletime = message.getLongProperty(AsynchAEMessage.IdleTime);
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), "Idle Time Waiting For CAS", (float) totalIdletime / (float) 1000000 });
- pt.addEvent("UimaEE", "process", "Idle Time Waiting For CAS", (int)totalIdletime/1000000, "");
+ pt.addEvent("UimaEE", "process", "Idle Time Waiting For CAS", (int)(totalIdletime/1000000), "");
clientSideJmxStats.incrementTotalIdleTime(totalIdletime);
}
@@ -978,35 +1107,23 @@
clientCache.remove(aCasReferenceId);
}
}
+ private CAS deserialize(String aSerializedCAS, CAS aCAS ) throws Exception
+ {
+ XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+ UimaSerializer.deserializeCasFromXmi(aSerializedCAS, aCAS, deserSharedData, true, -1);
+ return aCAS;
+ }
+
protected CAS deserializeCAS(String aSerializedCAS, ClientRequest cachedRequest) throws Exception
{
CAS cas = cachedRequest.getCAS();
- XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
- UimaSerializer.deserializeCasFromXmi(aSerializedCAS, cas, deserSharedData, true, -1);
- return cas;
+ return deserialize(aSerializedCAS, cas);
}
-
- protected CAS deserializeCAS(String aSerializedCAS) throws Exception
+ protected CAS deserializeCAS(String aSerializedCAS, String aCasPoolName) throws Exception
{
- CAS cas;
-
- synchronized (sendAndReceiveCasMonitor)
- {
- if (sendAndReceiveCAS != null)
- {
- cas = sendAndReceiveCAS;
- }
- else
- {
- cas = getCAS();
- }
- }
-
- XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
- UimaSerializer.deserializeCasFromXmi(aSerializedCAS, cas, deserSharedData, true, -1);
- return cas;
+ CAS cas = asynchManager.getNewCas(aCasPoolName);
+ return deserialize(aSerializedCAS, cas);
}
-
/**
* Listener method receiving JMS Messages from the response queue.
*
@@ -1021,6 +1138,8 @@
{
return;
}
+
+
int command = message.getIntProperty(AsynchAEMessage.Command);
if (AsynchAEMessage.CollectionProcessComplete == command)
{
@@ -1035,75 +1154,7 @@
else if (AsynchAEMessage.Process == command)
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINEST", new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
-
- String casReferenceId =
- message.getStringProperty(AsynchAEMessage.CasReference);
-
- if ( casReferenceId == null )
- {
- int payload;
- if (message.propertyExists(AsynchAEMessage.Payload))
- {
- payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
- if (AsynchAEMessage.Exception == payload)
- {
- if ( !isShutdownException(message))
- {
- clientSideJmxStats.incrementProcessErrorCount();
- }
- handleException(message, true);
- }
- }
-
- return;
- }
-
- try
- {
- cancelTimer(casReferenceId);
- }
- catch( Exception e) {}
-
- ClientRequest cachedRequest =
- (ClientRequest)clientCache.get(casReferenceId);
-
- if ( cachedRequest == null )
- {
- // Most likely expired message. Already handled as timeout. Discard the message and move on to the next
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_expired_msg_INFO",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
- return;
- }
- // Store the total latency for this CAS. The departure time is set right before the CAS
- // is sent to a service.
- cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
- if ( cachedRequest.isSynchronousInvocation() )
- {
- // Save reply message in the cache
- cachedRequest.setMessage(message);
- // Signal a thread that we received a reply
- if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
- {
- ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
- if ( threadMonitor != null && threadMonitor.getMonitor() != null )
- {
- // Unblock the sending thread so that it can complete processing
- // of the reply. The message has been stored in the cache and
- // when the thread wakes up due to notification below, it will
- // retrieve the reply and process it.
- synchronized( threadMonitor.getMonitor() )
- {
- threadMonitor.setWasSignaled();
- threadMonitor.getMonitor().notifyAll();
- }
- }
- }
- }
- else
- {
- // Asynchronous invocation - use notification
- handleProcessReply(message, true, null);
- }
+ handleProcessReply(message, true, null);
}
}
catch (Exception e)
@@ -1111,6 +1162,7 @@
e.printStackTrace();
}
}
+
/**
* Gets the ProcessingResourceMetadata for the asynchronous AnalysisEngine.
*/
@@ -1122,12 +1174,13 @@
* This is a synchronous method which sends a message to a destination and blocks waiting for
* reply.
*/
- public void sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException
+ public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException
{
if ( !running )
{
throw new ResourceProcessException( new Exception("Uima EE Client Not In Running State"));
}
+ String casReferenceId = null;
// keep handle to CAS, we'll deserialize into this same CAS later
sendAndReceiveCAS = aCAS;
@@ -1146,24 +1199,24 @@
ClientRequest cachedRequest = produceNewClientRequestObject();
cachedRequest.setSynchronousInvocation();
// send CAS. This call does not block. Instead we will block the sending thread below.
- sendCAS(aCAS, cachedRequest);
+ casReferenceId = sendCAS(aCAS, cachedRequest);
if ( threadMonitor != null && threadMonitor.getMonitor() != null)
{
- // Block here
- synchronized (threadMonitor.getMonitor())
+ // Block here waiting for reply
+ synchronized (threadMonitor.getMonitor())
+ {
+ // Block sending thread until a reply is received
+ while (!threadMonitor.wasSignaled && running)
{
- // Block sending thread until a reply is received
- while (!threadMonitor.wasSignaled && running)
+ try
+ {
+ threadMonitor.getMonitor().wait();
+ }
+ catch (InterruptedException e)
{
- try
- {
- threadMonitor.getMonitor().wait();
- }
- catch (InterruptedException e)
- {
- }
}
}
+ }
}
try
{
@@ -1173,7 +1226,7 @@
}
// Process reply in the sending thread
Message message = cachedRequest.getMessage();
- handleProcessReply(message, false, pt);
+ deserializeAndCompleteProcessingReply( casReferenceId, message, cachedRequest, pt, false );
}
catch( ResourceProcessException rpe )
{
@@ -1187,10 +1240,27 @@
{
threadMonitor.reset();
}
+ return casReferenceId;
+ }
+ private void deserializeAndCompleteProcessingReply( String casReferenceId, Message message, ClientRequest cachedRequest, ProcessTrace pt, boolean doNotify ) throws Exception
+ {
+ int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+ if ( message.propertyExists(AsynchAEMessage.CasSequence) )
+ {
+ handleProcessReplyFromCasMultiplier( message, casReferenceId, payload);//, cachedRequest);
+ }
+ else
+ {
+ long t1 = System.nanoTime();
+ CAS cas = deserializeCAS(((TextMessage) message).getText(), cachedRequest);
+ cachedRequest.setDeserializationTime(System.nanoTime() - t1);
+ completeProcessingReply( cas, casReferenceId, payload, doNotify, message, cachedRequest, pt);
+ }
}
- public void sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException
+
+ public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException
{
- sendAndReceiveCAS( aCAS, null );
+ return sendAndReceiveCAS( aCAS, null );
}
protected void notifyOnTimout(CAS aCAS, String anEndpoint, int aTimeoutKind, String casReferenceId)
@@ -1242,18 +1312,19 @@
if ( cachedRequest.isSynchronousInvocation() )
{
- // Signal a thread that we received a reply, if in the map
- if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
- {
- ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
- // Unblock the sending thread so that it can complete processing with an error
- synchronized( threadMonitor.getMonitor() )
- {
- threadMonitor.setWasSignaled();
- threadMonitor.getMonitor().notifyAll();
- }
- }
- }
+ // Signal a thread that we received a reply, if in the map
+ if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
+ {
+ ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
+ // Unblock the sending thread so that it can complete processing with an error
+ synchronized( threadMonitor.getMonitor() )
+ {
+ threadMonitor.setWasSignaled();
+ cachedRequest.setReceivedProcessCasReply(); // should not be needed
+ threadMonitor.getMonitor().notifyAll();
+ }
+ }
+ }
else
{
// notify the application listener with the error
@@ -1275,7 +1346,13 @@
break;
} // case
}
-
+ /**
+ * @override
+ */
+ protected MessageProducer getMessageProducer( Destination destination ) throws Exception
+ {
+ return null;
+ }
public class ClientRequest
{
private Timer timer = null;
@@ -1304,7 +1381,7 @@
private String endpoint;
-// private boolean receivedProcessCasReply = false;
+ private boolean receivedProcessCasReply = false;
private long threadId=-1;
@@ -1407,6 +1484,10 @@
{
return threadId;
}
+ public void setReceivedProcessCasReply()
+ {
+ receivedProcessCasReply = true;
+ }
public void setMetadataTimeout( int aTimeout )
{
metadataTimeout = aTimeout;
@@ -1494,9 +1575,6 @@
}
}
- // TODO: This needs to be done elsewhere
- //removeEntry(casReferenceId);
-
int timeOutKind;
if (isMetaRequest())
{
@@ -1515,6 +1593,7 @@
{
timeOutKind = CpCTimeout;
receivedCpcReply = true;// not really but simulate receving the meta so that we unblock the monitor
+
synchronized( cpcGate )
{
cpcGate.notifyAll();
@@ -1637,4 +1716,12 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "onException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_error_while_sending_msg__WARNING", new Object[] { aDestination, aFailure });
stop();
}
+ /**
+ * @override
+ */
+ protected void setReleaseCASMessage(TextMessage msg, String aCasReferenceId)
+ throws Exception
+ {
+ }
+
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java?rev=688173&r1=688172&r2=688173&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java Fri Aug 22 11:52:48 2008
@@ -19,6 +19,7 @@
package org.apache.uima.adapter.jms.client;
+import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
@@ -33,4 +34,7 @@
public TextMessage createTextMessage() throws Exception;
public void doStop();
+
+ public MessageProducer getMessageProducer(Destination destination) throws Exception;
+
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java?rev=688173&r1=688172&r2=688173&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java Fri Aug 22 11:52:48 2008
@@ -51,6 +51,42 @@
}
+ public JmsMessageContext(Message aMessage, String anEndpointName) throws AsynchAEException
+ {
+ this();
+ endpointName = anEndpointName;
+ message = aMessage;
+ try
+ {
+ String msgFrom = (String)aMessage.getStringProperty(AsynchAEMessage.MessageFrom);
+ if ( msgFrom != null )
+ {
+ endpoint.setEndpoint( msgFrom);
+ }
+ if ( aMessage.getJMSReplyTo() != null )
+ {
+ endpoint.setDestination(aMessage.getJMSReplyTo());
+ }
+ if ( aMessage.propertyExists(UIMAMessage.ServerURI) )
+ {
+ String selectedServerURI = chooseServerURI(aMessage.getStringProperty(UIMAMessage.ServerURI));
+ endpoint.setServerURI(selectedServerURI);
+ endpoint.setRemote(endpoint.getServerURI().startsWith("vm")==false);
+ }
+ // Check if the client attached a special property that needs to be echoed back.
+ // This enables the client to match the reply with the endpoint.
+ if ( aMessage.propertyExists(AsynchAEMessage.EndpointServer))
+ {
+ endpoint.setRemote(true);
+ endpoint.setEndpointServer(aMessage.getStringProperty(AsynchAEMessage.EndpointServer));
+ }
+
+ }
+ catch( Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+ }
public String getEndpointName()
{
return endpointName;
@@ -85,60 +121,6 @@
return serverURI;
}
- public JmsMessageContext(Message aMessage, String anEndpointName) throws AsynchAEException
- {
- this();
- endpointName = anEndpointName;
- message = aMessage;
- try
- {
- String msgFrom = (String)aMessage.getStringProperty(AsynchAEMessage.MessageFrom);
- if ( msgFrom != null )
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_recvd_message_from__FINE",
- new Object[] { msgFrom, aMessage.getStringProperty(UIMAMessage.ServerURI) });
- endpoint.setEndpoint( msgFrom);
- }
- else
- {
- // Undefined sender of the message. This may be ok.
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_reply_queue_not_defined__WARNING");
- }
- if ( aMessage.getJMSReplyTo() != null )
- {
- endpoint.setDestination(aMessage.getJMSReplyTo());
- }
- if ( aMessage.getStringProperty(UIMAMessage.ServerURI) != null )
- {
-
- String selectedServerURI = chooseServerURI(aMessage.getStringProperty(UIMAMessage.ServerURI));
-
- endpoint.setServerURI(selectedServerURI);
- endpoint.setRemote(endpoint.getServerURI().startsWith("vm")==false);
- }
- else if ( aMessage.getIntProperty(AsynchAEMessage.MessageType) != AsynchAEMessage.Response)
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "JmsMessageContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_reply_queue_server_not_defined__WARNING");
- }
-// if ( aMessage.getBooleanProperty(AsynchAEMessage.RemoveEndpoint))
-// {
-//endpoint.setRemove(true);
-//System.out.println("Remove Endpoint is set:"+endpoint.remove());
-// }
-// else
-// {
-// System.out.println("Remove Endpoint is not set");
-//
-// }
- }
- catch( Exception e)
- {
- throw new AsynchAEException(e);
- }
- }
public boolean propertyExists(String aKey) throws AsynchAEException
{
try
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=688173&r1=688172&r2=688173&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Fri Aug 22 11:52:48 2008
@@ -147,3 +147,5 @@
UIMAJMS_awaiting_container_init__INFO = Uima EE Client Blocking - Awaiting Top Level Controller Initialization Notification
UIMAJMS_container_init_exception__WARNING = Top Level Controller Initialization Exception. Cause: {0}
UIMAJMS_terminate_service_dueto_bad_broker__WARNING = Controller: {0} Unable To Establish Connection To Broker: {1} - Stopping Service
+UIMAJMS_client_sending_release_cas_FINEST = Client Sent Free Cas Notification to Destination: {0} For Cas: {1}
+UIMAJMS_activated_fcq__CONFIG = >>>> Cas Multiplier Controller: {0} Activated Listener To Receive Free CAS Notifications From Clients - Temp Queue Name: {1}