You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/07/28 17:27:35 UTC
svn commit: r798563 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Author: cwiklik
Date: Tue Jul 28 15:27:32 2009
New Revision: 798563
URL: http://svn.apache.org/viewvc?rev=798563&view=rev
Log:
UIMA-1436 Replaced wait-notify synchronization with semaphores
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=798563&r1=798562&r2=798563&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Tue Jul 28 15:27:32 2009
@@ -30,7 +30,6 @@
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
@@ -103,8 +102,6 @@
protected AsynchAECasManager asynchManager;
- protected Object metadataReplyMonitor = new Object();
-
protected boolean remoteService = false;
protected CollectionReader collectionReader = null;
@@ -138,8 +135,6 @@
protected Exception exc;
- protected long howManySent = 0;
-
// Counter maintaining a number of CASes sent to a service. The counter
// is incremented every time a CAS is sent and decremented when the CAS
// reply is received. It is also adjusted down in case of a timeout or
@@ -148,10 +143,6 @@
protected ConcurrentHashMap springContainerRegistry = new ConcurrentHashMap();
- protected volatile boolean receivedMetaReply;
-
- protected volatile boolean receivedCpcReply;
-
protected MessageConsumer consumer = null;
protected String serializationStrategy = "xmi";
@@ -262,9 +253,9 @@
}
private void addMessage(PendingMessage msg ) {
- receivedCpcReply = false;
pendingMessageQueue.add(msg);
}
+
protected void acquireCpcReadySemaphore() {
try {
// Acquire cpcReady semaphore to block sending CPC request until
@@ -355,10 +346,7 @@
Long key = (Long)it.next();
CasQueueEntry entry = threadRegistrar.get(key);
if ( entry != null ) {
- synchronized( entry.getMonitor() ) {
- entry.signal();
- entry.getMonitor().notifyAll();
- }
+ entry.getSemaphore().release();
}
}
}
@@ -378,6 +366,9 @@
running = false;
casQueueProducerReady = false;
uimaSerializer.reset();
+ if ( serviceDelegate != null ) {
+ serviceDelegate.cancelDelegateTimer();
+ }
try
{
try {
@@ -387,10 +378,6 @@
ex.printStackTrace();
}
- synchronized( threadQueue ) {
- threadQueue.notifyAll();
- }
-
// Unblock threads
if( threadMonitorMap.size() > 0 )
{
@@ -404,24 +391,15 @@
{
continue;
}
- synchronized( threadMonitor.getMonitor())
- {
- threadMonitor.setWasSignaled();
- threadMonitor.getMonitor().notifyAll();
- }
+ threadMonitor.getMonitor().release();
}
}
cpcReadySemaphore.release();
outstandingCasRequests.set(0); // reset global counter of outstanding requests
cpcReplySemaphore.release();
- receivedCpcReply = true;
+ getMetaSemaphore.release();
- synchronized(metadataReplyMonitor)
- {
- receivedMetaReply = true;
- metadataReplyMonitor.notifyAll();
- }
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_as_client_INFO", new Object[] {});
}
@@ -495,11 +473,12 @@
new Object[] {"Time Waiting for CAS", (double)waitingTime / (double)1000000});
}
if ( running ) { // only if the client is still running handle the new cas
- // Associate the CAS with the entry and wake up the Consumer thread
- entry.setCas(cas);
- synchronized( entry.getMonitor() ) {
- entry.signal();
- entry.getMonitor().notifyAll();
+ try {
+ entry.getSemaphore().acquire();
+ // Associate the CAS with the entry and wake up the Consumer thread
+ entry.setCas(cas);
+ } finally {
+ entry.getSemaphore().release();
}
} else {
return; // Client is terminating
@@ -538,16 +517,23 @@
CasQueueEntry entry = getQueueEntry( Thread.currentThread().getId());
// Add this thread entry to the queue of threads waiting for a CAS
threadQueue.add(entry);
- // Wait until the CAS producer adds the CAS to the CasQueueEntry and
- // signals CAS availability.
- while( running && !entry.signaled() ) {
- // Wait until the producer is ready
- synchronized( entry.getMonitor()) {
- entry.getMonitor().wait(100);
- }
+ if ( entry != null ) {
+ while (running) {
+ try {
+ // Wait until the CAS producer adds the CAS to the CasQueueEntry and
+ // signals CAS availability.
+ entry.getSemaphore().acquire();
+ if (entry.getCas() == null) {
+ continue;
+ } else {
+ return entry.getCas();
+ }
+ } finally {
+ entry.getSemaphore().release();
+ }
+ } // while
}
- // This may return null *if* the client is terminating
- return entry.getCas();
+ return null; // client has terminated
}
private CasQueueEntry getQueueEntry(long aThreadId ) {
@@ -564,17 +550,12 @@
return entry;
}
- protected void reset()
- {
- receivedCpcReply = false;
- receivedMetaReply = false;
+ protected void reset() {
}
private class CasQueueEntry {
private CAS cas;
- private Object monitor = new Object();
private Semaphore semaphore = new Semaphore(1);
- private volatile boolean signaled = false;
public CAS getCas() {
return cas;
}
@@ -587,20 +568,7 @@
public Semaphore getSemaphore() {
return semaphore;
}
- public Object getMonitor() {
- return monitor;
- }
- public void setMonitor(Object monitor) {
- this.monitor = monitor;
- }
- public boolean signaled() {
- return signaled;
- }
- public void signal() {
- signaled = true;
- }
public void reset() {
- signaled = false;
cas = null;
}
@@ -636,23 +604,20 @@
cpcReplySemaphore.release();
}
}
-
+ /**
+ * Blocks while trying to acquire a semaphore awaiting receipt of GetMeta Reply.
+ * When the GetMeta is received, or there is a timeout, or the client stops the
+ * semaphore will be released.
+ */
protected void waitForMetadataReply()
{
- synchronized (metadataReplyMonitor)
- {
- while (!receivedMetaReply)
- {
- try
- {
- // This monitor is dedicated to single purpose event.
- metadataReplyMonitor.wait();
- }
- catch (Exception e)
- {
- }
- }
- }
+ try {
+ getMetaSemaphore.acquire();
+ } catch( InterruptedException e) {
+
+ } finally {
+ getMetaSemaphore.release();
+ }
}
public String getPerformanceReport()
@@ -789,7 +754,7 @@
}
// Incremented number of outstanding CASes sent to a service. When a reply comes
// this counter is decremented
- outstandingCasRequests.incrementAndGet();
+ long outstandingCasCount = outstandingCasRequests.incrementAndGet();
// Add message to the pending queue
addMessage(msg);
}
@@ -861,7 +826,6 @@
cleanup(); //Make the receiving thread to complete
// Release the semaphore acquired in collectionProcessingComplete()
cpcReplySemaphore.release();
- receivedCpcReply = true;
}
}
@@ -882,7 +846,6 @@
if ( message.getJMSReplyTo() != null ) {
serviceDelegate.setFreeCasDestination(message.getJMSReplyTo());
}
-
// Check if this is a reply for Ping sent in response to a timeout
if ( serviceDelegate.isAwaitingPingReply() ) {
serviceDelegate.resetAwaitingPingReply();
@@ -900,10 +863,7 @@
Iterator it = threadMonitorMap.entrySet().iterator();
while( it.hasNext() ) {
threadMonitor = ((Entry<Long, ThreadMonitor>)it.next()).getValue();
- synchronized(threadMonitor.getMonitor()) {
- // Awake the send thread
- threadMonitor.getMonitor().notifyAll();
- }
+ threadMonitor.getMonitor().release();
}
} else {
// Asynch API used for sending outgoing messages.
@@ -925,59 +885,57 @@
}
int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
removeFromCache(uniqueIdentifier);
- if (AsynchAEMessage.Exception == payload)
- {
- ProcessTrace pt = new ProcessTrace_impl();
- UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
- Exception exception = retrieveExceptionFromMessage(message);
- clientSideJmxStats.incrementMetaErrorCount();
- status.addEventStatus("GetMeta", "Failed", exception);
- notifyListeners(null, status, AsynchAEMessage.GetMeta);
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_exception_msg_INFO",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), exception });
- }
- synchronized( metadataReplyMonitor )
- {
- abort = true;
- receivedMetaReply = true; // not really but simulate receiving the meta so that we unblock the monitor
- initialized = false;
- metadataReplyMonitor.notifyAll();
- }
- }
- else
- {
- // Check serialization supported by the service against client configuration.
- // If the client is configured to use Binary serialization *but* the service
- // doesnt support it, change the client serialization to xmi. Old services will
- // not return in a reply the type of serialization supported which implies "xmi".
- // New services *always* return "binary" as a default serialization. The client
- // however may still want to serialize messages using xmi though.
- if ( !message.propertyExists(AsynchAEMessage.Serialization)) {
- // Dealing with an old service here, check if there is a mismatch with the
- // client configuration. If the client is configured with binary serialization
- // override this and change serialization to "xmi".
- if ( getSerializationStrategy().equalsIgnoreCase("binary")) {
- System.out.println("\n\t***** WARNING: Service Doesn't Support Binary Serialization. Client Defaulting to XMI Serialization\n");
- // Override configured serialization
- setSerializationStrategy("xmi");
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_serialization_ovveride__WARNING", new Object[] { });
- }
- }
- String meta = ((TextMessage) message).getText();
- ByteArrayInputStream bis = new ByteArrayInputStream(meta.getBytes());
- XMLInputSource in1 = new XMLInputSource(bis, null);
- // Adam - store ResouceMetaData in field so we can return it from getMetaData().
- resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser().parseResourceMetaData(in1);
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_meta_reply_FINEST", new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), meta });
- }
- asynchManager.addMetadata(resourceMetadata);
- synchronized (metadataReplyMonitor)
- {
- receivedMetaReply = true;
- metadataReplyMonitor.notifyAll();
- }
+
+ try {
+ if (AsynchAEMessage.Exception == payload)
+ {
+ ProcessTrace pt = new ProcessTrace_impl();
+ UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+ Exception exception = retrieveExceptionFromMessage(message);
+ clientSideJmxStats.incrementMetaErrorCount();
+ status.addEventStatus("GetMeta", "Failed", exception);
+ notifyListeners(null, status, AsynchAEMessage.GetMeta);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_exception_msg_INFO",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), exception });
+ }
+ abort = true;
+ initialized = false;
+ }
+ else
+ {
+ // Check serialization supported by the service against client configuration.
+ // If the client is configured to use Binary serialization *but* the service
+ // doesn't support it, change the client serialization to xmi. Old services will
+ // not return in a reply the type of serialization supported which implies "xmi".
+ // New services *always* return "binary" as a default serialization. The client
+ // however may still want to serialize messages using xmi though.
+ if ( !message.propertyExists(AsynchAEMessage.Serialization)) {
+ // Dealing with an old service here, check if there is a mismatch with the
+ // client configuration. If the client is configured with binary serialization
+ // override this and change serialization to "xmi".
+ if ( getSerializationStrategy().equalsIgnoreCase("binary")) {
+ System.out.println("\n\t***** WARNING: Service Doesn't Support Binary Serialization. Client Defaulting to XMI Serialization\n");
+ // Override configured serialization
+ setSerializationStrategy("xmi");
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_serialization_ovveride__WARNING", new Object[] { });
+ }
+ }
+ String meta = ((TextMessage) message).getText();
+ ByteArrayInputStream bis = new ByteArrayInputStream(meta.getBytes());
+ XMLInputSource in1 = new XMLInputSource(bis, null);
+ // Adam - store ResourceMetaData in field so we can return it from getMetaData().
+ resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser().parseResourceMetaData(in1);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_meta_reply_FINEST", new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), meta });
+ }
+ asynchManager.addMetadata(resourceMetadata);
+ }
+
+ } catch( Exception e) {
+ throw e;
+ } finally {
+ getMetaSemaphore.release();
}
}
@@ -1061,12 +1019,9 @@
// of the reply. The message has been stored in the cache and
// when the thread wakes up due to notification below, it will
// retrieve the reply and process it.
- if ( threadMonitor != null && threadMonitor.getMonitor() != null) {
- synchronized( threadMonitor.getMonitor() )
- {
- threadMonitor.setWasSignaled();
- threadMonitor.getMonitor().notifyAll();
- }
+ if ( threadMonitor != null ) {
+ cachedRequest.setReceivedProcessCasReply();
+ threadMonitor.getMonitor().release();
}
}
@@ -1117,9 +1072,6 @@
}
int payload = -1;
String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
-
-
-
// Determine the type of payload in the message (XMI,Cas Reference,Exception,etc)
if (message.propertyExists(AsynchAEMessage.Payload))
{
@@ -1132,8 +1084,16 @@
if ( casReferenceId != null ) {
cachedRequest = (ClientRequest)clientCache.get(casReferenceId);
+ // Increment number of replies
+ if ( cachedRequest != null && casReferenceId.equals(cachedRequest.getCasReferenceId()) )
+ {
+ // Received a reply, decrement number of outstanding CASes
+ long outstandingCasCount = outstandingCasRequests.decrementAndGet();
+ if ( outstandingCasCount == 0) {
+ cpcReadySemaphore.release();
+ }
+ }
}
-
if (AsynchAEMessage.Exception == payload)
{
handleException(message, cachedRequest, true);
@@ -1286,9 +1246,7 @@
cachedRequest.setProcessException();
}
}
- receivedCpcReply = true; // change state as if the CPC reply came in. This is done to prevent a hang on CPC request
// release the semaphore acquired in collectionProcessingComplete
- cpcReplySemaphore.release();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_exception_msg_INFO",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), exception });
@@ -1348,15 +1306,6 @@
}
removeFromCache(casReferenceId);
serviceDelegate.removeCasFromOutstandingList(casReferenceId);
- // Check if CASes sent == CASes received
- long outstandingCasCount = 0;
- if ( ( outstandingCasCount = outstandingCasRequests.get()) == 0) {
- // Received ALL replies, lower CPC latch so that a CPC request
- // is sent
- cpcReadySemaphore.release();
- } else {
- System.out.println("UIMA AS Client Outstanding CAS Count="+outstandingCasCount);
- }
}
}
private void completeProcessingReply( CAS cas, String casReferenceId, int payload, boolean doNotify, Message message, ClientRequest cachedRequest, ProcessTrace pt )
@@ -1368,13 +1317,6 @@
{
pt = new ProcessTrace_impl();
}
- // Incremente number of replies
- if ( casReferenceId.equals(cachedRequest.getCasReferenceId()) )
- {
- // Received a reply, decrement number of outstanding CASes
- outstandingCasRequests.decrementAndGet();
- }
-
try
{
// Log stats and populate ProcessTrace object
@@ -1407,14 +1349,6 @@
}
}
removeFromCache(casReferenceId);
- long outstandingCasCount = 0;
- if ( ( outstandingCasCount = outstandingCasRequests.get()) == 0) {
- System.out.println("UIMA AS Client Releasing cpcReadySemaphore");
- cpcReadySemaphore.release();
-// cpcReadyLatch.countDown();
- } else {
- System.out.println("UIMA AS Client Outstanding CAS Count="+outstandingCasCount);
- }
}
}
}
@@ -1687,51 +1621,66 @@
ClientRequest cachedRequest = produceNewClientRequestObject();
cachedRequest.setSynchronousInvocation();
-
+ // This is synchronous call, acquire and hold the semaphore before
+ // dispatching a CAS to a service. The semaphore will be released
+ // iff:
+ // a) reply is received (success or failure with exception)
+ // b) timeout occurs
+ // c) client is stopped
+ // Once the semaphore is acquired and the CAS is dispatched
+ // the thread will block in trying to acquire the semaphore again
+ // below.
+ try {
+ threadMonitor.getMonitor().acquire();
+ } catch( InterruptedException e) {
+ System.out.println("UIMA AS Client Received Interrrupt While Acquiring Monitor Semaphore in sendAndReceive()");
+ }
// send CAS. This call does not block. Instead we will block the sending thread below.
casReferenceId = sendCAS(aCAS, cachedRequest);
if (threadMonitor != null && threadMonitor.getMonitor() != null) {
- // Block here waiting for reply
- synchronized (threadMonitor.getMonitor()) {
- // Block sending thread until a reply is received. The thread
- // will be signaled either when a reply to the request just
- // sent is received OR a Ping reply was received. The latter
- // is necessary to allow handling of CASes delayed due to
- // a timeout. A previous request timed out and the service
- // state was changed to TIMEDOUT. While the service is in this
- // state all sending threads add outstanding CASes to the list
- // of CASes pending dispatch and each waits until the state
- // of the service changes to OK. The state is changed to OK
- // when the client receives a reply to a PING request. When
- // the Ping reply comes, the client will signal this thread.
- // The thread checks the list of CASes pending dispatch trying
- // to find an entry that matches ID of the CAS previously
- // delayed. If the CAS is found in the delayed list, it will
- // be removed from the list and send to the service for
- // processing. The 'wasSignaled' flag is only set when the
- // CAS reply is received. Ping reply logic does not change
- // this flag.
- while (!threadMonitor.wasSignaled && running) {
- try {
- threadMonitor.getMonitor().wait();
- // Send thread was awoken by either process reply or ping reply
- // If there service is in the ok state and the CAS is in the
- // list of CASes pending dispatch, remove the CAS from the list
- // and send it to the service.
- if (cachedRequest.isTimeoutException() ||
- cachedRequest.isProcessException() ) {
- // Handled below
- break;
- }
- if ( running && serviceDelegate.getState() == Delegate.OK_STATE &&
- serviceDelegate.removeCasFromPendingDispatchList(casReferenceId)) {
- sendCAS(aCAS, cachedRequest);
- }
- } catch (InterruptedException e) {
+ while( running ) {
+ try {
+ // Block sending thread until a reply is received. The thread
+ // will be signaled either when a reply to the request just
+ // sent is received OR a Ping reply was received. The latter
+ // is necessary to allow handling of CASes delayed due to
+ // a timeout. A previous request timed out and the service
+ // state was changed to TIMEDOUT. While the service is in this
+ // state all sending threads add outstanding CASes to the list
+ // of CASes pending dispatch and each waits until the state
+ // of the service changes to OK. The state is changed to OK
+ // when the client receives a reply to a PING request. When
+ // the Ping reply comes, the client will signal this thread.
+ // The thread checks the list of CASes pending dispatch trying
+ // to find an entry that matches ID of the CAS previously
+ // delayed. If the CAS is found in the delayed list, it will
+ // be removed from the list and send to the service for
+ // processing. Both a CAS reply and a Ping reply release the
+ // thread monitor semaphore; the 'wasSignaled' flag no longer
+ // exists.
+ threadMonitor.getMonitor().acquire();
+ // Send thread was awoken by either process reply or ping reply
+ // If the service is in the OK state and the CAS is in the
+ // list of CASes pending dispatch, remove the CAS from the list
+ // and send it to the service.
+ if (cachedRequest.isTimeoutException() || cachedRequest.isProcessException() ) {
+ // Handled outside of the while-loop below
+ break;
}
+ if ( running && serviceDelegate.getState() == Delegate.OK_STATE &&
+ serviceDelegate.removeCasFromPendingDispatchList(casReferenceId)) {
+ sendCAS(aCAS, cachedRequest);
+ } else {
+ break; // done here, received a reply or the client is not running
+ }
+ } catch( InterruptedException e ) {
+
+ } finally {
+ threadMonitor.getMonitor().release();
}
}
- }
+ } // if
+
if ( abort ) {
throw new ResourceProcessException(new RuntimeException("Uima AS Client API Stopping"));
}
@@ -1754,10 +1703,7 @@
throw rpe;
} catch (Exception e) {
throw new ResourceProcessException(e);
- } finally {
- // reset 'wasSignaled' flag
- threadMonitor.reset();
- }
+ }
return casReferenceId;
}
private void deserializeAndCompleteProcessingReply( String casReferenceId, Message message, ClientRequest cachedRequest, ProcessTrace pt, boolean doNotify ) throws Exception
@@ -1810,11 +1756,8 @@
}
status.addEventStatus("GetMeta", "Failed", new UimaASMetaRequestTimeout());
notifyListeners(null, status, AsynchAEMessage.GetMeta);
- synchronized (metadataReplyMonitor)
- {
- abort = true;
- metadataReplyMonitor.notifyAll();
- }
+ abort = true;
+ getMetaSemaphore.release();
break;
case (PingTimeout):
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -1830,11 +1773,7 @@
anyCasRequest.getCAS().release();
}
}
- synchronized (metadataReplyMonitor)
- {
- abort = true;
- metadataReplyMonitor.notifyAll();
- }
+ abort = true;
break;
case (CpCTimeout):
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -1842,7 +1781,6 @@
}
status.addEventStatus("CpC", "Failed", new UimaASCollectionProcessCompleteTimeout());
notifyListeners(null, status, AsynchAEMessage.CollectionProcessComplete);
- receivedCpcReply = true;
// release the semaphore acquired in collectionProcessingComplete()
cpcReplySemaphore.release();
break;
@@ -1875,12 +1813,9 @@
{
ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
// Unblock the sending thread so that it can complete processing with an error
- if ( threadMonitor != null && threadMonitor.getMonitor() != null) {
- synchronized( threadMonitor.getMonitor() )
- {
- threadMonitor.setWasSignaled();
- threadMonitor.getMonitor().notifyAll();
- }
+ if ( threadMonitor != null ) {
+ threadMonitor.getMonitor().release();
+ cachedRequest.setReceivedProcessCasReply(); // should not be needed
}
}
}
@@ -1894,9 +1829,12 @@
cachedRequest.removeEntry(casReferenceId);
serviceDelegate.removeCasFromOutstandingList(casReferenceId);
// Check if all replies have been received
- if ( outstandingCasRequests.get() == 0) {
+ long outstandingCasCount = outstandingCasRequests.decrementAndGet();
+ if ( outstandingCasCount == 0) {
cpcReadySemaphore.release();
- }
+ } else {
+ System.out.println("UIMA AS Client Received Process Timeout - Outstanding CAS Count="+outstandingCasCount);
+ }
break;
} // case
}
@@ -1935,6 +1873,7 @@
private String endpoint;
+ private volatile boolean receivedProcessCasReply = false;
private long threadId=-1;
private Message message;
@@ -2068,6 +2007,10 @@
{
return threadId;
}
+ public void setReceivedProcessCasReply()
+ {
+ receivedProcessCasReply = true;
+ }
public void setMetadataTimeout( int aTimeout )
{
metadataTimeout = aTimeout;
@@ -2185,16 +2128,11 @@
abort = true;
metaTimeoutErrorCount++;
clientSideJmxStats.incrementMetaTimeoutErrorCount();
- synchronized( metadataReplyMonitor )
- {
- receivedMetaReply = true; // not really but simulate receving the meta so that we unblock the monitor
- metadataReplyMonitor.notifyAll();
- }
+ getMetaSemaphore.release();
}
else if (isCPCRequest())
{
timeOutKind = CpCTimeout;
- receivedCpcReply = true;// not really but simulate receving the meta so that we unblock the monitor
cpcReadySemaphore.release();
}
else
@@ -2282,32 +2220,19 @@
protected class ThreadMonitor
{
private long threadId;
- private Object monitor = new Object();
- private volatile boolean wasSignaled = false;
+ private Semaphore monitor = new Semaphore(1);
public ThreadMonitor( long aThreadId )
{
threadId = aThreadId;
}
- public void reset()
- {
- wasSignaled = false;
- }
public long getThreadId()
{
return threadId;
}
- public Object getMonitor()
+ public Semaphore getMonitor()
{
return monitor;
}
- public void setWasSignaled()
- {
- wasSignaled = true;
- }
- public boolean wasSignaled()
- {
- return wasSignaled;
- }
}