You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2008/06/02 23:07:17 UTC
svn commit: r662564 - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima:
adapter/jms/client/BaseMessageSender.java
adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
jms/error/handler/JMSExceptionHandler.java
Author: schor
Date: Mon Jun 2 14:07:17 2008
New Revision: 662564
URL: http://svn.apache.org/viewvc?rev=662564&view=rev
Log:
[UIMA-1056] patch 2
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/jms/error/handler/JMSExceptionHandler.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=662564&r1=662563&r2=662564&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Mon Jun 2 14:07:17 2008
@@ -119,6 +119,7 @@
/**
* Signals any object that waits for the worker thread to initialize
*/
+
private void signal() {
synchronized (this) {
this.notifyAll();
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=662564&r1=662563&r2=662564&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Mon Jun 2 14:07:17 2008
@@ -506,13 +506,11 @@
* Sends a given CAS for analysis to the UIMA EE Service.
*
*/
- private synchronized String sendCAS(CAS aCAS, ClientRequest requestToCache) throws ResourceProcessException
+ private String sendCAS(CAS aCAS, ClientRequest requestToCache) throws ResourceProcessException
{
String casReferenceId = requestToCache.getCasReferenceId();
try
{
-// waitUntilReadyToSendMessage(AsynchAEMessage.Process);
-
if (!running)
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_sending_cas_INFO", new Object[] { "Asynchronous Client is Stopping" });
@@ -550,7 +548,11 @@
pendingMessageList.add(msg);
pendingMessageList.notifyAll();
}
- howManySent++;
+
+ synchronized (cpcGate)
+ {
+ howManySent++;
+ }
}
catch (Exception e)
{
@@ -778,16 +780,9 @@
finally
{
removeFromCache(casReferenceId);
- if (howManyRecvd == howManySent)
+ synchronized (cpcGate)
{
- synchronized (cpcGate)
- {
- cpcGate.notifyAll();
- }
- }
- if (howManyRecvd == howManySent)
- {
- synchronized (cpcGate)
+ if (howManyRecvd == howManySent)
{
cpcGate.notifyAll();
}
@@ -822,7 +817,7 @@
if ( doNotify )
{
ProcessTrace pt = new ProcessTrace_impl();
- UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+ UimaASProcessStatusImpl status = null; // new UimaASProcessStatusImpl(pt);
String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
if ( casReferenceId != null && casReferenceId.trim().length() > 0)
{
@@ -1039,17 +1034,6 @@
}
else if (AsynchAEMessage.Process == command)
{
-/*
- if (receiveWindow > 0)
- {
- synchronized (gater)
- {
- howManyBeforeReplySeen--;
- gater.notifyAll();
- }
-
- }
-*/
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINEST", new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
String casReferenceId =
@@ -1110,7 +1094,6 @@
synchronized( threadMonitor.getMonitor() )
{
threadMonitor.setWasSignaled();
- cachedRequest.setReceivedProcessCasReply();
threadMonitor.getMonitor().notifyAll();
}
}
@@ -1128,16 +1111,6 @@
e.printStackTrace();
}
}
-/*
- private void sendRequestToReleaseCas( String aCasReferenceId, Endpoint anEndpoint ) throws Exception
- {
- MessageProducer msgProducer =
- lookupProducerForEndpoint( anEndpoint );
- TextMessage tm = producerSession.createTextMessage("");
- setReleaseCASMessage(tm, aCasReferenceId);
- msgProducer.send(tm);
- }
-*/
/**
* Gets the ProcessingResourceMetadata for the asynchronous AnalysisEngine.
*/
@@ -1155,7 +1128,6 @@
{
throw new ResourceProcessException( new Exception("Uima EE Client Not In Running State"));
}
- String casReferenceId = null;
// keep handle to CAS, we'll deserialize into this same CAS later
sendAndReceiveCAS = aCAS;
@@ -1174,7 +1146,7 @@
ClientRequest cachedRequest = produceNewClientRequestObject();
cachedRequest.setSynchronousInvocation();
// send CAS. This call does not block. Instead we will block the sending thread below.
- casReferenceId = sendCAS(aCAS, cachedRequest);
+ sendCAS(aCAS, cachedRequest);
if ( threadMonitor != null && threadMonitor.getMonitor() != null)
{
// Block here
@@ -1263,48 +1235,46 @@
}
// Store the total latency for this CAS. The departure time is set right before the CAS
// is sent to a service.
- //TODO set to process timeout value in nanos
cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
- // mark timeout exception
- cachedRequest.setTimeoutException();
+ // mark timeout exception
+ cachedRequest.setTimeoutException();
- if ( cachedRequest.isSynchronousInvocation() )
- {
- // Signal a thread that we received a reply, if in the map
- if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
- {
- ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
- // Unblock the sending thread so that it can complete processing with an error
- synchronized( threadMonitor.getMonitor() )
- {
- threadMonitor.setWasSignaled();
- cachedRequest.setReceivedProcessCasReply(); // should not be needed
- threadMonitor.getMonitor().notifyAll();
- }
- }
- }
- else {
- // notify the application listener with the error
- exc = new UimaASProcessCasTimeout();
- status.addEventStatus("Process", "Failed", exc);
- notifyListeners(aCAS, status, AsynchAEMessage.Process);
- }
- cachedRequest.removeEntry(casReferenceId);
-
- synchronized (gater) {
- if (howManyBeforeReplySeen > 0) {
- howManyBeforeReplySeen--;
- }
- //TODO what is being notified???
- gater.notifyAll();
- }
- howManyRecvd++; // increment global counter to enable CPC request to be sent when howManySent = howManyRecvd
- break;
- }
-
+ if ( cachedRequest.isSynchronousInvocation() )
+ {
+ // Signal a thread that we received a reply, if in the map
+ if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
+ {
+ ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
+ // Unblock the sending thread so that it can complete processing with an error
+ synchronized( threadMonitor.getMonitor() )
+ {
+ threadMonitor.setWasSignaled();
+ threadMonitor.getMonitor().notifyAll();
+ }
+ }
+ }
+ else
+ {
+ // notify the application listener with the error
+ exc = new UimaASProcessCasTimeout();
+ status.addEventStatus("Process", "Failed", exc);
+ notifyListeners(aCAS, status, AsynchAEMessage.Process);
+ }
+ cachedRequest.removeEntry(casReferenceId);
- }
+ synchronized (gater)
+ {
+ if (howManyBeforeReplySeen > 0)
+ {
+ howManyBeforeReplySeen--;
+ }
+ gater.notifyAll();
+ howManyRecvd++; // increment global counter to enable CPC request to be sent when howManySent = howManyRecvd
+ }
+ break;
+ } // case
+ }
public class ClientRequest
{
@@ -1334,7 +1304,7 @@
private String endpoint;
- private boolean receivedProcessCasReply = false;
+// private boolean receivedProcessCasReply = false;
private long threadId=-1;
@@ -1437,10 +1407,6 @@
{
return threadId;
}
- public void setReceivedProcessCasReply()
- {
- receivedProcessCasReply = true;
- }
public void setMetadataTimeout( int aTimeout )
{
metadataTimeout = aTimeout;
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/jms/error/handler/JMSExceptionHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/jms/error/handler/JMSExceptionHandler.java?rev=662564&r1=662563&r2=662564&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/jms/error/handler/JMSExceptionHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/jms/error/handler/JMSExceptionHandler.java Mon Jun 2 14:07:17 2008
@@ -81,7 +81,6 @@
System.out.println("Handling JMS Connect Exception Due To::"+exception.getLocalizedMessage());
System.out.println("Exception Cause::"+exception.getClass().getName()+":::Message::"+exception.getLocalizedMessage());
-// String casReferenceId = (String)anErrorContext.get(AsynchAEMessage.CasReferenceId);
String casReferenceId = (String)anErrorContext.get(AsynchAEMessage.CasReference);
Endpoint endpoint = (Endpoint)anErrorContext.get(AsynchAEMessage.Endpoint);
@@ -94,11 +93,14 @@
try
{
entry = aController.getInProcessCache().getCacheEntryForCAS(casReferenceId);
+ if ( endpoint.isRemote() && entry != null )
+ {
+ aController.dropCAS(casReferenceId, true );
+ }
}
- catch( AsynchAEException e) {}
- if ( endpoint.isRemote() && entry != null )
+ catch( AsynchAEException e)
{
- aController.dropCAS(casReferenceId, true );
+ System.out.println("Cas:"+casReferenceId+" Not Found In the Cache.");
}
}
}